Flow
Mix.install([:req, :flow])
What Is Flow?
Flow is an abstraction built on top of GenStage. Its job is processing collections asynchronously through a series of stages.
> But that’s kind of what we did with Task.async_stream/3! -- you, now
Indeed, there are a few similarities:
- They both work on bounded or unbounded collections
- They both process collection elements in parallel
However, Flow is more powerful, versatile, and customizable than Task.async_stream/3. Because of this, it’s also more complex and has a steeper learning curve, so make sure it solves your problem better than async_stream before using it!
Because it sits on top of GenStage, Flow is the next layer of abstraction when thinking about parallel data processing and map/reduce algorithms.
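For reference, here’s a minimal sketch of the Task.async_stream style mentioned above, so you have something to compare Flow against. The slow_square function is a hypothetical stand-in for slow work (like an HTTP request), and the :max_concurrency and :timeout values are arbitrary choices for this example.

```elixir
# Hypothetical stand-in for slow work such as an HTTP request.
slow_square = fn n ->
  Process.sleep(100)
  n * n
end

squares =
  1..10
  # Run up to 10 tasks at once; results come back in input order by default.
  |> Task.async_stream(slow_square, max_concurrency: 10, timeout: 5_000)
  # Each successful result is wrapped as {:ok, value}.
  |> Enum.map(fn {:ok, value} -> value end)

IO.inspect(squares)
```

If this is all the parallelism you need (one step, no parallel reduce), async_stream is probably enough.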
Counting Words
Let’s implement the classic map/reduce toy example: counting the occurrence of every word in a text. We’ll fetch the words from the baconipsum.com API 🥓
The Enum Version
Let’s start with how we’d do this with Enum.
start_time = System.monotonic_time(:millisecond)
result =
  1..10
  |> Enum.map(fn _ ->
    Req.get!("https://baconipsum.com/api/?type=meat-and-filler&paras=50&format=text").body
  end)
  |> Enum.flat_map(fn paragraph ->
    String.split(paragraph, "\n", trim: true)
  end)
  |> Enum.flat_map(&String.split(&1, " ", trim: true))
  |> Enum.reduce(%{}, fn word, acc ->
    Map.update(acc, word, 1, &(&1 + 1))
  end)
  |> Enum.sort_by(fn {_word, count} -> count end, :desc)
end_time = System.monotonic_time(:millisecond)
IO.puts("Took us #{end_time - start_time}ms")
result
Cool. Cool cool cool. This solution works. However, it is sequential. It also loads the whole text into memory for every request and then it keeps eagerly evaluating expressions. Each of those can be expensive: first we split the whole text into a full list of lines, then each line into a full list of words, and so on. We fully compute each step before moving to the next.
I know what you’re thinking: this is what streams are for. You’re kind of right.
The Stream Version
It’s easy enough to change the Enum-based version to use streams instead.
start_time = System.monotonic_time(:millisecond)
result =
  1..10
  |> Stream.map(fn _ ->
    Req.get!("https://baconipsum.com/api/?type=meat-and-filler&paras=50&format=text").body
  end)
  |> Stream.flat_map(fn paragraph ->
    String.splitter(paragraph, "\n", trim: true)
  end)
  |> Stream.flat_map(&String.splitter(&1, " ", trim: true))
  |> Enum.reduce(%{}, fn word, acc ->
    Map.update(acc, word, 1, &(&1 + 1))
  end)
  |> Enum.sort_by(fn {_word, count} -> count end, :desc)
end_time = System.monotonic_time(:millisecond)
IO.puts("Took us #{end_time - start_time}ms")
result
The differences are:
- We use String.splitter/3 instead of String.split/3, since it returns a lazy stream
- We change Enum.map/2 and Enum.flat_map/2 into Stream.map/2 and Stream.flat_map/2
Just like that, our solution is now lazy and won’t compute every step and load it fully into memory. That’s fantastic, but our execution time is kind of exactly the same! Splitting strings and stuff like that can take memory, but it’s blazing-fast compared to performing HTTP requests.
Enter Flow
Flow tries its hardest to provide an API that’s almost identical to Enum and Stream. This is how we’d rewrite our example.
start_time = System.monotonic_time(:millisecond)
result =
  1..10
  |> Flow.from_enumerable(max_demand: 1, min_demand: 0)
  |> Flow.map(fn _ ->
    Req.get!("https://baconipsum.com/api/?type=meat-and-filler&paras=50&format=text").body
  end)
  |> Flow.flat_map(fn paragraph ->
    String.splitter(paragraph, "\n", trim: true)
  end)
  |> Flow.flat_map(&String.splitter(&1, " ", trim: true))
  |> Flow.partition()
  |> Flow.reduce(fn -> %{} end, fn word, acc ->
    Map.update(acc, word, 1, &(&1 + 1))
  end)
  |> Enum.sort_by(fn {_word, count} -> count end, :desc)
end_time = System.monotonic_time(:millisecond)
IO.puts("Took us #{end_time - start_time}ms")
result
Partitioning
The example above works thanks to a little function we snuck in there: Flow.partition/1.
Flow executes computations in different processes, including the Flow.reduce/3 step. This means that if we don’t have a way to make sure words are deterministically processed in specific processes, we’re not going to be able to perform the reduce step in parallel. This is a known issue when working with map/reduce: if you want the reduce step to happen in parallel, you need to divide the output of the map step into subsets that you can reduce on their own.
Flow.partition/1 does exactly that: it introduces a new set of stages and makes sure the same word is always mapped to the same stage. It does this through a hash function.
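To get a feel for hash-based routing, here’s a rough model in plain Elixir. It is not Flow’s implementation: the route function and the choice of 4 stages are hypothetical, and using :erlang.phash2/2 as the hash is an assumption made for illustration.

```elixir
stages = 4

# Hypothetical router: hash the word into an index between 0 and stages - 1.
route = fn word -> :erlang.phash2(word, stages) end

words = ~w(bacon ipsum bacon pork ipsum bacon)

# Group the words by the stage index they would be routed to.
by_stage = Enum.group_by(words, route)

IO.inspect(by_stage)
```

Every occurrence of "bacon" ends up under the same index, so a single process could count it without ever talking to the others.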
Windows and Triggers
We won’t go into too much detail on windows and triggers, but they’re a powerful feature of Flow.
The reason for having these features is unbounded collections. When working with unbounded collections, we can’t simply Flow.reduce/3 like we did in the word-counting example. We’ll never arrive at a reducible result if our collection has infinite elements.
Windows and triggers address this problem. Windows let us split the collection into chunks (for example, by time or by number of elements), and triggers let us tell the pipeline when to “flush” the results we have computed so far. Then, Flow can run its operations on windows of data instead of on the unbounded collection.
Once we specify a window strategy, then we can use triggers to materialize the data in the window. The parallel computation now happens on the window itself, not on all the data like in the examples above.
All events belong to the global window by default, which is returned by Flow.Window.global/0.
Flow.Window.global()
Triggers
Let’s see a trigger in action. We’ll use the Flow.Window.trigger_every/2 trigger, which triggers every n elements in the window.
window = Flow.Window.global() |> Flow.Window.trigger_every(10)
Flow.from_enumerable(1..100)
|> Flow.partition(window: window, stages: 1)
|> Flow.reduce(fn -> 0 end, &(&1 + &2))
|> Flow.emit(:state)
|> Enum.to_list()
As you can see in the output, each element in the returned list is the running sum at the moment a trigger fired, which happens every ten elements.
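If it helps, you can model what the trigger does with plain Enum and Stream functions. This is only a sketch: it assumes the reduce state carries over from one trigger to the next, and it ignores any extra value the Flow pipeline may emit once the input is exhausted.

```elixir
running_sums =
  1..100
  # A "trigger" fires after every 10 elements.
  |> Stream.chunk_every(10)
  # Keep the accumulator between chunks instead of resetting it.
  |> Enum.scan(0, fn chunk, acc -> acc + Enum.sum(chunk) end)

IO.inspect(running_sums)
# => [55, 210, 465, 820, 1275, 1830, 2485, 3240, 4095, 5050]
```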
What to Emit?
You might notice a little call to Flow.emit/2 in the pipeline in our previous example. You can use Flow.emit/2 to tell Flow which value you want to emit to the next stage of the computation. Often, you’d use Flow.emit(flow, :events). In our case, what we want to emit to the next stage is the state calculated by Flow.reduce/3, so we use Flow.emit(:state).
Other Window Types
We can have different window types. The Flow.Window.count/1 window groups events into windows of n elements each. We can see how this plays out with the trigger_every trigger we used above. In the example below, we consider a window of 15 elements at a time, but fire the trigger every 6 elements.
window = Flow.Window.count(15) |> Flow.Window.trigger_every(6)
Stream.repeatedly(fn -> 5 end)
|> Flow.from_enumerable()
|> Flow.partition(window: window, stages: 1)
|> Flow.reduce(fn -> 0 end, &(&1 + &2))
|> Flow.emit(:state)
|> Enum.take(10)
Stages
As mentioned at the beginning of this livebook, Flow is built on top of GenStage. In many cases, you won’t even notice it. However, it’s important because it means all the GenStage features we know and love power Flow as well. For example, Flow pipelines have built-in backpressure through GenStage’s demand. You can also use GenStage producers or producer-consumers to feed a Flow pipeline through Flow.from_stages/2.
To control parallelism and batching in Flow, you’ll see GenStage concepts pop up. For example, the Flow.partition/2 function we used above uses a :stages option to control how many processes to use in the partitioning. You can also use :min_demand and :max_demand to control batching.
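Here’s a small sketch of those options in one pipeline. The numbers are arbitrary picks for illustration, not recommendations: two mapper stages that ask for at most 100 events at a time, followed by four partition stages that each keep their own sum.

```elixir
# Same setup as at the top of this livebook.
Mix.install([:req, :flow])

total =
  1..1_000
  # 2 mapper stages, each demanding at most 100 events at a time.
  |> Flow.from_enumerable(stages: 2, max_demand: 100)
  |> Flow.map(&(&1 * 2))
  # 4 reducer stages, each summing the numbers routed to it.
  |> Flow.partition(stages: 4)
  |> Flow.reduce(fn -> 0 end, &(&1 + &2))
  # Emit each partition's final sum instead of the individual events.
  |> Flow.emit(:state)
  # Combine the partial sums in the calling process.
  |> Enum.sum()

IO.inspect(total)
```

Changing :stages or :max_demand should change how the work is spread out and batched, but not the final total.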