8 Task and gen_tcp

In this chapter, we are going to learn how to use Erlang's :gen_tcp module to serve requests. In future chapters we will expand our server so it can actually serve the commands. This will also provide a great opportunity to explore Elixir's Task module.

8.1 Echo server

We will start our TCP server by first implementing an echo server. It will simply send a response with the text it received in the request. We will slowly improve our server until it is supervised and ready to handle multiple connections.

A TCP server, in broad strokes, performs the following steps:

  1. Listens on a port until the port is available and it gets hold of the socket
  2. Waits for a client connection on that port and accepts it
  3. Reads the client request and writes a response back

Let's implement those steps. Move to the apps/kv_server application, open up lib/kv_server.ex, and add the following functions:

def accept(port) do
  # The options below mean:
  #
  # 1. `:binary` - receives data as binaries (instead of lists)
  # 2. `packet: :line` - receives data line by line
  # 3. `active: false` - block on `:gen_tcp.recv/2` until data is available
  #
  {:ok, socket} = :gen_tcp.listen(port,
                    [:binary, packet: :line, active: false])
  IO.puts "Accepting connections on port #{port}"
  loop_acceptor(socket)
end

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  serve(client)
  loop_acceptor(socket)
end

defp serve(socket) do
  socket
  |> read_line()
  |> write_line(socket)

  serve(socket)
end

defp read_line(socket) do
  {:ok, data} = :gen_tcp.recv(socket, 0)
  data
end

defp write_line(line, socket) do
  :gen_tcp.send(socket, line)
end

We are going to start our server by calling KVServer.accept(4040), where 4040 is the port. The first step in accept/1 is to listen to the port until the socket becomes available and then call loop_acceptor/1. loop_acceptor/1 is just a loop accepting client connections. For each accepted connection, we call serve/1.

serve/1 is another loop that reads a line from the socket and writes that line back to the socket. Note that the serve/1 function uses the pipeline operator |> to express this flow of operations. The pipeline operator evaluates the left side and passes its result as the first argument to the function on the right side. The example above:

socket |> read_line() |> write_line(socket)

is equivalent to:

write_line(read_line(socket), socket)

When using the |> operator, it is important to add parentheses to the function calls due to how operator precedence works. In particular, this code:

1..10 |> Enum.filter &(&1 <= 5) |> Enum.map &(&1 * 2)

Actually translates to:

1..10 |> Enum.filter(&(&1 <= 5) |> Enum.map(&(&1 * 2)))

Which is not what we want, since the function given to Enum.filter/2 is the one passed as the first argument to Enum.map/2. The solution is to use explicit parentheses:

1..10 |> Enum.filter(&(&1 <= 5)) |> Enum.map(&(&1 * 2))
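To check the equivalence yourself, a small sketch like the following can be pasted into IEx or run as a script. The names at_most_five and double are introduced here only for illustration:

```elixir
# Hypothetical helpers, named only for this illustration.
at_most_five = &(&1 <= 5)
double = &(&1 * 2)

# Piped form, with explicit parentheses around each call's arguments.
piped = 1..10 |> Enum.filter(at_most_five) |> Enum.map(double)

# Nested form that the pipeline above expands to.
nested = Enum.map(Enum.filter(1..10, at_most_five), double)

IO.inspect(piped)           # prints [2, 4, 6, 8, 10]
IO.inspect(piped == nested) # prints true
```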

The read_line/1 implementation receives data from the socket using :gen_tcp.recv/2 and write_line/2 writes to the socket using :gen_tcp.send/2.

This is pretty much all we need to implement our echo server. Let's give it a try!

Start an iex session inside the kv_server application with iex -S mix. Inside IEx, run:

iex> KVServer.accept(4040)

The server is now running, and you will even notice the console is blocked. Let's use a telnet client to access our server. There are clients available on most operating systems, and their command lines are generally similar:

$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello
is it me
is it me
you are looking for?
you are looking for?

Type "hello", press enter, and you will get "hello" back. Excellent!

My particular telnet client can be exited by typing ctrl + ], typing quit, and pressing <Enter>, but your client may require different steps.
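If no telnet client is at hand, the same exchange can be sketched from Elixir with :gen_tcp.connect/3. The EchoClient module and its echo/3 function below are hypothetical helpers, not part of KVServer, and the 5 second receive timeout is an arbitrary choice:

```elixir
defmodule EchoClient do
  # Hypothetical helper: sends one line to the server and waits for one
  # line back, mirroring what we typed into telnet.
  def echo(host, port, line) do
    # Same socket options the server uses, so replies arrive line by line.
    opts = [:binary, packet: :line, active: false]

    case :gen_tcp.connect(host, port, opts) do
      {:ok, socket} ->
        result =
          with :ok <- :gen_tcp.send(socket, line <> "\n") do
            # Give up after 5 seconds instead of blocking forever.
            :gen_tcp.recv(socket, 0, 5_000)
          end

        # Close the socket whether or not the exchange succeeded.
        :gen_tcp.close(socket)
        result

      {:error, reason} ->
        {:error, reason}
    end
  end
end
```

With the echo server running on port 4040, calling EchoClient.echo({127, 0, 0, 1}, 4040, "hello") should return {:ok, "hello\n"}, since packet: :line keeps the trailing newline.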

Once you exit the telnet client, you will likely see an error in the IEx session:

** (MatchError) no match of right hand side value: {:error, :closed}
    (kv_server) lib/kv_server.ex:41: KVServer.read_line/1
    (kv_server) lib/kv_server.ex:33: KVServer.serve/1
    (kv_server) lib/kv_server.ex:27: KVServer.loop_acceptor/1

That's because we were expecting data from :gen_tcp.recv/2 but the client closed the connection. We need to handle such cases better in future revisions of our server.
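As a preview, one possible shape for that handling is to match on the result of :gen_tcp.recv/2 instead of asserting {:ok, data}. This is only a sketch under our own assumptions, not necessarily what later chapters adopt; the EchoSketch module name is hypothetical, and errors other than :closed would still crash the process:

```elixir
defmodule EchoSketch do
  # Hypothetical variant of serve/1: stops looping once the client
  # disconnects instead of crashing on a failed pattern match.
  def serve(socket) do
    case :gen_tcp.recv(socket, 0) do
      {:ok, line} ->
        :gen_tcp.send(socket, line)
        serve(socket)

      {:error, :closed} ->
        # The client hung up; there is nothing left to echo.
        :ok
    end
  end
end
```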

For now there is a more important bug we need to fix: what happens if our TCP acceptor crashes? Since there is no supervision, the server dies and we won't be able to serve more requests, because it won't be restarted. That's why we must move our server inside a supervision tree.

8.2 Tasks

We have learned about agents, generic servers, and event managers. They are all meant to work with multiple messages or manage state. But what do we use when we only need to execute some task and that is it?

The Task module provides exactly this functionality. For example, it has a start_link/3 function that receives a module, a function, and arguments, allowing us to run a given function as part of a supervision tree.
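Outside of a supervision tree, the call can be tried on its own. In this minimal sketch the Greeter module and its greet/2 function are hypothetical, made up only so that Task.start_link/3 has a module, function, and arguments to invoke:

```elixir
defmodule Greeter do
  # Hypothetical module, used only to give Task.start_link/3 something to call.
  def greet(parent, name) do
    send(parent, {:greeting, "hello " <> name})
  end
end

# Runs Greeter.greet(self(), "world") in a new process linked to the caller.
{:ok, pid} = Task.start_link(Greeter, :greet, [self(), "world"])

# Wait for the message the task sends back to us.
result =
  receive do
    {:greeting, text} -> text
  after
    1_000 -> :timeout
  end

IO.inspect(result) # prints "hello world"
```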

Let's give it a try. Open up lib/kv_server.ex, and let's change the supervisor in the start/2 function to the following:

def start(_type, _args) do
  import Supervisor.Spec

  children = [
    worker(Task, [KVServer, :accept, [4040]])
  ]

  opts = [strategy: :one_for_one, name: KVServer.Supervisor]
  Supervisor.start_link(children, opts)
end

With this change, we are saying that we want to run KVServer.accept(4040) as a worker. We are hardcoding the port for now, but we will discuss ways in which this could be changed later.

Now that the server is part of the supervision tree, it should start automatically when we run the application. Type mix run --no-halt in the terminal, and once again use the telnet client to make sure that everything still works:

$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
say you
say you
say me
say me

Yes, it works! If you kill the client, causing the whole server to crash, you will see the supervisor start another one right away. However, does it scale?

Try to connect two telnet clients at the same time. When you do so, you will notice that the second client doesn't echo:

$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello?
HELLOOOOOO?

It doesn't seem to work at all. That's because we are serving requests in the same process that is accepting connections. When one client is connected, we can't accept another client.

8.3 Task supervisor

In order to make our server handle simultaneous connections, we need to have one process working as an acceptor that spawns other processes to serve requests. One solution would be to change:

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  serve(client)
  loop_acceptor(socket)
end

to use Task.start_link/1, which is similar to Task.start_link/3, but it receives an anonymous function instead of a module, function, and arguments:

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  Task.start_link(fn -> serve(client) end)
  loop_acceptor(socket)
end

But we've already made this mistake once. Do you remember?

This is similar to the mistake we made when we called KV.Bucket.start_link/0 from the registry. That meant a failure in any bucket would bring the whole registry down.

The code above would have the same flaw: if we link the serve(client) task to the acceptor, a crash when serving a request would bring the acceptor, and consequently all other connections, down.
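The effect of the link can be observed in isolation. In the sketch below, a plain process stands in for the acceptor (it does not touch any socket) and the raised error is made up; running it will also log the task's crash:

```elixir
# A stand-in for the acceptor: it links a task that crashes immediately.
{acceptor, ref} =
  spawn_monitor(fn ->
    Task.start_link(fn -> raise "boom while serving" end)
    # Pretend to keep accepting connections.
    Process.sleep(:infinity)
  end)

# The monitor tells us whether the stand-in acceptor went down.
acceptor_exit =
  receive do
    {:DOWN, ^ref, :process, ^acceptor, reason} -> reason
  after
    1_000 -> :still_alive
  end

# The crash travels over the link, so the acceptor is gone as well.
IO.inspect(acceptor_exit != :still_alive) # prints true
```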

We fixed the issue for the registry by using a simple one for one supervisor. We are going to use the same tactic here, except that this pattern is so common with tasks that tasks already come with a solution: a simple one for one supervisor with temporary workers that we can just use in our supervision tree!

Let's change start/2 once again, to add a supervisor to our tree:

def start(_type, _args) do
  import Supervisor.Spec

  children = [
    supervisor(Task.Supervisor, [[name: KVServer.TaskSupervisor]]),
    worker(Task, [KVServer, :accept, [4040]])
  ]

  opts = [strategy: :one_for_one, name: KVServer.Supervisor]
  Supervisor.start_link(children, opts)
end

We simply start a Task.Supervisor process with name KVServer.TaskSupervisor. Remember, since the acceptor task depends on this supervisor, the supervisor must be started first.

Now we just need to change loop_acceptor/1 to use Task.Supervisor to serve each request:

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
  loop_acceptor(socket)
end

Start a new server with mix run --no-halt and we can now open up many concurrent telnet clients. You will also notice that quitting a client does not bring the acceptor down. Excellent!
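The isolation can also be checked without any sockets. The following is a minimal sketch in which SketchTaskSupervisor is a hypothetical name, a plain process stands in for the acceptor, and the 500 ms wait is an arbitrary choice; the crashing task will still log an error:

```elixir
# Hypothetical supervisor name, chosen for this sketch only.
{:ok, _sup} = Task.Supervisor.start_link(name: SketchTaskSupervisor)

{acceptor, ref} =
  spawn_monitor(fn ->
    # The crashing task is a child of the supervisor, not linked to us.
    {:ok, _task} =
      Task.Supervisor.start_child(SketchTaskSupervisor, fn ->
        raise "boom while serving"
      end)

    # Pretend to keep accepting connections.
    Process.sleep(:infinity)
  end)

# If the crash had reached the stand-in acceptor, we would get a :DOWN message.
acceptor_status =
  receive do
    {:DOWN, ^ref, :process, ^acceptor, reason} -> {:down, reason}
  after
    500 -> :still_alive
  end

IO.inspect(acceptor_status) # prints :still_alive
```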

Here is the full echo server implementation, in a single module:

defmodule KVServer do
  use Application

  @doc false
  def start(_type, _args) do
    import Supervisor.Spec

    children = [
      supervisor(Task.Supervisor, [[name: KVServer.TaskSupervisor]]),
      worker(Task, [KVServer, :accept, [4040]])
    ]

    opts = [strategy: :one_for_one, name: KVServer.Supervisor]
    Supervisor.start_link(children, opts)
  end

  @doc """
  Starts accepting connections on the given `port`.
  """
  def accept(port) do
    {:ok, socket} = :gen_tcp.listen(port,
                      [:binary, packet: :line, active: false])
    IO.puts "Accepting connections on port #{port}"
    loop_acceptor(socket)
  end

  defp loop_acceptor(socket) do
    {:ok, client} = :gen_tcp.accept(socket)
    Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
    loop_acceptor(socket)
  end

  defp serve(socket) do
    socket
    |> read_line()
    |> write_line(socket)

    serve(socket)
  end

  defp read_line(socket) do
    {:ok, data} = :gen_tcp.recv(socket, 0)
    data
  end

  defp write_line(line, socket) do
    :gen_tcp.send(socket, line)
  end
end

Since we have changed the supervisor specification, we need to ask: is our supervision strategy still correct?

In this case, the answer is yes: if the acceptor crashes, there is no need to crash the existing connections. On the other hand, if the task supervisor crashes, there is no need to crash the acceptor too. This contrasts with the registry, where we initially had to crash the supervisor every time the registry crashed, until we used ETS to persist state. However, tasks have no state and nothing will go stale if one of these processes dies.

In the next chapter we will start parsing the client requests and sending responses, finishing our server.