2 Agent

この章ではKV.Bucketというモジュールを作ります。このモジュールはキーバリューの読込や変更に対して異なるプロセスで反応します。

In this chapter, we will create a module named KV.Bucket. This module will be responsible for storing our key-value entries in a way that allows reading and modification by different processes.

もしGetting Startedのガイドを読み飛ばしていたり、前に読んでから時間が経ってしまった場合、Processesの章を読み直すことをお勧めします。この章ではProcessesの知識が前提です。

If you have skipped the Getting Started guide or if you have read it long ago, be sure to re-read the chapter about プロセス. We will use it as starting point.

2.1 ステートの問題点 - The trouble with state

Elixirはイミュータブルな言語で、通常では共有させることができません。もしバケットを複数の場所で作成、保存、操作などを行う場合、Elixirではふたつの方法が用意されています。

Elixir is an immutable language where nothing is shared by default. If we want to create buckets, store and access them from multiple places, we have two main options in Elixir:

ここではまずETSというものがどういった違いについて触れる必要があるまでプロセスについて紹介します。プロセスの代わりにETSを用いるケースは滅多にありませんが、ElixirとOTPで抽象的な操作を行うことができます。

We have talked about processes, while ETS is something new that we will explore later in this guide. When it comes to processes though, we rarely hand-roll our own process, instead we use the abstractions available in Elixir and OTP:

  • エージェント - ステート周りのシンプルなラッパー
  • GenServer - プロセスをカプセル化したり、同期通信、非同期通信やコードの再読込をサポートするジェネリックなサーバー
  • GenEvent - イベントの共有化や複数のハンドラーを管理するためのジェネリックなイベント
  • タスク - プロセスを発行したり、後から簡単に呼び出すことができる非同期処理の単位

  • Agent - Simple wrappers around state

  • GenServer - "Generic servers" (processes) that encapsulate state, provide sync and async calls, support code reloading, and more

  • GenEvent - "Generic event" managers that allow publishing events to multiple handlers

  • Task - Asynchronous units of computation that allow spawning a process and easily retrieving its result at a later time

これらの抽象的な操作はすべて網羅していきます。これらがプロセスでVMの機能にあるsendreceivespawnlinkを包括していることを覚えていてください。

We will explore all of these abstractions in this guide. Keep in mind that they are all implemented on top of processes using the basic features provided by the VM, like send, receive, spawn and link.

2.2 エージェント - Agents

エージェントはステートにおけるシンプルなラッパーです。もし状態を保つためにプロセスを使うのであれば、エージェントがうまく解決してくれます。それでは早速プロジェクトのディレクトリでiexを起動させてください。

Agents are simple wrappers around state. If all you want from a process is to keep state, agents are a great fit. Let's start an iex session inside the project with:

$ iex -S mix

まずはエージェントの簡単な例で説明します。

And play a bit with agents:

