Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Data Flows


Data Flows

  {:briefly, "~> 0.5.1"},
  {:httpoison, "~> 2.2"},
  {:flow, "~> 1.2"},
  {:vega_lite, "~> 0.1.9"},
  {:kino_vega_lite, "~> 0.1.10"}


In 2017, José Valim presented GenStage and Flow. The narrative of that presentation was really good, so this workbook will use that as a starting point. Stolen from José with pride.

This workbook makes use of the following modules:

  • File for reading and writing files.
  • Briefly for temporary files.
  • HTTPoison for performing HTTP GET operations.
  • Enum for eager processing of enumerables.
  • Stream for lazy processing using streams.
  • Flow for concurrent processing using GenStage processes.

In this workbook, we will explore different ways of counting the number of occurrences of each word present in a text. There are three principal steps in doing so:

  1. Read data from file.
  2. Segment it into words.
  3. Reduce the list of words into a map from word to count.


This is “Alice’s Adventures in Wonderland” by Lewis Carroll:

url = "https://www.gutenberg.org/cache/epub/11/pg11.txt"

Lets download it:

{:ok, path} = Briefly.create()
%HTTPoison.Response{body: contents} = HTTPoison.get!(url)
File.write!(path, contents)

Eager Processing

eager =
  |> String.split("\n")
  |> Enum.flat_map(&String.split/1)
  |> Enum.reduce(%{}, fn word, counts -> Map.update(counts, word, 1, &(&1 + 1)) end)

What’s going one here?

  • The File.read! line reads the contents of a file into memory or errors out.
  • The String.split line does a pass over this and splits it into a list of strings (one per line).
  • The Enum.flat_map line does another pass whereby it maps the String.split/1 function (which splits on whitespace) to each of these strings and flattens the result. The result is thus a list of strings representing each word in the original text. These words may contain periods, command and similar characters, but that is without relevance for this example.
  • The Enum.reduce line does yet another apss where it reduces this list to a single map. This map starts empty, and for each encountered word this is either incremented (if already present) or inserted with a value of 1.

While this solution is fine for a small file, it has a number of downsides, namely:

  • Memory consumption: It loads the entire file into memory. That may be fine for a 10kB file, but wouldf likely be problematic for a 10GB file.
  • Memory access: It does four passes over the data of the file; first for reading, then for splitting into lines, then for splitting into words and finally for reducing into the map.

This is the result of eager processing.

Lazy Processing

lazy =
  |> Stream.flat_map(&String.split/1)
  |> Enum.reduce(%{}, fn word, counts -> Map.update(counts, word, 1, &(&1 + 1)) end)

What’s going on here?

  • The File.stream! line works like File.read! except that instead of returning the contents of the file it returns a recipe for reading that contents.
  • The Stream.flat_map line works like Enum.flat_map except that:
    1. Instead of taking an enumerable as input it takes a recipe.
    2. Instead of returning a list of strings it returns a recipe for reading a sequence of strigs.
  • The Enum.reduce line is the same as the one from the eager version. Because it is eager, it will follow the recipe that it takes as first argument.

Enum.reduce will consume values (i.e., words), one at a time, produced by the recipe passed to it. In order to produce a value the recipe will follow the recipe passed to it. This way, recipes are chained, and the call follows this chain for each value produced.

Let’s try to illustrate this:

print = fn value ->

[1, 2, 3]
|> Stream.map(print)
|> Stream.map(print)
|> Enum.to_list()

We can also inspect the recipe itself:


Comparing this to the eager solution, we see that:

  • At no point do we load any significant amount of data into memory.
  • This comes at a bit of overhead, but this is robust to large files.
  • The lazy solution could deal with infinite streams (e.g., a random number generator).
  • We are still unable to utilize multiple cores.

Concurrent Processing

concurrent =
  |> Flow.from_enumerable()
  |> Flow.flat_map(&String.split/1)
  |> Flow.partition()
  |> Flow.reduce(fn -> %{} end, fn word, counts -> Map.update(counts, word, 1, &(&1 + 1)) end)
  |> Enum.into(%{})

What’s going on here?

  • The File.stream! line is exactly the same as in the lazy version.
  • The Flow.from_enumerate line freates a new flow from an enumerable. It will consume batches of elements from the previous line when it receives demand. File.stream!, by default, will send it one element per line it encounters.
  • The Flow.flat_map line is exactly the same as in the lazy version, only it produces a flow that can be evaluated concurrently.
  • The Flow.partition line will partition the data according to a keying function whose keys will be hashed to the number of stages in the next step of the flow. Partitioning allows you to control which classes of inputs are routed to the same stages of the next step. This is critical for some problems and a waste of resources for others.
  • The Flow.reduce line is exactly the same as in the lazy vesion, only it produces a flow and, given that it is positioned after the partioning, specific words will alway be mapped to a specific stage of those that make up this step of the flow. This means that we are guaranteed to not have any key that appears in two stages.
  • The Enum.into line eagerly constructs a map from the flow. This is what drives the demand.

