
Flow

Mix.install([:req, :flow])

What Is Flow?

Flow is an abstraction built on top of GenStage. Its job is to process collections asynchronously through a series of stages.

> But that’s kind of what we did with Task.async_stream/3! — you, now

Indeed, the two share a few similarities:

  • They both work on bounded or unbounded collections
  • They both process collection elements in parallel

However, Flow is more powerful, versatile, and customizable than Task.async_stream/3. Because of this, it’s also more complex and has a steeper learning curve, so make sure it solves your problem better than async_stream before reaching for it!
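
For contrast, here’s a minimal sketch of Task.async_stream/3 (with made-up work and concurrency settings):

# Process a bounded collection in parallel, at most 4 tasks at a time.
# Results come back as {:ok, value} tuples, in the original order.
1..10
|> Task.async_stream(fn n -> n * n end, max_concurrency: 4)
|> Enum.map(fn {:ok, result} -> result end)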

Flow is the next layer of abstraction on top of GenStage: it’s the tool to reach for when thinking about parallel data processing and map/reduce algorithms.

Counting Words

Let’s implement the classic map/reduce toy example: counting the occurrences of each word in a text. We’ll fetch the text from the baconipsum.com API 🥓

The Enum Version

Let’s start with how we’d do this with Enum.

start_time = System.monotonic_time(:millisecond)

result =
  1..10
  |> Enum.map(fn _ ->
    Req.get!("https://baconipsum.com/api/?type=meat-and-filler&paras=50&format=text").body
  end)
  |> Enum.flat_map(fn paragraph ->
    String.split(paragraph, "\n", trim: true)
  end)
  |> Enum.flat_map(&String.split(&1, " ", trim: true))
  |> Enum.reduce(%{}, fn word, acc ->
    Map.update(acc, word, 1, &(&1 + 1))
  end)
  |> Enum.sort_by(fn {_word, count} -> count end, :desc)

end_time = System.monotonic_time(:millisecond)
IO.puts("Took us #{end_time - start_time}ms")

result

Cool. Cool cool cool. This solution works, but it is sequential: the ten HTTP requests happen one after the other. It also loads the whole text into memory for every request and then eagerly evaluates every step, each of which can be expensive: first we split the whole text into a full list of lines, then each line into a full list of words, and so on. We fully compute each step before moving on to the next.

I know what you’re thinking: this is what streams are for. You’re kind of right.

The Stream Version

It’s easy enough to change the Enum-based version to use streams instead.

start_time = System.monotonic_time(:millisecond)

result =
  1..10
  |> Stream.map(fn _ ->
    Req.get!("https://baconipsum.com/api/?type=meat-and-filler&paras=50&format=text").body
  end)
  |> Stream.flat_map(fn paragraph ->
    String.splitter(paragraph, "\n", trim: true)
  end)
  |> Stream.flat_map(&String.splitter(&1, " ", trim: true))
  |> Enum.reduce(%{}, fn word, acc ->
    Map.update(acc, word, 1, &(&1 + 1))
  end)
  |> Enum.sort_by(fn {_word, count} -> count end, :desc)

end_time = System.monotonic_time(:millisecond)
IO.puts("Took us #{end_time - start_time}ms")

result

The differences are:

  • We use String.splitter/3 instead of String.split/3, since it returns a lazy stream
  • We change Enum.map/2 and Enum.flat_map/2 into their lazy Stream counterparts

Just like that, our solution is lazy and no longer computes every step fully in memory. That’s fantastic, but our execution time is essentially the same! Splitting strings takes some memory and CPU, but it’s blazing-fast compared to performing HTTP requests, and those still run sequentially, one at a time.

Enter Flow

Flow tries its hardest to provide an API that’s almost identical to Enum and Stream. This is how we’d rewrite our example.

start_time = System.monotonic_time(:millisecond)

result =
  1..10
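  # Setting max_demand: 1 (with min_demand: 0) makes each stage ask for one
  # element at a time, so the ten slow HTTP requests spread across stages
  # instead of being batched into one.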
  |> Flow.from_enumerable(max_demand: 1, min_demand: 0)
  |> Flow.map(fn _ ->
    Req.get!("https://baconipsum.com/api/?type=meat-and-filler&paras=50&format=text").body
  end)
  |> Flow.flat_map(fn paragraph ->
    String.splitter(paragraph, "\n", trim: true)
  end)
  |> Flow.flat_map(&String.splitter(&1, " ", trim: true))
  |> Flow.partition()
  |> Flow.reduce(fn -> %{} end, fn word, acc ->
    Map.update(acc, word, 1, &(&1 + 1))
  end)
  |> Enum.sort_by(fn {_word, count} -> count end, :desc)

end_time = System.monotonic_time(:millisecond)
IO.puts("Took us #{end_time - start_time}ms")

result

Partitioning

The example above works thanks to a little function we snuck in there: Flow.partition/1.

Flow executes computations in different processes, including the Flow.reduce/3 step. If the same word can end up in different processes, each process builds its own partial count for that word, and the reduce step no longer produces correct totals. This is a classic concern with map/reduce: if you want the reduce step to happen in parallel, you need to divide the output of the map step into subsets that can each be reduced on their own.

Flow.partition/1 does exactly that: it introduces a new set of stages and makes sure the same word is always mapped to the same stage. It does this through a hash function.
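
To get a feel for how that works, here’s a small illustrative sketch (not Flow’s actual internals, although its default hashing is based on :erlang.phash2/2): hashing each word modulo the number of stages deterministically picks a stage index, so repeated words always end up in the same place.

stages = 4

# Every occurrence of "bacon" hashes to the same index, so it would always
# be routed to the same stage.
~w(bacon pork bacon ham pork bacon)
|> Enum.group_by(fn word -> :erlang.phash2(word, stages) end)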

Windows and Triggers

We won’t go into too much detail on windows and triggers, but they’re a powerful feature of Flow.

The reason for having these features is unbounded collections. When working with an unbounded collection, we can’t simply call Flow.reduce/3 like we did in the word-counting example: we’d never arrive at a final reduced result if our collection has infinitely many elements.

Windows and triggers address this problem. Windows let us split the collection based on time, and triggers let us tell the pipeline when to “flush” the results we have computed so far. Then, Flow can run its operations on windows of data instead of on the unbounded collection.

Once we specify a window strategy, we can use triggers to materialize the data in the window. The parallel computation then happens on each window, rather than on the whole collection like in the examples above.

All events belong to the global window by default, which is returned by Flow.Window.global/0.

Flow.Window.global()

Triggers

Let’s see a trigger in action. We’ll use Flow.Window.trigger_every/2, which fires every n elements in the window.

window = Flow.Window.global() |> Flow.Window.trigger_every(10)

Flow.from_enumerable(1..100)
|> Flow.partition(window: window, stages: 1)
|> Flow.reduce(fn -> 0 end, &(&1 + &2))
|> Flow.emit(:state)
|> Enum.to_list()
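# => [55, 210, 465, 820, 1275, 1830, 2485, 3240, 4095, 5050, 5050]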

As you can see in the output, the trigger fires every ten elements and emits the state accumulated so far. The accumulator is kept between triggers, so each element in the returned list is the running sum up to that point, and the final element comes from one last trigger fired when the window terminates.
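
If instead we wanted each emission to cover only the elements since the previous trigger, we could reset the accumulator ourselves. Here’s a minimal sketch using Flow.on_trigger/2, which lets us choose both which events to emit and what the new accumulator state should be:

window = Flow.Window.global() |> Flow.Window.trigger_every(10)

Flow.from_enumerable(1..100)
|> Flow.partition(window: window, stages: 1)
|> Flow.reduce(fn -> 0 end, &(&1 + &2))
# Emit the current sum, then reset the accumulator to 0.
|> Flow.on_trigger(fn acc -> {[acc], 0} end)
|> Enum.to_list()
# => [55, 155, 255, 355, 455, 555, 655, 755, 855, 955, 0]
# (the trailing 0 comes from the final trigger fired when the window terminates)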

What to Emit?

You might have noticed the little call to Flow.emit/2 in the previous pipeline. Flow.emit/2 tells Flow which values to emit to the next stage of the computation. Often, you’d use Flow.emit(flow, :events). In our case, what we want to emit to the next stage is the state calculated by Flow.reduce/3, so we use Flow.emit(:state).
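
Flow.emit/2 also accepts :nothing. As a small sketch, this can come in handy when we run a pipeline purely for its side effects and don’t care about its output:

Flow.from_enumerable(1..10)
|> Flow.map(&IO.inspect/1)
|> Flow.emit(:nothing)
|> Flow.run()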

Other Window Types

We can use different window types. The Flow.Window.count/1 window groups the collection into windows of n elements each, and the accumulator starts fresh with every new window. We can see how this plays out together with the trigger_every trigger we used above. In the example below, we consider a window of 15 elements at a time, but fire the trigger every 6 elements.

window = Flow.Window.count(15) |> Flow.Window.trigger_every(6)

Stream.repeatedly(fn -> 5 end)
|> Flow.from_enumerable()
|> Flow.partition(window: window, stages: 1)
|> Flow.reduce(fn -> 0 end, &(&1 + &2))
|> Flow.emit(:state)
|> Enum.take(10)
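# => [30, 60, 75, 30, 60, 75, 30, 60, 75, 30]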

Stages

As mentioned at the beginning of this livebook, Flow is built on top of GenStage. In many cases, you won’t even notice it. However, it’s important because it means all the GenStage features we know and love power Flow as well. For example, Flow pipelines have built-in backpressure through GenStage’s demand. You can also use GenStage producers or producer-consumers to feed a Flow pipeline through Flow.from_stages/2.
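
As a hedged sketch (CounterProducer is a made-up module, just for illustration), feeding a Flow from a GenStage producer looks roughly like this:

defmodule CounterProducer do
  use GenStage

  # A toy producer that emits an infinite sequence of integers, producing
  # exactly as many events as its consumers demand.
  def start_link(start), do: GenStage.start_link(__MODULE__, start)

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

{:ok, producer} = CounterProducer.start_link(0)

[producer]
|> Flow.from_stages(max_demand: 5)
|> Flow.map(&(&1 * 2))
|> Enum.take(10)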

To control parallelism and batching in Flow, you’ll see GenStage concepts pop up. For example, the Flow.partition/2 function we used above uses a :stages option to control how many processes to use in the partitioning. You can also use :min_demand and :max_demand to control batching.
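
As an illustrative sketch with made-up numbers, tuning those knobs looks like this:

# Illustrative only: run 4 partition stages; :min_demand and :max_demand
# control how many events each stage asks for and processes at a time.
1..10_000
|> Flow.from_enumerable(stages: 4, max_demand: 500, min_demand: 250)
|> Flow.map(&(&1 * 2))
|> Enum.sum()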