iex> {:ok, agent} = Agent.start_link fn -> [] end
{:ok, #PID<0.57.0>}
iex> Agent.update(agent, fn list -> ["eggs"|list] end)
:ok
iex> Agent.get(agent, fn list -> list end)
["eggs"]
iex> Agent.stop(agent)
:ok

まずは空のリストを持ったステートのエージェントから始めます。次に、ステートを更新するコマンドを発して、新しいアイテムをリストの先頭に追加します。最後はリスト全体を検索します。エージェントが完了したらAgent.stop/1を呼んでプロセスを完全に終了させます。

We started an agent with an initial state of an empty list. Next, we issue a command to update the state, adding our new item to the head of the list. Finally, we retrieved the whole list. Once we are done with the agent, we can call Agent.stop/1 to terminate the agent process.

KV.Bucketはエージェントを使って用意するのですが、その前にいくつか最初のテストを用意します。test/kv/bucket_test.exsというファイル(.exsは拡張子)を用意してください。

Let's implement our KV.Bucket using agents. But before starting the implementation, let's first write some tests. Create a file at test/kv/bucket_test.exs (remember the .exs extension) with the following:

defmodule KV.BucketTest do
  use ExUnit.Case, async: true

  test "stores values by key" do
    {:ok, bucket} = KV.Bucket.start_link
    assert KV.Bucket.get(bucket, "milk") == nil

    KV.Bucket.put(bucket, "milk", 3)
    assert KV.Bucket.get(bucket, "milk") == 3
  end
end

最初のテストは簡潔です。まずはKV.Bucketを作り、そのアサーションは単純にget/2put/3が操作した結果を用意します。これらを止める方法は、テストそのもののプロセスはテストの終了に伴って自動で行われるためにわざわざ用意する必要がありません。

Our first test is straightforward. We start a new KV.Bucket and perform some get/2 and put/3 operations on it, asserting the result. We don't need to explicitly stop the agent because it is linked to the test process and the agent is shut down automatically once the test finishes.

もうひとつ注意すべき部分はExUnit.Caseasync: trueという設定をしなければならない点です。これはこのテストが他のテストケースも含めて一斉に実行されるという:asyncオプションです。これはCPUのコアの数だけテストケース全体を高速化させることができるので非常に便利です。ただし:asyncオプションを有効にしている間はグローバルな値を参照させたり、変更させることはできません。例えば、テストにファイルシステムを用いたり、プロセスを登録したり、データベースの操作などを行う場合はテストを行っている間は非同期させないようにしてください。

Also note that we passed the async: true option to ExUnit.Case. This option makes this test case run in parallel with other test cases that set up the :async option. This is extremely useful to speed up our test suite by using multiple cores in our machine. Note though the :async option must only be set if the test case does not rely or change any global value. For example, if the test requires writing to the filesystem, registering processes, accessing a database, you must not make it async to avoid race conditions in between tests.

非同期が必要か不要かどうかに拘らず、先ほど用意したばかりのテストは当然何も関数的な記述がなされていないために必ず失敗します。

Regardless of being async or not, our new test should obviously fail, as none of the functionality is implemented.

テストの失敗を修正するために、lib/kv/bucket.exというファイルを用意します。次のような例を写す前に自分でKV.Bucketを用意しても構いません。

In order to fix the failing test, let's create a file at lib/kv/bucket.ex with the contents below. Feel free to give a try at implementing the KV.Bucket module yourself using agents before peeking the implementation below.

defmodule KV.Bucket do
  @doc """
  新しいバケットを開始する。
  """
  def start_link do
    Agent.start_link(fn -> HashDict.new end)
  end

  @doc """
  バケットから値を取得する。
  """
  def get(bucket, key) do
    Agent.get(bucket, &HashDict.get(&1, key))
  end

  @doc """
  バケットから渡された値を表示する。
  """
  def put(bucket, key, value) do
    Agent.update(bucket, &HashDict.put(&1, key, value))
  end
end
defmodule KV.Bucket do
  @doc """
  Starts a new bucket.
  """
  def start_link do
    Agent.start_link(fn -> HashDict.new end)
  end

  @doc """
  Gets a value from the `bucket` by `key`.
  """
  def get(bucket, key) do
    Agent.get(bucket, &HashDict.get(&1, key))
  end

  @doc """
  Puts the `value` for the given `key` in the `bucket`.
  """
  def put(bucket, key, value) do
    Agent.update(bucket, &HashDict.put(&1, key, value))
  end
end

KV.Bucketモジュールが定義できればテストは成功します。この例ではMapの代わりにHashDictを使って状態を保存しています。現在のバージョンのElixirではマップに巨大なキーを持たせることが非効率だからです。

With the KV.Bucket module defined, our test should pass! Note that we are using a HashDict to store our state instead of a Map, because in the current version of Elixir maps are less efficient when holding a large number of keys.

2.3 ExUnitのコールバック - ExUnit callbacks

KV.Bucketの追加機能をもう少し掘り下げて、ExUnitのコールバックについて触れていきます。KV.Bucketのテストを毎回セットアップと終了後の処理に関する振る舞いもテストに含めなければならないと思われるかもしれませんが、ExUnitには反復的なタスクを省略させるためのコールバックが備わっています。

Before moving on and adding more features to KV.Bucket, let's talk about ExUnit callbacks. As you may expect, all KV.Bucket tests will require a bucket to be started during setup and stopped after the test. Luckily, ExUnit supports callbacks that allow us to skip such repetitive tasks.

テストケースをコールバックを用いて書き直してみましょう。

Let's rewrite the test case to use callbacks:

defmodule KV.BucketTest do
  use ExUnit.Case, async: true

  setup do
    {:ok, bucket} = KV.Bucket.start_link
    {:ok, bucket: bucket}
  end

  test "stores values by key", %{bucket: bucket} do
    assert KV.Bucket.get(bucket, "milk") == nil

    KV.Bucket.put(bucket, "milk", 3)
    assert KV.Bucket.get(bucket, "milk") == 3
  end
end

まず最初にセットアップに用いるsetup/1というコールバックを定義しました。setup/1はすべてのテストが始まる前に実行され、テストに毎回必要な処理を定義しておくことができます。

We have first defined a setup callback with the help of the setup/1 macro. The setup/1 callback runs before every test, in the same process as the test itself.

バケットをテストするにはpidを扱う必要があり、テストコンテクストと呼ばれる方法を使います。これは{:ok, bucket: bucket}がコールバックによって返されると、ExUnitはふたつめの要素のタプル(あるいは辞書)をテストコンテクストにまとめます。テストコンテクストはテストの定義やブロックの値を参照するための手段です。

Note that we need a mechanism to pass the bucket pid from the callback to the test. We do so by using the test context. When we return {:ok, bucket: bucket} from the callback, ExUnit will merge the second element of the tuple (a dictionary) into the test context. The test context is a map which we can then match in the test definition, providing access to these values inside the block:

test "stores values by key", %{bucket: bucket} do
  # バケットはセットアップのブロックから引き継がれます。
end
test "stores values by key", %{bucket: bucket} do
  # `bucket` is now the bucket from the setup block
end

ExUnitのケースについてはExUnit.Caseモジュールの ドキュメントExUnit.Callbacks のドキュメントを参照してください。

You can read more about ExUnit cases in the ExUnit.Case module documentation and more about callbacks in ExUnit.Callbacks docs.

2.4 その他のエージェント操作 - Other Agent actions

バケットの値を取得と更新を行うAgent.get_and_update/2という関数と、キーからバケットの消去を行うKV.Bucket.delete/2という関数をそれぞれ実装してみましょう。

Besides getting a value and updating the agent state, agents allow us to get a value and update the agent state in one function call via Agent.get_and_update/2. Let's implement a KV.Bucket.delete/2 function that deletes a key from the bucket, returning its current value:

@doc """
キーとバケットの消去。

キーが存在すれば、そのキーの値を返す
"""
def delete(bucket, key) do
  Agent.get_and_update(bucket, &HashDict.pop(&1, key))
end
@doc """
Deletes `key` from `bucket`.

Returns the current value of `key`, if `key` exists.
"""
def delete(bucket, key) do
  Agent.get_and_update(bucket, &HashDict.pop(&1, key))
end

それではこの関数を満たすテストケースを書いてみましょう。もちろん、エージェントに関する詳しいドキュメントを読んでみてもよいでしょう。

Now it is your turn to write a test for the functionality above! Also, be sure to explore the documentation for Agents to learn more about them.

2.5 クライアントとサーバーのエージェント - Client/Server in Agents

次の章へ移る前に、クライアントとサーバーの二分法について触れておきましょう。先ほどのdelete/2を拡張します。

Before we move on to the next chapter, let's discuss the client/server dichotomy in agents. Let's expand the delete/2 function we have just implemented:

def delete(bucket, key) do
  Agent.get_and_update(bucket, fn dict->
    HashDict.pop(dict, key)
  end)
end

関数に含まれているものすべてがエージェントのプロセスで実行されます。今回はエージェントのプロセスがクライアントのリクエストを処理します。エージェントのプロセスをサーバーとして、サーバーの外側で発せられるものすべてをクライアントとして説明します。

Everything that is inside the function we passed to the agent happens in the agent process. In this case, since the agent process is the one receiving and responding to our messages, we say the agent process is the server. Everything outside the function is happening in the client.

この区別は重要です。もし、一度の処理に必要な手続きが多いときはサーバーとクライアントのどちらで処理をさせれば負荷を軽減させるか考えなければなりません。例えば、

This distinction is important. If there are expensive actions to be done, you must consider if it will be better to perform these actions on the client or on the server. For example:

def delete(bucket, key) do
  :timer.sleep(1000) # クライアントを待機させる
  Agent.get_and_update(bucket, fn dict ->
    :timer.sleep(1000) # サーバーを待機させる
    HashDict.pop(dict, key)
  end)
end
def delete(bucket, key) do
  :timer.sleep(1000) # sleeps the client
  Agent.get_and_update(bucket, fn dict ->
    :timer.sleep(1000) # sleeps the server
    HashDict.pop(dict, key)
  end)
end

サーバー側に膨大な処理をさせている場合、すべてのアクションが完了するまでサーバーはずっと待機状態にさせなければならず、その結果クライアントに対してタイムアウトが発生させてしまうかもしれません。

When a long action is performed on the server, all other requests to that particular server will wait until the action is done, which may cause some clients to timeout.

次の章ではGenServersについて、クライアントとサーバーのはっきりとした違いについて説明していきます。

In the next chapter we will explore GenServers, where the segregation between clients and servers is made even more apparent.