In this article you will learn the basics of concurrency in Elixir and see how to spawn processes, send and receive messages, and create long-running processes. You will also learn about GenServer, see how it can be used in your application, and discover some goodies it provides for you.
As you probably know, Elixir is a functional language used to build fault-tolerant, concurrent systems that handle lots of simultaneous requests. BEAM (Erlang virtual machine) uses processes to perform various tasks concurrently, which means, for example, that serving one request does not block another one. Processes are lightweight and isolated, which means that they do not share any memory and even if one process crashes, others can continue running.
BEAM processes are very different from the OS processes. Basically, BEAM runs in one OS process and uses its own schedulers. Each scheduler occupies one CPU core, runs in a separate thread, and may handle thousands of processes simultaneously (that take turns to execute). You can read a bit more about BEAM and multithreading on StackOverflow.
So, as you see, BEAM processes (I will say just "processes" from now on) are very important in Elixir. The language does provide you some low-level tools to manually spawn processes, maintain the state, and handle the requests. However, few people use them—it's more common to rely on the Open Telecom Platform (OTP) framework to do that.
OTP nowadays has nothing to do with telephones—it is a general-purpose framework to build complex concurrent systems. It defines how your applications should be structured and provides a database as well as a bunch of very useful tools to create server processes, recover from errors, perform logging, etc. In this article, we will talk about a server behavior called GenServer that is provided by OTP.
You can think of GenServer as an abstraction or a helper that simplifies working with server processes. Firstly, you will see how to spawn processes using some low-level functions. Then we will switch to GenServer and see how it simplifies things for us by removing the need to write tedious (and pretty generic) code every time. Let's get started!
It All Starts With Spawn
If you asked me how to create a process in Elixir, I'd answer: spawn it! spawn/1 is a function defined inside the Kernel module that creates a new process and returns its process id (pid). This function accepts a lambda that will be executed in the created process. As soon as the execution has finished, the process exits as well:
spawn(fn -> IO.puts("hi") end) |> IO.inspect
# => hi
# => #PID<0.72.0>
So, here spawn returned a new process id. If you add a delay to the lambda, the string "hi" will be printed out after some time:
spawn(fn ->
  :timer.sleep(5000)
  IO.puts("hi")
end) |> IO.inspect
# => #PID<0.82.0>
# => (after 5 seconds) hi
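To see that a process really exits once its lambda has finished, you can check it with Process.alive?/1. This is only a small sketch: the 100 ms and 300 ms delays are arbitrary values chosen for illustration.

```elixir
# The process sleeps for 100 ms and then has nothing left to do.
pid = spawn(fn -> :timer.sleep(100) end)

# Right after spawning, the process is still sleeping.
alive_before = Process.alive?(pid)

# Wait long enough for the lambda to finish.
:timer.sleep(300)
alive_after = Process.alive?(pid)

IO.inspect({alive_before, alive_after})
# => {true, false}
```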
Now we can spawn as many processes as we want, and they will be run concurrently:
spawn_it = fn(num) ->
  spawn(fn ->
    :timer.sleep(5000)
    IO.puts("hi #{num}")
  end)
end
Enum.each(
  1..10,
  fn(_) -> spawn_it.(:rand.uniform(100)) end
)
# => (all printed out at the same time, after 5 seconds)
# => hi 5
# => hi 10 etc...
Here we are spawning ten processes and printing out a test string with a random number. :rand is a module provided by Erlang, so its name is an atom. What's cool is that all the messages will be printed out at the same time, after five seconds. It happens because all ten processes are being executed concurrently.
Compare it to the following example that performs the same task but without using spawn/1:
dont_spawn_it = fn(num) ->
  :timer.sleep(5000)
  IO.puts("hi #{num}")
end
Enum.each(
  1..10,
  fn(_) -> dont_spawn_it.(:rand.uniform(100)) end
)
# => (after 5 seconds) hi 70
# => (after another 5 seconds) hi 45
# => etc...
While this code is running, you may go to the kitchen and make another cup of coffee as it will take nearly a minute to complete. Each message is displayed sequentially, which is of course not optimal!
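If you would rather measure the difference than wait for it, here is a rough sketch that times both approaches with :timer.tc/1. The TimingDemo module and its 100 ms delay are made up for this illustration, and the concurrent version relies on send and receive, which are covered in the next section. The exact numbers depend on your machine, so no expected output is shown.

```elixir
defmodule TimingDemo do
  # Hypothetical delay, much shorter than the 5 seconds used above.
  @delay 100

  # Runs the sleeping job `count` times, one after another.
  def sequential(count) do
    Enum.each(1..count, fn _ -> :timer.sleep(@delay) end)
  end

  # Runs each job in its own process and waits until all have reported back.
  def concurrent(count) do
    parent = self()

    Enum.each(1..count, fn i ->
      spawn(fn ->
        :timer.sleep(@delay)
        send(parent, {:done, i})
      end)
    end)

    Enum.each(1..count, fn i ->
      receive do
        {:done, ^i} -> :ok
      end
    end)
  end

  # :timer.tc/1 returns {microseconds, result}; convert to milliseconds.
  def measure(fun) do
    {micros, _result} = :timer.tc(fun)
    div(micros, 1000)
  end
end

IO.puts("sequential: #{TimingDemo.measure(fn -> TimingDemo.sequential(10) end)} ms")
IO.puts("concurrent: #{TimingDemo.measure(fn -> TimingDemo.concurrent(10) end)} ms")
```

The sequential run should take roughly ten times the delay, while the concurrent one should finish in about the time of a single job.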
You might ask: "How much memory does a process consume?" Well, it depends, but initially it occupies a couple of kilobytes, which is a very small number (even my old laptop has 8GB of memory, not to mention cool modern servers).
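To get a feel for the numbers on your own machine, you can ask the VM how much memory a freshly spawned process occupies. The sketch below uses Process.info/2 with the :memory key, which reports the size in bytes. The figure varies between Erlang/OTP versions and systems, so no expected output is given.

```elixir
# Spawn an idle process that just waits for a message.
pid =
  spawn(fn ->
    receive do
      :stop -> :ok
    end
  end)

# Process.info/2 returns a {:memory, bytes} tuple for a live process.
{:memory, bytes} = Process.info(pid, :memory)
IO.puts("A fresh process occupies #{bytes} bytes")

# Let the process finish.
send(pid, :stop)
```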
So far, so good. Before we start working with GenServer, however, let's discuss yet another important thing: passing and receiving messages.
Working With Messages
It's no surprise that processes (which are isolated, as you remember) need to communicate in some way, especially when it comes to building more or less complex systems. To achieve this, we can use messages.
A message can be sent using a function with quite an obvious name: send/2. It accepts a destination (port, process id or a process name) and the actual message. After the message is sent, it appears in the mailbox of a process and can be processed. As you see, the general idea is very similar to our everyday activity of exchanging emails.
A mailbox is basically a "first in first out" (FIFO) queue. After the message is processed, it is removed from the queue. To start receiving messages, you need—guess what!—a receive macro. This macro contains one or more clauses, and a message is matched against them. If a match is found, the message is processed. Otherwise, the message is put back into the mailbox. On top of that, you can set an optional after clause that runs if a message was not received in the given time. You can read more about send/2 and receive in the official docs.
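The matching rules are easier to follow with a small experiment. In this sketch (the message shapes {:greeting, ...} and {:number, ...} are made up for illustration), the first receive skips an older message that does not match, the second one picks it up, and the third one hits the after clause because nothing suitable is left.

```elixir
# Queue up two messages in the current process's mailbox.
send(self(), {:greeting, "hello"})
send(self(), {:number, 42})

# Only {:number, _} matches here, so the older {:greeting, _} message
# is skipped and stays in the mailbox.
number =
  receive do
    {:number, n} -> n
  after
    1000 -> :timeout
  end

# The greeting is still waiting and can be received now.
greeting =
  receive do
    {:greeting, text} -> text
  after
    1000 -> :timeout
  end

# No greeting is left, so the after clause fires after 100 ms.
leftover =
  receive do
    {:greeting, _} -> :unexpected
  after
    100 -> :timeout
  end

IO.inspect({number, greeting, leftover})
# => {42, "hello", :timeout}
```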
Okay, enough with the theory—let's try to work with the messages. First of all, send something to the current process:
send(self(), "hello!")
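The self/0 function returns the pid of the calling process, which is exactly what we need. Do not omit the parentheses after the function name: depending on your Elixir version, you will get either a warning about ambiguity or a compile error, because a bare self looks like a variable.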
Now receive the message while setting the after clause:
receive do
  msg ->
    IO.puts "Yay, a message: #{msg}"
    msg
  after 1000 -> IO.puts :stderr, "I want messages!"
end |> IO.puts
# => Yay, a message: hello!
# => hello!
Note that the clause returns the result of evaluating the last line, so we get the "hello!" string.
Remember that you may introduce as many clauses as needed:
send(self(), {:ok, "hello!"})
receive do
  {:ok, msg} ->
    IO.puts "Yay, a message: #{msg}"
    msg
  {:error, msg} -> IO.puts :stderr, "Oh no, something bad has happened: #{msg}"
  _ -> IO.puts "I dunno what this message is..."
  after 1000 -> IO.puts :stderr, "I want messages!"
end |> IO.puts
Here we have four clauses: one to handle a success message, another to handle errors, and then a "fallback" clause and a timeout.
If the message does not match any of the clauses, it is kept in the mailbox, which is not always desirable. Why? Because whenever a new message arrives, receive scans the mailbox starting from the oldest message (the mailbox is a FIFO queue), so the old unmatched messages are checked again every time, slowing the program down. Therefore a "fallback" clause may come in handy.
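You can watch unmatched messages pile up by asking a process for its mailbox size. The following is a minimal sketch, assuming a hypothetical MailboxDemo module: it sends two unexpected messages to a worker, then asks the worker to report its :message_queue_len (obtained via Process.info/2), once without and once with a fallback clause.

```elixir
defmodule MailboxDemo do
  # Hypothetical helper: spawns a process that waits for :report,
  # optionally with a fallback clause, and returns its mailbox size.
  def leftover_count(with_fallback?) do
    parent = self()
    pid = spawn(fn -> loop(with_fallback?, parent) end)

    # Two messages nobody asked for, followed by a report request.
    send(pid, :junk)
    send(pid, :more_junk)
    send(pid, :report)

    receive do
      {:queue_len, ^pid, len} -> len
    after
      1000 -> :timeout
    end
  end

  # With a fallback clause, unknown messages are consumed and dropped.
  defp loop(true, parent) do
    receive do
      :report -> report(parent)
      _other -> loop(true, parent)
    end
  end

  # Without it, unknown messages are skipped and stay in the mailbox.
  defp loop(false, parent) do
    receive do
      :report -> report(parent)
    end
  end

  defp report(parent) do
    {:message_queue_len, len} = Process.info(self(), :message_queue_len)
    send(parent, {:queue_len, self(), len})
  end
end

IO.inspect(MailboxDemo.leftover_count(false)) # => 2
IO.inspect(MailboxDemo.leftover_count(true))  # => 0
```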
Now that you know how to spawn processes, send and receive messages, let's take a look at a slightly more complex example that involves creating a simple server responding to various messages.
Working With Server Process
In the previous example, we sent only one message, received it, and performed some work. That's fine, but not very functional. Usually what happens is we have a server that can respond to various messages. By "server" I mean a long-running process built with a recurring function. For instance, let's create a server to perform some mathematical operations. It is going to receive a message containing the requested operation and some arguments.
Start by creating the server and the looping function:
defmodule MathServer do
  def start do
    spawn &listen/0
  end
  defp listen do
    receive do
      {:sqrt, caller, arg} -> IO.puts arg
      _ -> IO.puts :stderr, "Not implemented."
    end
    listen()
  end
end
So we spawn a process that keeps listening to the incoming messages. After the message is received, the listen/0 function gets called again, thus creating an endless loop. Inside the listen/0 function, we add support for the :sqrt message, which will calculate the square root of a number. The arg will contain the actual number to perform the operation against. Also, we are defining a fallback clause.
You may now start the server and assign its process id to a variable:
math_server = MathServer.start
IO.inspect math_server # => #PID<0.85.0>
Brilliant! Now let's add an implementation function to actually perform the calculation:
defmodule MathServer do
  # ...
  def sqrt(server, arg) do
    # Send the request to the server pid passed by the caller
    send(server, {:sqrt, self(), arg})
  end
end
Use this function now:
MathServer.sqrt(math_server, 3) # => 3
For now, it simply prints out the passed argument, so tweak your code like this to perform the mathematical operation:
defmodule MathServer do
  # ...    
  defp listen do
    receive do
      {:sqrt, caller, arg} -> send(caller, {:result, do_sqrt(arg)})
      _ -> IO.puts :stderr, "Not implemented."
    end
    listen()
  end
  defp do_sqrt(arg) do
    :math.sqrt(arg)
  end
end
Now, yet another message is sent, this time back to the caller, containing the result of the computation.
What's interesting is that the sqrt/2 function simply sends a message to the server asking to perform an operation without waiting for the result. So, basically, it performs an asynchronous call.
Obviously, we do want to grab the result at some point in time, so code another public function:
def grab_result do
    receive do
      {:result, result} -> result
      after 5000 -> IO.puts :stderr, "Timeout"
    end
end
Now utilize it:
math_server = MathServer.start
MathServer.sqrt(math_server, 3)
MathServer.grab_result |> IO.puts # => 1.7320508075688772
It works! Of course, you can even create a pool of servers and distribute tasks between them, achieving concurrency. It is convenient when the requests do not relate to each other.
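Here is one way such a pool might look. This is a sketch under a few assumptions, not a production design: SqrtPool is a hypothetical module, each worker runs a loop similar to the one in MathServer, tasks are handed out in simple round-robin order, and every request carries a unique reference (created with make_ref/0) so that answers can be matched to the right request.

```elixir
defmodule SqrtPool do
  # Starts `size` worker processes and returns their pids.
  def start(size) do
    for _ <- 1..size, do: spawn(fn -> listen() end)
  end

  # Hands the numbers out to the workers in round-robin order and
  # collects the answers in the original order.
  def sqrt_all(workers, numbers) do
    refs =
      numbers
      |> Enum.zip(Stream.cycle(workers))
      |> Enum.map(fn {number, worker} ->
        ref = make_ref()
        send(worker, {:sqrt, self(), ref, number})
        ref
      end)

    Enum.map(refs, fn ref ->
      receive do
        {:result, ^ref, result} -> result
      after
        5000 -> :timeout
      end
    end)
  end

  # The worker loop: reply to the caller, then wait for the next message.
  defp listen do
    receive do
      {:sqrt, caller, ref, arg} -> send(caller, {:result, ref, :math.sqrt(arg)})
      _ -> IO.puts(:stderr, "Not implemented.")
    end

    listen()
  end
end

workers = SqrtPool.start(3)
SqrtPool.sqrt_all(workers, [4, 9, 16, 25]) |> IO.inspect()
# => [2.0, 3.0, 4.0, 5.0]
```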
Meet GenServer
All right, we have covered a handful of functions allowing us to create long-running server processes and send and receive messages. This is great, but we have to write too much boilerplate code that starts a server loop (start/0), responds to messages (listen/0 private function), and returns a result (grab_result/0). In more complex situations, we might also need to maintain a shared state or handle errors.
As I said at the beginning of the article, there is no need to reinvent the wheel. Instead, we can utilize the GenServer behaviour, which already provides all this boilerplate for server processes like the one we built in the previous section.
A behaviour in Elixir is code that implements a common pattern. To use GenServer, you need to define a special callback module that satisfies the contract as dictated by the behaviour. Specifically, it should implement some callback functions, and the actual implementation is up to you. After the callbacks are written, the behaviour module may utilize them.
As stated by the docs, a GenServer callback module is expected to provide six callbacks, but use GenServer supplies default implementations for them. It means that you can redefine only those that require some custom logic.
First things first: we need to start the server before doing anything else, so proceed to the next section!
Starting the Server
To demonstrate the usage of GenServer, let's write a CalcServer that will allow users to apply various operations to an argument. The result of the operation will be stored in a server state, and then another operation may be applied to it as well. Or a user may get a final result of the computations.
First of all, employ the use macro to plug in GenServer:
defmodule CalcServer do
  use GenServer
end
Now we will need to redefine some callbacks.
The first is init/1, which is invoked when a server is started. The passed argument is used to set the server's initial state. In the simplest case, this callback should return the {:ok, initial_state} tuple, though there are other possible return values like {:stop, reason}, which causes the server to immediately stop.
I think we can allow users to define the initial state for our server. However, we must check that the passed argument is a number. So use a guard clause for that:
defmodule CalcServer do
  use GenServer
  def init(initial_value) when is_number(initial_value) do
    {:ok, initial_value}
  end
  def init(_) do
    {:stop, "The value must be a number!"}
  end
end
Now, simply start the server by using the start/3 function, and provide your CalcServer as a callback module (the first argument). The second argument will be the initial state:
GenServer.start(CalcServer, 5.1) |> IO.inspect 
# => {:ok, #PID<0.85.0>}
If you try to pass a non-number as a second argument, the server won't be started, which is exactly what we need.
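To make the failure case concrete, here is a small sketch. It uses a hypothetical InitDemo module with the same init/1 shape as CalcServer, so it can be run on its own. When init/1 returns {:stop, reason}, GenServer.start/3 returns {:error, reason}; depending on your Erlang/OTP version, a crash report may also be logged.

```elixir
defmodule InitDemo do
  use GenServer

  # Same shape as CalcServer.init/1: accept numbers, refuse the rest.
  @impl true
  def init(initial_value) when is_number(initial_value), do: {:ok, initial_value}
  def init(_), do: {:stop, "The value must be a number!"}
end

{:ok, pid} = GenServer.start(InitDemo, 5.1)
IO.inspect(Process.alive?(pid)) # => true

# A non-number makes init/1 return {:stop, reason}, so no server is started.
GenServer.start(InitDemo, "five") |> IO.inspect()
# => {:error, "The value must be a number!"}
```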
Great! Now that our server is running, we can start coding mathematical operations.
Handling Asynchronous Requests
Asynchronous requests are called casts in GenServer's terms. To perform such a request, use the cast/2 function, which accepts a server and the actual request. It is similar to the sqrt/2 function that we coded when talking about server processes. It also uses the "fire and forget" approach, meaning that we are not waiting for the request to finish.
To handle the asynchronous messages, a handle_cast/2 callback is used. It accepts a request and a state and should respond with a tuple {:noreply, new_state} in the simplest case (or {:stop, reason, new_state} to stop the server loop). For instance, let's handle an asynchronous :sqrt cast:
def handle_cast(:sqrt, state) do
    {:noreply, :math.sqrt(state)}
end 
That's how we maintain the state of our server. Initially the number (passed when the server was started) was 5.1. Now we update the state and set it to :math.sqrt(5.1).
Code the interface function that utilizes cast/2:
def sqrt(pid) do
    GenServer.cast(pid, :sqrt)
end
To me, this resembles an evil wizard who casts a spell but doesn't care about the impact it causes.
Note that we require a process id to perform the cast. Remember that when a server is successfully started, a tuple {:ok, pid} is returned. Therefore, let's use pattern matching to extract the process id:
{:ok, pid} = GenServer.start(CalcServer, 5.1)
CalcServer.sqrt(pid)
Nice! The same approach can be used to implement, say, multiplication. The code will be a bit more complex as we'll need to pass the second argument, a multiplier:
def multiply(pid, multiplier) do
    GenServer.cast(pid, {:multiply, multiplier})
end
The cast function supports only two arguments, so I need to construct a tuple and pass an additional argument there.
Now the callback:
def handle_cast({:multiply, multiplier}, state) do
    {:noreply, state * multiplier}
end
We can also write a single handle_cast callback that supports both operations as well as stopping the server if the operation is unknown:
def handle_cast(operation, state) do
    case operation do
      :sqrt -> {:noreply, :math.sqrt(state)}
      {:multiply, multiplier} -> {:noreply, state * multiplier}
      _ -> {:stop, "Not implemented", state}
    end
end
Now use the new interface function:
CalcServer.multiply(pid, 2)
Great, but currently there is no way to get a result of the computations. Therefore, it is time to define yet another callback.
Handling Synchronous Requests
If asynchronous requests are casts, then synchronous ones are named calls. To run such requests, utilize the call/3 function, which accepts a server, request, and an optional timeout which equals five seconds by default.
Synchronous requests are used when we want to wait until the response actually arrives from the server. The typical use case is getting some information like a result of computations, as in today's example (remember the grab_result/0 function from one of the previous sections).
To process synchronous requests, a handle_call/3 callback is utilized. It accepts a request, a tuple containing the caller's pid and a term identifying the call, and the current state. In the simplest case, it should respond with a tuple {:reply, reply, new_state}.
Code this callback now:
def handle_call(:result, _, state) do
    {:reply, state, state}
end
As you see, nothing complex. The reply and the new state equal the current state as I don't want to change anything after the result was returned.
Now the interface result/1 function:
def result(pid) do
    GenServer.call(pid, :result)
end
This is it! The final usage of the CalcServer is demonstrated below:
{:ok, pid} = GenServer.start(CalcServer, 5.1)
CalcServer.sqrt(pid)
CalcServer.multiply(pid, 2)
CalcServer.result(pid) |> IO.puts
# => 4.516635916254486
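The optional timeout of call/3 mentioned earlier deserves a quick illustration. In this sketch, SlowServer and its {:slow_result, delay} request are hypothetical names invented for the demo; the server sleeps before replying so that we can see what happens when the caller gives up first. When the timeout expires, call/3 exits in the calling process, which is caught here with try/catch.

```elixir
defmodule SlowServer do
  use GenServer

  @impl true
  def init(state), do: {:ok, state}

  # Simulates a slow computation by sleeping before replying.
  @impl true
  def handle_call({:slow_result, delay}, _from, state) do
    :timer.sleep(delay)
    {:reply, state, state}
  end
end

{:ok, pid} = GenServer.start(SlowServer, 42)

# Fast enough: the reply arrives within the 1000 ms timeout.
GenServer.call(pid, {:slow_result, 10}, 1000) |> IO.inspect()
# => 42

# Too slow: call/3 exits in the caller after 50 ms.
result =
  try do
    GenServer.call(pid, {:slow_result, 300}, 50)
  catch
    :exit, {:timeout, _details} -> :timed_out
  end

IO.inspect(result)
# => :timed_out
```

Note that the server itself keeps running after the caller has timed out; only the caller stops waiting.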
Aliasing
It becomes somewhat tedious to always provide a process id when calling the interface functions. Luckily, it is possible to give your process a name, or an alias. This is done upon the starting of the server by setting name:
GenServer.start(CalcServer, 5.1, name: :calc)
CalcServer.sqrt
CalcServer.multiply(2)
CalcServer.result |> IO.puts
Note that I am not storing pid now, though you may want to do pattern matching to make sure that the server was actually started.
Now the interface functions become a bit simpler:
def sqrt do
    GenServer.cast(:calc, :sqrt)
end
def multiply(multiplier) do
    GenServer.cast(:calc, {:multiply, multiplier})
end
def result do
    GenServer.call(:calc, :result)
end
Just don't forget that you can't start two servers with the same alias.
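Here is what that looks like in practice. The sketch uses a throwaway NamedDemo module and the name :named_demo, both invented for this example. The second attempt to start a server under the same name does not raise; it returns an error tuple that carries the pid of the server already running.

```elixir
defmodule NamedDemo do
  use GenServer

  @impl true
  def init(state), do: {:ok, state}
end

{:ok, first} = GenServer.start(NamedDemo, 1, name: :named_demo)

# The name is taken, so the second start is refused and the pid of the
# already running server is reported instead.
{:error, {:already_started, running}} = GenServer.start(NamedDemo, 2, name: :named_demo)

IO.inspect(running == first) # => true
```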
Alternatively, you may introduce yet another interface function start/1 inside your module and take advantage of the __MODULE__/0 macro, which returns the current module's name as an atom:
defmodule CalcServer do
  use GenServer
  def start(initial_value) do
    GenServer.start(CalcServer, initial_value, name: __MODULE__)
  end
  
  def sqrt do
    GenServer.cast(__MODULE__, :sqrt)
  end
  def multiply(multiplier) do
    GenServer.cast(__MODULE__, {:multiply, multiplier})
  end
  def result do
    GenServer.call(__MODULE__, :result)
  end
  # ...
end
CalcServer.start(6.1)
CalcServer.sqrt
CalcServer.multiply(2)
CalcServer.result |> IO.puts
Termination
Another callback that can be redefined in your module is called terminate/2. It accepts a reason and the current state, and it's called when a server is about to exit. This may happen when, for example, you pass an incorrect argument to the multiply/1 interface function:
# ...
CalcServer.multiply(2)
The callback may look something like this:
def terminate(_reason, _state) do
    IO.puts "The server terminated"
end
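To watch terminate/2 fire, you can stop a server from one of its callbacks. The sketch below is an illustration under stated assumptions: TerminateDemo is a hypothetical module whose state is the pid to notify, and it stops with the :normal reason so that nothing is logged (a custom reason such as "Not implemented" would additionally produce an error report).

```elixir
defmodule TerminateDemo do
  use GenServer

  # The state is the pid that should be notified on shutdown.
  @impl true
  def init(owner), do: {:ok, owner}

  # Any cast stops the server, which triggers terminate/2.
  @impl true
  def handle_cast(_unknown, owner), do: {:stop, :normal, owner}

  # Tell the owner why the server is going down.
  @impl true
  def terminate(reason, owner) do
    send(owner, {:terminated, reason})
  end
end

{:ok, pid} = GenServer.start(TerminateDemo, self())
GenServer.cast(pid, :bogus)

receive do
  {:terminated, reason} -> IO.inspect(reason) # => :normal
after
  1000 -> IO.puts("no termination message")
end
```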
Conclusion
In this article we have covered the basics of concurrency in Elixir and discussed functions and macros like spawn, receive, and send. You have learned what processes are, how to create them, and how to send and receive messages. Also, we've seen how to build a simple long-running server process that responds to both synchronous and asynchronous messages.
On top of that, we have discussed GenServer behavior and have seen how it simplifies the code by introducing various callbacks. We have worked with the init, terminate, handle_call and handle_cast callbacks and created a simple calculating server. If something seemed unclear to you, don't hesitate to post your questions!
There is more to GenServer, and of course it's impossible to cover everything in one article. In my next post, I will explain what supervisors are and how you can use them to monitor your processes and recover them from errors. Until then, happy coding!