Lets take a look at the recipe part of this (before it is consumed by Enum.into):

concurrent_recipe =
  |> Flow.from_enumerable()
  |> Flow.flat_map(&String.split/1)
  |> Flow.partition()
  |> Flow.reduce(fn -> %{} end, fn word, counts -> Map.update(counts, word, 1, &(&1 + 1)) end)

That structure can be used to vizualize the flow of data (left to right):

defmodule RecipeViz do
  @stepsize 300
  @y_offset 30
  @y_spacing 40

  def analyze(%File.Stream{}) do
    [%{name: "File Stream", count: 1}]

  def analyze(%Flow{} = flow) do
      operations: [operation],
      options: [stages: stagecount],
      producers: {producer, [input]},
      window: _window
    } = flow

    node_name =
      case operation do
        {a, b, _} when is_atom(a) and is_atom(b) ->
          "#{producer} #{a} #{b}"

        {a, _, _} when is_atom(a) ->
          "#{producer} #{a}"

    tail = analyze(input)
    [%{name: node_name, count: stagecount} | tail]

  def enrich(config) do
    maxcount = config |> Enum.map(fn %{count: count} -> count end) |> Enum.max()
    height = maxcount * @y_spacing

    new_config =
      |> Enum.with_index(fn %{count: count} = entry, i ->
        points =
          |> Enum.map(fn count_i ->
              (i + 0.5) * @stepsize,
              @y_offset + (0.5 + count_i) * height / (count + 2)

        Map.merge(entry, %{points: points})

    {new_config, height}

  def render(recipe) do
    {config, height} =
      |> analyze()
      |> Enum.reverse()
      |> enrich()

    lines_labels =
      |> Enum.with_index(fn %{name: name}, i ->

    lines_nodes =
      |> Enum.map(fn %{points: points} ->
        |> Enum.map(fn {x, y} ->
      |> List.flatten()

    lines_edges =
      |> Enum.chunk_every(2, 1, :discard)
      |> Enum.map(fn [%{points: src_points}, %{points: dst_points}] ->
        for {src_x, src_y} <- src_points, {dst_x, dst_y} <- dst_points do
      |> List.flatten()

    xmin = 0
    xmax = length(config) * @stepsize
    ymin = 0
    ymax = @y_offset + height

    #{lines_labels |> Enum.map(fn line -> "  #{line}\n" end) |> Enum.join()}
    #{lines_edges |> Enum.map(fn line -> "  #{line}\n" end) |> Enum.join()}
    #{lines_nodes |> Enum.map(fn line -> "  #{line}\n" end) |> Enum.join()}
    |> Kino.Image.new(:svg)

Note: Options are available for controlling the number of workers per layer.

Tracing a simpler example reveals that the underlying mechanics are a bit more complicated that one might immediately imagine:

Kino.Process.render_seq_trace(fn ->
  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  |> Flow.from_enumerable(stages: 1)
  |> Flow.map(fn value -> value end)
  |> Enum.into([])

Distributed Processing

The last step, obviously, is distributed processing. Unfortunately, I haven’t found an off-the-shelf solution for that. It is, however, often not necessary. And the tooling for building such an animal is growing.

I wouldn’t be surpriced to see something pop up within a year or two. Until then, tools outside the BEAM can be used.

Performance Comparison

At the lower right corner of each evaluated non-stale cell you will see a green circle. Hovering the mouse over it will activate a popup that tells you how long it took to evaluate it.

On a warm cache, I got the following times on my laptop (in ms):

data = [
  %{"impl" => "Eager", "time" => 57},
  %{"impl" => "Lazy", "time" => 58},
  %{"impl" => "Concurrent", "time" => 20}
alias VegaLite, as: Vl

Vl.new(width: 400, height: 160)
|> Vl.data_from_values(data)
|> Vl.mark(:bar)
|> Vl.encode_field(:x, "time", type: :quantitative, title: "Time / [ms]")
|> Vl.encode_field(:y, "impl", type: :nominal, title: "Implementation")

Note: There are considerable run-by-run variations.

Given the simplistic example, the takeaway here should be that eager and lazy are very comparable, and that concurrent can be a lot faster.

Ingestion-Style Problems

In ingestion-style problems there is a need for batching and supervision. For dealing with such problems there is Broadway. But that should be a different workbook …