6 ETS
Every time we need to look up a bucket, we need to send a message to the registry. In some applications, this means the registry may become a bottleneck!
In this chapter we will learn about ETS (Erlang Term Storage), and how to use it as a cache mechanism. Later we will expand its usage to persist data from the supervisor to its children, allowing data to persist even on crashes.
Warning! Don't use ETS as a cache prematurely! Log and analyze your application performance and identify which parts are bottlenecks, so you know whether you should cache, and what you should cache. This chapter is merely an example of how ETS can be used, once you've determined the need.
6.1 ETS as a cache
ETS allows us to store any Erlang/Elixir term in an in-memory table. Working with ETS tables is done via erlang's :ets
module:
iex> table = :ets.new(:buckets_registry, [:set, :protected])
8207
iex> :ets.insert(table, {"foo", self})
true
iex> :ets.lookup(table, "foo")
[{"foo", #PID<0.41.0>}]
When creating an ETS table, two arguments are required: the table name and a set of options. From the available options, we passed the table type and its access rules. We have chosen the :set
type, which means that keys cannot be duplicated. We've also set the table's access to :protected
, which means that only the process that created the table can write to it, but all processes can read it from it. Those are actually the default values, so we will skip them from now on.
ETS tables can also be named, allowing us to access them by a given name:
iex> :ets.new(:buckets_registry, [:named_table])
:buckets_registry
iex> :ets.insert(:buckets_registry, {"foo", self})
true
iex> :ets.lookup(:buckets_registry, "foo")
[{"foo", #PID<0.41.0>}]
Let's change the KV.Registry
to use ETS tables. We will use the same technique as we did for the event manager and buckets supervisor, and pass the ETS table name explicitly on start_link
. Remember that, as with server names, any local process that knows an ETS table name will be able to access that table.
Open up lib/kv/registry.ex
, and let's change its implementation. We've added comments to the source code to highlight the changes we've made:
defmodule KV.Registry do
use GenServer
## Client API
@doc """
Starts the registry.
"""
def start_link(table, event_manager, buckets, opts \\ []) do
# 1. We now expect the table as argument and pass it to the server
GenServer.start_link(__MODULE__, {table, event_manager, buckets}, opts)
end
@doc """
Looks up the bucket pid for `name` stored in `table`.
Returns `{:ok, pid}` if a bucket exists, `:error` otherwise.
"""
def lookup(table, name) do
# 2. lookup now expects a table and looks directly into ETS.
# No request is sent to the server.
case :ets.lookup(table, name) do
[{^name, bucket}] -> {:ok, bucket}
[] -> :error
end
end
@doc """
Ensures there is a bucket associated with the given `name` in `server`.
"""
def create(server, name) do
GenServer.cast(server, {:create, name})
end
## Server callbacks
def init({table, events, buckets}) do
# 3. We have replaced the names HashDict by the ETS table
ets = :ets.new(table, [:named_table, read_concurrency: true])
refs = HashDict.new
{:ok, %{names: ets, refs: refs, events: events, buckets: buckets}}
end
# 4. The previous handle_call callback for lookup was removed
def handle_cast({:create, name}, state) do
# 5. Read and write to the ETS table instead of the HashDict
case lookup(state.names, name) do
{:ok, _pid} ->
{:noreply, state}
:error ->
{:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets)
ref = Process.monitor(pid)
refs = HashDict.put(state.refs, ref, name)
:ets.insert(state.names, {name, pid})
GenEvent.sync_notify(state.events, {:create, name, pid})
{:noreply, %{state | refs: refs}}
end
end
def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
# 6. Delete from the ETS table instead of the HashDict
{name, refs} = HashDict.pop(state.refs, ref)
:ets.delete(state.names, name)
GenEvent.sync_notify(state.events, {:exit, name, pid})
{:noreply, %{state | refs: refs}}
end
def handle_info(_msg, state) do
{:noreply, state}
end
end
Notice that before our changes KV.Registry.lookup/2
sent requests to the server, but now it reads directly from the ETS table, which is shared across all processes. That's the main idea behind the cache mechanism we are implementing.
In order for the cache mechanism to work, the created ETS table needs to have access :protected
(the default), so all clients can read from it, while only the KV.Registry
process writes to it. We have also set read_concurrency: true
when starting the table, optimizing the table for the common scenario of concurrent read operations.
The changes we have performed above have definitely broken our tests. For starters, there is a new argument we need to pass to KV.Registry.start_link/3
. Let's start amending our tests in test/kv/registry_test.exs
by rewriting the setup
callback:
setup do
{:ok, sup} = KV.Bucket.Supervisor.start_link
{:ok, manager} = GenEvent.start_link
{:ok, registry} = KV.Registry.start_link(:registry_table, manager, sup)
GenEvent.add_mon_handler(manager, Forwarder, self())
{:ok, registry: registry, ets: :registry_table}
end
Notice we are passing the table name of :registry_table
to KV.Registry.start_link/3
as well as returning ets: :registry_table
as part of the test context.
After changing the callback above, we will still have failures in our test suite. All in the format of:
1) test spawns buckets (KV.RegistryTest)
test/kv/registry_test.exs:38
** (ArgumentError) argument error
stacktrace:
(stdlib) :ets.lookup(#PID<0.99.0>, "shopping")
(kv) lib/kv/registry.ex:22: KV.Registry.lookup/2
test/kv/registry_test.exs:39
This is happening because we are passing the registry pid to KV.Registry.lookup/2
while now it expects the ETS table. We can fix this by changing all occurrences of:
KV.Registry.lookup(registry, ...)
to:
KV.Registry.lookup(ets, ...)
Where ets
will be retrieved in the same way we retrieve the registry:
test "spawns buckets", %{registry: registry, ets: ets} do
Let's change our tests to pass ets
to lookup/2
. Once we finish these changes, some tests will continue to fail. You may even notice tests pass and fail inconsistently between runs. For example, the "spawns buckets" test:
test "spawns buckets", %{registry: registry, ets: ets} do
assert KV.Registry.lookup(ets, "shopping") == :error
KV.Registry.create(registry, "shopping")
assert {:ok, bucket} = KV.Registry.lookup(ets, "shopping")
KV.Bucket.put(bucket, "milk", 1)
assert KV.Bucket.get(bucket, "milk") == 1
end
may be failing on this line:
assert {:ok, bucket} = KV.Registry.lookup(ets, "shopping")
However how can this line fail if we just created the bucket in the previous line?
The reason those failures are happening is because, for didactic purposes, we have made two mistakes:
- We are prematurely optimizing (by adding this cache layer)
- We are using
cast/2
(while we should be usingcall/2
)
6.2 Race conditions?
Developing in Elixir does not make your code free of race conditions. However, Elixir's simple abstractions where nothing is shared by default make it easier to spot a race condition's root cause.
What is happening in our test is that there is a delay in between an operation and the time we can observe this change in the ETS table. Here is what we were expecting to happen:
- We invoke
KV.Registry.create(registry, "shopping")
- The registry creates the bucket and updates the cache table
- We access the information from the table with
KV.Registry.lookup(ets, "shopping")
- The command above returns
{:ok, bucket}
However, since KV.Registry.create/2
is a cast operation, the command will return before we actually write to the table! In other words, this is happening:
- We invoke
KV.Registry.create(registry, "shopping")
- We access the information from the table with
KV.Registry.lookup(ets, "shopping")
- The command above returns
:error
- The registry creates the bucket and updates the cache table
To fix the failure we just need to make KV.Registry.create/2
synchronous by using call/2
rather than cast/2
. This will guarantee that the client will only continue after changes have been made to the table. Let's change the function and its callback as follows:
def create(server, name) do
GenServer.call(server, {:create, name})
end
def handle_call({:create, name}, _from, state) do
case lookup(state.names, name) do
{:ok, pid} ->
{:reply, pid, state} # Reply with pid
:error ->
{:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets)
ref = Process.monitor(pid)
refs = HashDict.put(state.refs, ref, name)
:ets.insert(state.names, {name, pid})
GenEvent.sync_notify(state.events, {:create, name, pid})
{:reply, pid, %{state | refs: refs}} # Reply with pid
end
end
We simply changed the callback from handle_cast/2
to handle_call/3
and changed it to reply with the pid of the created bucket.
Let's run the tests once again. This time though, we will pass the --trace
option:
$ mix test --trace
The --trace
option is useful when your tests are deadlocking or there are race conditions, as it runs all tests synchronously (async: true
has no effect) and shows detailed information about each test. This time we should be down to one failure (that may be intermittent):
1) test removes buckets on exit (KV.RegistryTest)
test/kv/registry_test.exs:48
Assertion with == failed
code: KV.Registry.lookup(ets, "shopping") == :error
lhs: {:ok, #PID<0.103.0>}
rhs: :error
stacktrace:
test/kv/registry_test.exs:52
According to the failure message, we are expecting that the bucket no longer exists on the table, but it still does! This problem is the opposite of the one we have just solved: while previously there was a delay between the command to create a bucket and updating the table, now there is a delay between the bucket process dying and its entry being removed from the table.
Unfortunately this time we cannot simply change handle_info/2
to a synchronous operation. We can, however, fix our tests by using event manager notifications. Let's take another look at our handle_info/2
implementation:
def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
# 5. Delete from the ETS table instead of the HashDict
{name, refs} = HashDict.pop(state.refs, ref)
:ets.delete(state.names, name)
GenEvent.sync_notify(state.event, {:exit, name, pid})
{:noreply, %{state | refs: refs}}
end
Notice that we are deleting from the ETS table before we send the notification. This is by design! This means that when we receive the {:exit, name, pid}
notification, the table will already be up to date. Let's update the remaining failing test as follows:
test "removes buckets on exit", %{registry: registry, ets: ets} do
KV.Registry.create(registry, "shopping")
{:ok, bucket} = KV.Registry.lookup(ets, "shopping")
Agent.stop(bucket)
assert_receive {:exit, "shopping", ^bucket} # Wait for event
assert KV.Registry.lookup(ets, "shopping") == :error
end
We have simply amended the test to guarantee we first receive the {:exit, name, pid}
message before invoking KV.Registry.lookup/2
.
It is important to observe that we were able to keep our suite passing without a need to use :timer.sleep/1
or other tricks. Most of the time, we can rely on events, monitoring and messages to assert the system is in an expected state before performing assertions.
For your convenience, here is the fully passing test case:
defmodule KV.RegistryTest do
use ExUnit.Case, async: true
defmodule Forwarder do
use GenEvent
def handle_event(event, parent) do
send parent, event
{:ok, parent}
end
end
setup do
{:ok, sup} = KV.Bucket.Supervisor.start_link
{:ok, manager} = GenEvent.start_link
{:ok, registry} = KV.Registry.start_link(:registry_table, manager, sup)
GenEvent.add_mon_handler(manager, Forwarder, self())
{:ok, registry: registry, ets: :registry_table}
end
test "sends events on create and crash", %{registry: registry, ets: ets} do
KV.Registry.create(registry, "shopping")
{:ok, bucket} = KV.Registry.lookup(ets, "shopping")
assert_receive {:create, "shopping", ^bucket}
Agent.stop(bucket)
assert_receive {:exit, "shopping", ^bucket}
end
test "spawns buckets", %{registry: registry, ets: ets} do
assert KV.Registry.lookup(ets, "shopping") == :error
KV.Registry.create(registry, "shopping")
assert {:ok, bucket} = KV.Registry.lookup(ets, "shopping")
KV.Bucket.put(bucket, "milk", 1)
assert KV.Bucket.get(bucket, "milk") == 1
end
test "removes buckets on exit", %{registry: registry, ets: ets} do
KV.Registry.create(registry, "shopping")
{:ok, bucket} = KV.Registry.lookup(ets, "shopping")
Agent.stop(bucket)
assert_receive {:exit, "shopping", ^bucket} # Wait for event
assert KV.Registry.lookup(ets, "shopping") == :error
end
test "removes bucket on crash", %{registry: registry, ets: ets} do
KV.Registry.create(registry, "shopping")
{:ok, bucket} = KV.Registry.lookup(ets, "shopping")
# Kill the bucket and wait for the notification
Process.exit(bucket, :shutdown)
assert_receive {:exit, "shopping", ^bucket}
assert KV.Registry.lookup(ets, "shopping") == :error
end
end
With tests passing, we just need to update the supervisor init/1
callback at lib/kv/supervisor.ex
to pass the ETS table name as an argument to the registry worker:
@manager_name KV.EventManager
@registry_name KV.Registry
@ets_registry_name KV.Registry
@bucket_sup_name KV.Bucket.Supervisor
def init(:ok) do
children = [
worker(GenEvent, [[name: @manager_name]]),
supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]),
worker(KV.Registry, [@ets_registry_name, @manager_name,
@bucket_sup_name, [name: @registry_name]])
]
supervise(children, strategy: :one_for_one)
end
Note that we are using KV.Registry
as name for the ETS table as well, which makes it convenient to debug, as it points to the module using it. ETS names and process names are stored in different registries, so there is no chance of conflicts.
6.3 ETS as persistent storage
So far we have created an ETS table during the registry initialization but we haven't bothered to close the table on registry termination. That's because the ETS table is "linked" (in a figure of speech) to the process that creates it. If that process dies, the table is automatically closed.
This is extremely convenient as a default behaviour, and we can use it even more to our advantage. Remember that there is a dependency between the registry and the buckets supervisor. If the registry dies, we want the buckets supervisor to die too, because once the registry dies all information linking the bucket name to the bucket process is lost. However, what if we could keep the registry data even if the registry process crashes? If we are able to do so, we remove the dependency between the registry and the buckets supervisor, making the :one_for_one
strategy the perfect strategy for our supevisor.
A couple of changes will be required in order to make this happen. First, we'll need to start the ETS table inside the supervisor. Second, we'll need to change the table's access type from :protected
to :public
, because the owner is the supervisor, but the process doing the writes is still the manager.
Let's get started by first changing KV.Supervisor
's init/1
callback:
def init(:ok) do
ets = :ets.new(@ets_registry_name,
[:set, :public, :named_table, {:read_concurrency, true}])
children = [
worker(GenEvent, [[name: @manager_name]]),
supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]),
worker(KV.Registry, [ets, @manager_name,
@bucket_sup_name, [name: @registry_name]])
]
supervise(children, strategy: :one_for_one)
end
Next, we change KV.Registry
's init/1
callback, as it no longer needs to create a table. It should instead just use the one given as an argument:
def init({table, events, buckets}) do
refs = HashDict.new
{:ok, %{names: table, refs: refs, events: events, buckets: buckets}}
end
Finally, we just need to change the setup
callback in test/kv/registry_test.exs
to explicitly create the ETS table. We will use this opportunity to also split the setup
functionality into a private function that will be handy soon:
setup do
ets = :ets.new(:registry_table, [:set, :public])
registry = start_registry(ets)
{:ok, registry: registry, ets: ets}
end
defp start_registry(ets) do
{:ok, sup} = KV.Bucket.Supervisor.start_link
{:ok, manager} = GenEvent.start_link
{:ok, registry} = KV.Registry.start_link(ets, manager, sup)
GenEvent.add_mon_handler(manager, Forwarder, self())
registry
end
After those changes, our test suite should continue to be green!
There is just one last scenario to consider: once we receive the ETS table, there may be existing bucket pids on the table. After all, that's the whole purpose of this change! However, the newly started registry is not monitoring those buckets, as they were created as part of previous, now defunct, registry. This means that the table may go stale, because we won't remove those buckets if they die.
Let's add a test to test/kv/registry_test.exs
that shows this bug:
test "monitors existing entries", %{registry: registry, ets: ets} do
bucket = KV.Registry.create(registry, "shopping")
# Kill the registry. We unlink first, otherwise it will kill the test
Process.unlink(registry)
Process.exit(registry, :shutdown)
# Start a new registry with the existing table and access the bucket
start_registry(ets)
assert KV.Registry.lookup(ets, "shopping") == {:ok, bucket}
# Once the bucket dies, we should receive notifications
Process.exit(bucket, :shutdown)
assert_receive {:exit, "shopping", ^bucket}
assert KV.Registry.lookup(ets, "shopping") == :error
end
Run the new test and it will fail with:
1) test monitors existing entries (KV.RegistryTest)
test/kv/registry_test.exs:72
No message matching {:exit, "shopping", ^bucket}
stacktrace:
test/kv/registry_test.exs:85
That's what we expected. If the bucket is not being monitored, the registry is not notified when it dies and therefore no event is sent. We can fix this by changing KV.Registry
's init/1
callback one last time to setup monitors for all existing entries in the table:
def init({table, events, buckets}) do
refs = :ets.foldl(fn {name, pid}, acc ->
HashDict.put(acc, Process.monitor(pid), name)
end, HashDict.new, table)
{:ok, %{names: table, refs: refs, events: events, buckets: buckets}}
end
We use :ets.foldl/3
to go through all entries in the table, similar to Enum.reduce/3
, invoking the given function for each element in the table with the given accumulator. In the function callback, we monitor each pid in the table and update the refs dictionary accordingly. If any of the entries is already dead, we will still receive the :DOWN
message, causing them to be purged later.
In this chapter we were able to make our application more robust by using an ETS table that is owned by the supervisor and passed to the registry. We have also explored how to use ETS as a cache and discussed some of the race conditions we may run into as data becomes shared between the server and all clients.