4 GenEvent
In this chapter, we will explore GenEvent, another behaviour provided by Elixir and OTP that allows us to spawn an event manager that is able to publish events to many handlers.
There are two events we are going to emit: one for every time a bucket is added to the registry and another when it is removed from it.
4.1 Event managers
Let's start a new iex -S mix
session and explore the GenEvent API a bit:
iex> {:ok, manager} = GenEvent.start_link
{:ok, #PID<0.83.0>}
iex> GenEvent.sync_notify(manager, :hello)
:ok
iex> GenEvent.notify(manager, :world)
:ok
GenEvent.start_link/0
starts a new event manager. That is literally all that is required to start a manager. After the manager is created, we can call GenEvent.notify/2
and GenEvent.sync_notify/2
to send notifications.
However, since there are no event handlers tied to the manager, not much happens on every notification.
Let's create our first handler, still on IEx, that sends all events to a given process:
iex> defmodule Forwarder do
...> use GenEvent
...> def handle_event(event, parent) do
...> send parent, event
...> {:ok, parent}
...> end
...> end
iex> GenEvent.add_handler(manager, Forwarder, self())
:ok
iex> GenEvent.sync_notify(manager, {:hello, :world})
:ok
iex> flush
{:hello, :world}
:ok
We created our handler and added it to the manager by calling GenEvent.add_handler/3
passing:
- The manager we previously started and linked
- The event handler module (named
Forwarder
) we just defined - The event handler state: in this case, the current process pid
After adding the handler, we can see that by calling sync_notify/2
, the Forwarder
handler successfully forwards events to our inbox.
There are a couple things that are important to highlight at this point:
- The event handler runs in the same process as the event manager
sync_notify/2
runs event handlers synchronously to the requestnotify/2
runs event handlers asynchronously
Therefore, sync_notify/2
and notify/2
are similar to call/2
and cast/2
in GenServer and using sync_notify/2
is generally recommended. It works as a backpressure mechanism in the calling process, to reduce the likelihood of messages being sent more quickly than they can be dispatched to handlers.
Be sure to check other functionality provided by GenEvent in its module documentation. For now we have enough knowledge to add an event manager to our application.
4.2 Registry events
In order to emit events, we need to change the registry to work with an event manager. While we could automatically start the event manager when the registry is started, for example in the init/1
callback, it is preferrable to pass the event manager pid/name to start_link
, decoupling the start of the event manager from the registry.
Let's first change our tests to showcase the behaviour we want the registry to exhibit. Open up test/kv/registry_test.exs
and change the existing setup
callback to the one below, then add the new test:
defmodule Forwarder do
use GenEvent
def handle_event(event, parent) do
send parent, event
{:ok, parent}
end
end
setup do
{:ok, manager} = GenEvent.start_link
{:ok, registry} = KV.Registry.start_link(manager)
GenEvent.add_mon_handler(manager, Forwarder, self())
{:ok, registry: registry}
end
test "sends events on create and crash", %{registry: registry} do
KV.Registry.create(registry, "shopping")
{:ok, bucket} = KV.Registry.lookup(registry, "shopping")
assert_receive {:create, "shopping", ^bucket}
Agent.stop(bucket)
assert_receive {:exit, "shopping", ^bucket}
end
In order to test the functionality we want to add, we first define a Forwarder
event handler similar to the one we typed in IEx previously. On setup
, we start the event manager, pass it as an argument to the registry and add our Forwarder
handler to the manager so events can be sent to the test process.
In the test, we create and stop a bucket process and use assert_receive
to assert we will receive both :create
and :exit
messages. assert_receive
has a default timeout of 500ms which should be more than enough for our tests. Also note that assert_receive
expects a pattern, rather than a value, that's why we have used ^bucket
to match on the bucket pid.
Finally, notice we called GenEvent.add_mon_handler/3
instead of GenEvent.add_handler/3
. This function adds a handler, as we know, and also tells the event manager to monitor the current process. If the current process dies, the event handler is automatically removed. This makes sense because, in the Forwarder
case, we should stop forwarding messages if the recipient of those messages (self()
/the test process) is no longer alive.
Let's now change the registry to make the tests pass. Open up lib/kv/registry.ex
and paste the new registry implementation below (comments inlined):
defmodule KV.Registry do
use GenServer
## Client API
@doc """
Starts the registry.
"""
def start_link(event_manager, opts \\ []) do
# 1. start_link now expects the event manager as argument
GenServer.start_link(__MODULE__, event_manager, opts)
end
@doc """
Looks up the bucket pid for `name` stored in `server`.
Returns `{:ok, pid}` in case a bucket exists, `:error` otherwise.
"""
def lookup(server, name) do
GenServer.call(server, {:lookup, name})
end
@doc """
Ensures there is a bucket associated with the given `name` in `server`.
"""
def create(server, name) do
GenServer.cast(server, {:create, name})
end
## Server callbacks
def init(events) do
# 2. The init callback now receives the event manager.
# We have also changed the manager state from a tuple
# to a map, allowing us to add new fields in the future
# without needing to rewrite all callbacks.
names = HashDict.new
refs = HashDict.new
{:ok, %{names: names, refs: refs, events: events}}
end
def handle_call({:lookup, name}, _from, state) do
{:reply, HashDict.fetch(state.names, name), state}
end
def handle_cast({:create, name}, state) do
if HashDict.get(state.names, name) do
{:noreply, state}
else
{:ok, pid} = KV.Bucket.start_link()
ref = Process.monitor(pid)
refs = HashDict.put(state.refs, ref, name)
names = HashDict.put(state.names, name, pid)
# 3. Push a notification to the event manager on create
GenEvent.sync_notify(state.events, {:create, name, pid})
{:noreply, %{state | names: names, refs: refs}}
end
end
def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
{name, refs} = HashDict.pop(state.refs, ref)
names = HashDict.delete(state.names, name)
# 4. Push a notification to the event manager on exit
GenEvent.sync_notify(state.events, {:exit, name, pid})
{:noreply, %{state | names: names, refs: refs}}
end
def handle_info(_msg, state) do
{:noreply, state}
end
end
The changes are straightforward. We now pass the event manager we received as an argument to start_link
on to GenServer
initialization. We also change both cast and info callbacks to call GenEvent.sync_notify/2
. Lastly, we have taken the opportunity to change the server state to a map, making it easier to improve the registry in the future.
Run the test suite, and all tests should be green again.
4.3 Event streams
One last functionality worth exploring from GenEvent
is the ability to consume its events as a stream:
iex> {:ok, manager} = GenEvent.start_link
{:ok, #PID<0.83.0>}
iex> spawn_link fn ->
...> for x <- GenEvent.stream(manager), do: IO.inspect(x)
...> end
:ok
iex> GenEvent.notify(manager, {:hello, :world})
{:hello, :world}
:ok
In the example above, we have created a GenEvent.stream(manager)
that returns a stream (an enumerable) of events that are consumed as they come. Since consuming those events is a blocking action, we spawn a new process that will consume the events and print them to the terminal, and that is exactly the behaviour we see. Every time we call sync_notify/2
or notify/2
, the event is printed to the terminal followed by :ok
(which is just IEx printing the result returned by notify functions).
Often event streams provide enough functionality for consuming events that we don't need to register our own handlers. However, when custom functionality is required, or during testing, defining our own event handler callbacks is the best way to go.
At this point, we have an event manager, a registry and potentially many buckets running at the same time. It is about time to start worrying what would happen if any of those processes crash.