
Fizz Buzz Pro on Elixir

This post is inspired by the following StackExchange CodeGolf thread. Fizz Buzz is a very simple programming task, often asked in software developer job interviews. A typical round of Fizz Buzz goes like this:

Write a program that prints the numbers from 1 to a very high number; for multiples of 3, print “Fizz” instead of the number, and for multiples of 5, print “Buzz”. The output must go on forever (or up to a very high astronomical number, at least 2^58) and not halt or change prematurely.

A typical single-threaded implementation in C looks like the following:

#include <stdio.h>

int main() {
    for (int i = 1; i <= 1000000000; i++) {
        if ((i % 3 == 0) && (i % 5 == 0)) {
            printf("FizzBuzz\n");
        } else if (i % 3 == 0) {
            printf("Fizz\n");
        } else if (i % 5 == 0) {
            printf("Buzz\n");
        } else {
            printf("%d\n", i);
        }
    }
}

Fizz Buzz on Elixir

Compiling the above simple implementation gives an IO throughput of 65-70 MiB/s on my 16-inch 2019 MacBook Pro (9th-gen Intel i7). But the results don’t look so good in Elixir land. Here’s the equivalent code, for instance:

defp reply(n) do
  case {rem(n, 3), rem(n, 5)} do
    {0, 0} -> "FizzBuzz\n"
    {0, _} -> "Fizz\n"
    {_, 0} -> "Buzz\n"
    {_, _} -> "#{n}\n"
  end
end

Stream.map(lower..upper, &reply/1)
|> Enum.into(IO.binstream(:stdio, :line))

We can find out the throughput in the following way:

# build in release mode
MIX_ENV=prod mix escript.build
# pv (short for pipe viewer) is available on Linux
# It can be installed using Homebrew on macOS
# Available in cygwin on Windows
./fizzbuzz 1 1000000000 | pv > /dev/null

Porting the same code to Elixir gives an IO throughput of barely 2-3 MiB/s. Yikes!

And thus begins our quest to improve Fizzbuzz. Let’s look at some possible improvements.

Reducing IO calls

If we look at the above program, we can spot one obvious bottleneck: the print call. We’re processing each number and then printing the result, one line at a time. Doing that a billion times in C is no problem: despite print being a slow call, we still get decent throughput. The same cannot be said for Elixir, where, as we’ve seen above, we get very low IO throughput.

We can reduce the calls to IO by processing our input and dividing the results into large enough chunks (not too large or we’ll slow down the IO call, and not too small, or we’ll end up making too many IO calls). Here’s an example:

defp reply(n) do
  case {rem(n, 3), rem(n, 5)} do
    {0, 0} -> "FizzBuzz\n"
    {0, _} -> "Fizz\n"
    {_, 0} -> "Buzz\n"
    {_, _} -> "#{n}\n"
  end
end

Stream.map(lower..upper, &reply/1)
|> Stream.chunk_every(6000)
|> Enum.into(IO.binstream(:stdio, :line))

Just this small optimization of sending chunks of 6000 items instead of a single item increases our IO throughput to 18 MiB/s. Hurray!

Distributing work between different processes

An obvious speedup on today’s multi-core systems would be to divide the work into several small chunks and process them separately, then collect the results from each subtask and print them. For example, if the range to process is 1..1000, we can split it into sub-ranges; with a chunk size of 100, that gives: [1..100, 101..200, 201..300 ... 801..900, 901..1000]. Here’s my way of doing this:

defp get_input_ranges(lower, upper, chunk_size) do
  if chunk_size >= 10 do
    if lower > upper do
      []
    else
      chunk_end = min(lower + chunk_size - 1, upper)
      [lower..chunk_end | get_input_ranges(chunk_end + 1, upper, chunk_size)]
    end
  else
    [lower..upper]
  end
end

Calling the function as get_input_ranges(1, 1000, 100) gives us the sub-ranges mentioned above. But what about the chunk size? Too small, and we spin up a lot of subtasks that each do very little work. Too big, and we have fewer subtasks, each consuming a huge amount of memory.
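
As a quick sanity check, here’s what we’d expect the function to return (illustrative only; in the real module, get_input_ranges/3 is private):

# Illustrative: first few chunks and the last chunk for 1..1000.
get_input_ranges(1, 1000, 100) |> Enum.take(3)
#=> [1..100, 101..200, 201..300]

get_input_ranges(1, 1000, 100) |> List.last()
#=> 901..1000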

Choosing the right chunk size for each task to process

A huge amount of memory, you say? Why!? Where did that come from? The single-threaded version got by with hardly any memory, so why does the concurrent version need it?

Here’s an illustration to simplify things:

[Figure 1: Single-threaded execution]

In single-threaded execution, like the C code above, the main thread computes and prints results in a loop; because C is compiled to machine code and calls the system IO API directly, it’s blazingly fast. But as we’ve seen, this approach won’t work in a language like Elixir.

[Figure 2: Concurrent execution with no buffering]
[Figure 3: Concurrent execution with buffering]

A task needs to wait, holding its computed results, until someone takes them and prints them. A task cannot print its results by itself, because that would scramble the order of the Fizzbuzz output. So we can’t give a task a huge input range: that would create a huge output, which would drive up our program’s memory usage.

There’s another issue with a huge input range: such a task will spend a lot of time doing the computations and recording the results in a buffer. We cannot issue the print command until the task has completed its (huge) workload, and therefore we won’t be fully utilising the available IO resources during that time.

Here’s the formula I came up with:

chunk_size = min(div(upper - lower, System.schedulers_online()), 6000)

The chunk size is capped at 6000, and there are two benefits to this:

  • We get a good task workload, neither too big nor too small.
  • It’s also a good batch size for IO printing (as we saw above): processing and sending 6000 lines per IO call beats sending a single line at a time, while still being small enough not to stall the IO process.
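
To make the formula concrete (the scheduler count is machine-dependent; 16 is just an illustration):

# For a range of a billion numbers on a 16-scheduler machine, the cap wins:
min(div(1_000_000_000, 16), 6000)
#=> 6000

# For a small range like 1..1000, the chunks shrink proportionally:
min(div(1000, 16), 6000)
#=> 62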

Using Task for dividing our work

Let’s write some code. For starters, here’s our Fizzbuzz core module:

defmodule Fizzbuzz do
  # reply/1 is the same private helper defined earlier
  def fizzbuzz_no_io(enumerable) do
    Stream.map(enumerable, &reply/1)
    |> Stream.chunk_every(6000)
    |> Enum.to_list()
  end
end

Here, we’ve written a fizzbuzz_no_io function that accepts any enumerable (which means it works with a range). No matter how big its input, it processes it, chunks the output into groups of 6000 elements, and returns the chunked output as a list.

The reason we have that Stream.chunk_every(6000) line is that concatenating lists element by element is expensive in Elixir. But since we want to send our output to IO, we can use a nice feature called iolists. An iolist is a list that can contain other iolists, binaries, and bytes, and Elixir’s IO functions accept iolists directly. You can read about them in detail here.
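
To illustrate why iolists help here: IO functions consume them as-is, so no flat string ever needs to be built (a standalone example; the values are arbitrary):

# An iolist freely mixes binaries, bytes and nested lists.
data = ["Fizz\n", [Integer.to_string(7), "\n"], ["Buzz", [?\n]]]

# IO functions accept iodata directly, no concatenation needed...
IO.binwrite(data)

# ...and flattening to a single binary is a separate, explicit step.
IO.iodata_to_binary(data)
#=> "Fizz\n7\nBuzz\n"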

Let’s write our driver code now:

defmodule Fizzbuzz.Cli do
  def main([lower, upper]) do
    {lower, upper} = {String.to_integer(lower), String.to_integer(upper)}
    chunk_size = min(div(upper - lower, System.schedulers_online()), 6000)
    input_enumerable = get_input_ranges(lower, upper, chunk_size)
    stream = Task.async_stream(input_enumerable,
              fn range -> Fizzbuzz.fizzbuzz_no_io(range) end,
              timeout: :infinity)
    Enum.reduce(stream, :ok,
      fn {:ok, res}, _acc -> res |> Enum.into(IO.binstream(:stdio, :line)) end
    )
  end

  def main(_), do: IO.puts("Usage: fizzbuzz 1 10000")
end

We’re using get_input_ranges to divide our input range into small sub-ranges that are processed by different tasks. We start many tasks across the various sub-ranges, but we consume their results in sequential order (since the output must be printed in order). The Task.async_stream function neatly wraps up this complexity, producing a stream of task results for us to process. We reduce over this stream, handling the output of each task one by one, and by the end we’ll have printed all the results.
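
The property we’re leaning on is that Task.async_stream yields results in input order even though the tasks run concurrently. A minimal standalone demonstration (the sleeps just simulate uneven workloads):

1..5
|> Task.async_stream(fn i ->
  Process.sleep(Enum.random(1..50)) # tasks finish in a random order
  i * i
end)
|> Enum.map(fn {:ok, res} -> res end)
#=> [1, 4, 9, 16, 25] - always in input order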

On my system, the above approach gives an IO throughput of approximately 40 MiB/s.

Task vs GenServer - Reduce data passing

One issue with our current Task-based approach is that each task processes its input, builds an output list, and sends the result back to the caller process (the main Fizzbuzz.Cli process in our case), which then forwards that list to IO using res |> Enum.into(IO.binstream(:stdio, :line)). IO in Elixir is itself implemented as a process, which means there’s a lot of passing around of huge data between different processes.

Can we do better? If the main process had a way to tell the worker processes to print their results at the appropriate time, we could reduce the number of inter-process data transfers from two to one (worker process to IO process). There’s no way to tell a Task to do something after it has been created, but it’s possible with a GenServer. In fact, Figure 3 above illustrates this exact concept.

So, we change our approach again. Let’s create a GenServer first:

defmodule Fizzbuzz.Worker do
  use GenServer

  def init([range]) do
    send(self(), {:calculate, range})
    {:ok, []}
  end

  def handle_info({:calculate, range}, _state) do
    res = Fizzbuzz.fizzbuzz_no_io(range)
    {:noreply, res}
  end

  def handle_call(:print, _from, results) do
    results |> Enum.into(IO.binstream(:stdio, :line))
    {:reply, :ok, []}
  end
end

We’ll change our driver module’s main function to do a few things:

  • We’ll use Task.async_stream to spin up one Fizzbuzz.Worker per sub-range. As soon as GenServer.start_link is called, the Fizzbuzz.Worker begins its computation in its own process.
  • For every GenServer in the stream, we’ll issue its :print call when it’s that worker’s turn to print.
  • Once a GenServer has printed its output, it’s of no further use to us. We’ll kill it to free up memory.

def main([lower, upper]) do
  {lower, upper} = {String.to_integer(lower), String.to_integer(upper)}
  chunk_size = min(div(upper - lower, System.schedulers_online()), 6000)
  input_enumerable = get_input_ranges(lower, upper, chunk_size)

  Task.async_stream(input_enumerable,
    fn input -> elem(GenServer.start_link(Fizzbuzz.Worker, [input]), 1) end,
    timeout: :infinity)
  |> Stream.map(fn {:ok, res} -> res end)
  |> Stream.each(
    fn pid ->
      GenServer.call(pid, :print)
      Process.exit(pid, :kill)
    end)
  |> Stream.run()
end

This optimization increases our throughput to around 80 MiB/s. That’s a considerable improvement.

Optimizing Fizzbuzz.reply(n) function

Observe the following code:

defp reply(n) do
  case {rem(n, 3), rem(n, 5)} do
    {0, 0} -> "FizzBuzz\n"
    {0, _} -> "Fizz\n"
    {_, 0} -> "Buzz\n"
    {_, _} -> "#{n}\n"  # <-- This line can be optimized
  end
end

We’re using string interpolation here, but we don’t actually need a single flat binary. Since we’re building iolists anyway, we can return a small list in its place. So, the above code can be changed to the following:

defp reply(n) do
  case {rem(n, 3), rem(n, 5)} do
    {0, 0} -> "FizzBuzz\n"
    {0, _} -> "Fizz\n"
    {_, 0} -> "Buzz\n"
    {_, _} -> [Integer.to_string(n), "\n"]
  end
end

This change may appear trivial, but since reply gets called a lot, the gains add up. Our throughput increases from around 80 MiB/s to 150 MiB/s.
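
If you want to verify micro-optimizations like this one yourself, a benchmarking library such as Benchee works well. A sketch (this assumes {:benchee, "~> 1.0"} has been added to your mix deps):

# Compare building 6000 lines via interpolation vs. iolists.
Benchee.run(%{
  "interpolation" => fn -> Enum.map(1..6000, fn n -> "#{n}\n" end) end,
  "iolist" => fn -> Enum.map(1..6000, fn n -> [Integer.to_string(n), "\n"] end) end
})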

Optimizing get_input_ranges

The problem statement states that the output must go on forever (or a very high astronomical number, at-least 2^58) and not halt / change prematurely.

So far, our program does very well for numbers up to 1,000,000,000, but if we try it with the range 1..288230376151711744 (2^58), it hangs. The offender is the get_input_ranges function, because it returns a list of sub-ranges for the given input range. For 2^58, that list would have 48,038,396,025,285 (48 trillion) elements. Forget processing: we don’t even have enough RAM (more than 48,000 GB) to store this list, even if every element took up just one byte. So, we need to do better.

How about changing input_enumerable from a list to a stream? That way, we won’t be taking up any memory, and we can get started with our work as soon as the first element of the stream is generated. Here’s the code to do that:

defp get_input_ranges2(lower, upper, chunk_size) do
  if chunk_size >= 10 do
    # Return a lazy stream of sub-ranges instead of a list
    ChunkRangeStream.create(lower..upper, chunk_size)
  else
    [lower..upper]
  end
end

And here’s the ChunkRangeStream module that powers it:
defmodule ChunkRangeStream do
  def create(range, chunk_size) do
    Stream.resource(fn -> initialize(range, chunk_size) end, &generate_next_value/1, &done/1)
  end

  defp initialize(range, chunk_size) do
    {range, chunk_size, range.first}
  end

  defp generate_next_value({range, chunk_size, lower}) do
    if lower <= range.last do
      chunk_end = min(lower + chunk_size - 1, range.last)
      {[lower..chunk_end], {range, chunk_size, chunk_end + 1}}
    else
      {:halt, {range, chunk_size, lower}}
    end
  end

  defp done(_) do
    nil
  end
end

That takes care of the hiccup.
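
Because the stream is lazy, creating it is effectively free no matter how absurd the range, and chunks materialize only on demand (an illustrative session, using the chunk boundaries as defined above):

stream = ChunkRangeStream.create(1..288_230_376_151_711_744, 6000)
Enum.take(stream, 2)
#=> [1..6000, 6001..12000]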

Getting creative with mathematics

We can observe that Fizzbuzz produces a periodic output with a period of 15 (only the numbers change). So, if we want each task to generate 6000 lines (400 periods of 15), we can do it as follows:

def fizzbuzz6k(range) do
  i = range.first - 1

  0..(400 - 1)
  |> Stream.map(fn j ->
    [[Integer.to_string(15 * j + 1 + i), "\n"], [Integer.to_string(15 * j + 2 + i), "\n"], "Fizz\n",
     [Integer.to_string(15 * j + 4 + i), "\n"], "Buzz\nFizz\n", [Integer.to_string(15 * j + 7 + i), "\n"],
     [Integer.to_string(15 * j + 8 + i), "\n"], "Fizz\nBuzz\n", [Integer.to_string(15 * j + 11 + i), "\n"],
     "Fizz\n", [Integer.to_string(15 * j + 13 + i), "\n"], [Integer.to_string(15 * j + 14 + i), "\n"], "FizzBuzz\n"]
  end)
  |> Enum.chunk_every(400)
end

The above code makes a few assumptions:

  • The range has a size of exactly 6000.
  • The starting number of the range is of the form 15k + 1 (where k is a non-negative integer).

So, for large inputs we can employ fizzbuzz6k, but we’ll need to trifurcate the input range (a sketch of the split follows the list). We’ll explain this using the input range 3..100000:

  • A beginning range that goes up to the first multiple of 15 we find. In this example, the beginning range is 3..15.
  • A mid range whose size must be a multiple of 6000. In our case, it is 16..96015.
  • An ending range covering whatever is left: 96016..100000.
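
Here’s a sketch of how that split could be computed. Note that trifurcate/2 is a hypothetical helper for illustration, not part of the original code:

# Hypothetical helper: splits [lower, upper] into head / mid / tail ranges.
defp trifurcate(lower, upper) do
  # The head ends at the first multiple of 15 at or after lower.
  head_end = div(lower + 14, 15) * 15
  mid_start = head_end + 1
  # The mid size is rounded down to a multiple of 6000.
  mid_size = div(upper - mid_start + 1, 6000) * 6000
  mid_end = mid_start + mid_size - 1
  {lower..head_end, mid_start..mid_end, (mid_end + 1)..upper}
end

# trifurcate(3, 100_000)
#=> {3..15, 16..96015, 96016..100000}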

Since the beginning and ending ranges are relatively small, we can handle them in the main process itself. The mid range can be divided into sub-ranges of size 6000 using the following stream:

defmodule Chunk6kStream do
  # Make sure the range's size is a multiple of 6000 and range.first is of the form 15k + 1
  def create(range) do
    Stream.resource(fn -> initialize(range) end, &generate_next_value/1, &done/1)
  end

  defp initialize(range) do
    {range, range.first, range.last}
  end

  defp generate_next_value({range, lower, upper}) when lower == upper + 1 do
    {:halt, {range, upper, upper}}
  end

  defp generate_next_value({range, lower, upper}) do
    {[lower..(lower + 5999)], {range, lower + 6000, upper}}
  end

  defp done(_) do
    nil
  end
end

We can modify our driver program accordingly. The new program gives a throughput of 175 MiB/s on my system.
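
For reference, here’s a simplified sketch of what that modification could look like, using Task.async_stream directly instead of the GenServer workers for brevity, and assuming fizzbuzz6k lives in the Fizzbuzz module and trifurcate/2 is the helper sketched earlier:

# A simplified sketch, not the author's exact driver.
def main([lower, upper]) do
  {lower, upper} = {String.to_integer(lower), String.to_integer(upper)}
  {head, mid, tail} = trifurcate(lower, upper)

  # The head and tail ranges are tiny: use the generic path in the main process.
  Fizzbuzz.fizzbuzz_no_io(head) |> Enum.each(&IO.binwrite/1)

  # The mid range is carved into 6000-sized pieces and fanned out to tasks.
  Chunk6kStream.create(mid)
  |> Task.async_stream(&Fizzbuzz.fizzbuzz6k/1, timeout: :infinity)
  |> Enum.each(fn {:ok, chunks} -> Enum.each(chunks, &IO.binwrite/1) end)

  Fizzbuzz.fizzbuzz_no_io(tail) |> Enum.each(&IO.binwrite/1)
end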

Reducing message passing overhead

Currently, all the results of a worker’s computation are stored in an iolist. When the print call is issued, we send this iolist to the IO process, which ultimately prints it to the console. But since this is message passing between two processes, the underlying BEAM VM must copy the entire iolist from the GenServer’s memory to the IO process. Assuming we’re dealing with at least a billion numbers, that’s 166,666 copies, which is a significant overhead to deal with.

Looking at Erlang’s efficiency guide here, we find that all data in messages sent between Erlang processes is copied, except for refc binaries and literals on the same Erlang node. For our purposes, a refc binary means a binary larger than 64 bytes.

So, if we convert our iolist to a binary before passing it to the IO process, we can avoid the copying overhead, provided that the iolist-to-binary conversion doesn’t itself impose a significant cost.

Here’s a code sample for reference:

defmodule Fizzbuzz.Worker do
  use GenServer

  def init([range]) do
    send(self(), {:calculate, range})
    {:ok, []}
  end

  def handle_info({:calculate, range}, _state) do
    res = Fizzbuzz.fizzbuzz_no_io(range)
    {:noreply, res |> :erlang.iolist_to_binary} # <-- changed the state here
  end

  def handle_call(:print, _from, results) do
    IO.binwrite(results) # <-- refc binary gets passed here.
    {:reply, :ok, []}
  end
end

The above change improves our throughput to around 230 MiB/s.

Furthermore, since the IPC bottleneck is no longer significant, we can now process bigger ranges on each GenServer and thus pass bigger results to our IO process (a list of refc binaries, if you will). Since we can push more numbers through per round trip, this is an obvious throughput improvement: on my system, it gets past 300 MiB/s.

Reducing Standard I/O bottlenecks

Erlang (and by extension Elixir) has deliberately constrained I/O, because it is designed to work in concurrent and distributed environments. Standard I/O calls are expected to be atomic: if two (or more) processes concurrently call IO.puts to print something on the console, those calls must proceed in serial order, which is why standard I/O is handled by a separate process. Similar serialization exists in the standard I/O layers of many programming languages, so that part is nothing new. However, Erlang processes are also designed to run in a distributed environment (meaning they can run on separate computers), so if you’re running two or more instances of the BEAM VM in a cluster, all the IO calls made by each instance are forwarded to a process known as the group leader. So, Erlang I/O carries some additional bells and whistles for forwarding standard I/O calls to the group leader process.
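
You can observe this indirection from any iex session; every process knows the pid of the process currently serving its IO (the pid below is illustrative):

iex> Process.group_leader()
#PID<0.64.0>  # every IO.puts from this shell is a message to this pid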

All these bells and whistles are necessary when writing professional software, but not when we just need to print to stdout on a single machine and we’re taking care of the ordering ourselves. So, we’ll stop using standard I/O calls for our purpose and use ports instead. Here’s the documentation on using ports in Elixir: https://hexdocs.pm/elixir/1.12.3/Port.html.

Instead of IO.binwrite(results), we’ll use the following:

port = :erlang.open_port({:fd, 0, 1}, [:binary, :out]) # To open the port (from main process)
send(port, {self(), {:connect, pid}}) # Here pid is the process id of the Fizzbuzz.Worker that is about to receive the print call (self() is our main process)

send(port, {self(), {:command, results}}) # print to port from Fizzbuzz.Worker
send(port, {self(), {:connect, pid}}) # hand back control of port to main process

send(port, {self(), :close}) # Close the port after the program is over

This improvement gets us to 550+ MiB/s.

At last, we’ve removed the constraints due to IO and IPC, which means the program is now only as fast as the algorithm spitting out the results.

We’re now 7x faster than the naive C version, and yet there are many new peaks to conquer still.

Going native

I tried porting the range-processing function to Rust and calling it as a NIF using Rustler. The throughput jumps to around 3 GiB/s on my system: a 6x improvement from simply moving the performance-critical code to a native language. I suspect we could do even better using Rust’s own I/O, but then the code would be entirely in Rust, which would defeat the purpose of this lengthy post.
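
For the curious, the Elixir side of such a NIF is tiny. With Rustler it looks roughly like this (the module, crate, and function names here are hypothetical):

defmodule Fizzbuzz.Native do
  use Rustler, otp_app: :fizzbuzz, crate: "fizzbuzz_nif"

  # Replaced at load time by the Rust implementation; this stub only
  # runs if the native library failed to load.
  def fizzbuzz_range(_lower, _upper), do: :erlang.nif_error(:nif_not_loaded)
end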

Drop me a mail if you think this can be improved further. Cheers!

Code is available on GitHub.
