Data Flows
Mix.install([
  {:briefly, "~> 0.5.1"},
  {:httpoison, "~> 2.2"},
  {:flow, "~> 1.2"},
  {:vega_lite, "~> 0.1.9"},
  {:kino_vega_lite, "~> 0.1.10"}
])
Introduction
In 2017, José Valim presented GenStage and Flow. The narrative of that presentation was really good, so this workbook will use that as a starting point. Stolen from José with pride.
This workbook makes use of the following modules:
- File for reading and writing files.
- Briefly for temporary files.
- HTTPoison for performing HTTP GET operations.
- Enum for eager processing of enumerables.
- Stream for lazy processing using streams.
- Flow for concurrent processing using GenStage processes.
In this workbook, we will explore different ways of counting the number of occurrences of each word present in a text. There are three principal steps in doing so:
- Read data from file.
- Segment it into words.
- Reduce the list of words into a map from word to count.
Case
This is “Alice’s Adventures in Wonderland” by Lewis Carroll:
url = "https://www.gutenberg.org/cache/epub/11/pg11.txt"
Let’s download it:
{:ok, path} = Briefly.create()
%HTTPoison.Response{body: contents} = HTTPoison.get!(url)
File.write!(path, contents)
Eager Processing
eager =
  File.read!(path)
  |> String.split("\n")
  |> Enum.flat_map(&String.split/1)
  |> Enum.reduce(%{}, fn word, counts -> Map.update(counts, word, 1, &(&1 + 1)) end)
What’s going on here?
- The File.read! line reads the contents of the file into memory or errors out.
- The String.split line does a pass over this and splits it into a list of strings (one per line).
- The Enum.flat_map line does another pass whereby it maps the String.split/1 function (which splits on whitespace) over each of these strings and flattens the result. The result is thus a list of strings representing each word in the original text. These words may contain periods, commas and similar characters, but that is without relevance for this example.
- The Enum.reduce line does yet another pass where it reduces this list to a single map. The map starts empty, and for each encountered word its count is either incremented (if already present) or inserted with a value of 1, as shown in the snippet after this list.
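To make the update step concrete, here is Map.update/4 in isolation:
counts = Map.update(%{}, "alice", 1, &(&1 + 1))
# => %{"alice" => 1}
Map.update(counts, "alice", 1, &(&1 + 1))
# => %{"alice" => 2}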
While this solution is fine for a small file, it has a number of downsides, namely:
- Memory consumption: It loads the entire file into memory. That may be fine for a 10kB file, but would likely be problematic for a 10GB file.
- Memory access: It does four passes over the data of the file; first for reading, then for splitting into lines, then for splitting into words and finally for reducing into the map.
This is the result of eager processing.
Lazy Processing
lazy =
  File.stream!(path)
  |> Stream.flat_map(&String.split/1)
  |> Enum.reduce(%{}, fn word, counts -> Map.update(counts, word, 1, &(&1 + 1)) end)
What’s going on here?
- The File.stream! line works like File.read! except that instead of returning the contents of the file it returns a recipe for reading those contents.
- The Stream.flat_map line works like Enum.flat_map except that:
  - Instead of taking an enumerable as input it takes a recipe.
  - Instead of returning a list of strings it returns a recipe for reading a sequence of strings.
- The Enum.reduce line is the same as the one from the eager version. Because it is eager, it will follow the recipe that it takes as its first argument.

Enum.reduce will consume values (i.e., words), one at a time, produced by the recipe passed to it. In order to produce a value, that recipe will in turn follow the recipe passed to it. This way, recipes are chained, and the call follows this chain for each value produced.
Let’s try to illustrate this:
print = fn value ->
  IO.puts(value)
  value
end
[1, 2, 3]
|> Stream.map(print)
|> Stream.map(print)
|> Enum.to_list()
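Each element travels through the whole chain before the next is pulled, so the output interleaves the two prints (1, 1, 2, 2, 3, 3) rather than printing the full list twice (1, 2, 3, 1, 2, 3).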
We can also inspect the recipe itself:
File.stream!(path)
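Evaluating this shows the underlying struct rather than any file contents, roughly (fields abbreviated; the exact shape may vary by Elixir version):
# => %File.Stream{path: "/tmp/…", modes: [:raw, :read_ahead, :binary], line_or_bytes: :line, raw: true}
Nothing is read from disk until something eager consumes this recipe.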
Comparing this to the eager solution, we see that:
- At no point do we load any significant amount of data into memory.
- This comes with a bit of overhead, but it is robust to large files.
- The lazy solution could deal with infinite streams (e.g., a random number generator), as illustrated after this list.
- We are still unable to utilize multiple cores.
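For instance, here is a recipe for an infinite stream of random numbers, of which only three elements are ever actually produced:
Stream.repeatedly(fn -> :rand.uniform() end)
|> Enum.take(3)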
Concurrent Processing
concurrent =
  File.stream!(path)
  |> Flow.from_enumerable()
  |> Flow.flat_map(&String.split/1)
  |> Flow.partition()
  |> Flow.reduce(fn -> %{} end, fn word, counts -> Map.update(counts, word, 1, &(&1 + 1)) end)
  |> Enum.into(%{})
What’s going on here?
- The File.stream! line is exactly the same as in the lazy version.
- The Flow.from_enumerable line creates a new flow from an enumerable. It will consume batches of elements from the previous line when it receives demand. File.stream!, by default, will send it one element per line it encounters.
- The Flow.flat_map line is exactly the same as in the lazy version, only it produces a flow that can be evaluated concurrently.
- The Flow.partition line will partition the data according to a keying function whose keys will be hashed to the number of stages in the next step of the flow. Partitioning allows you to control which classes of inputs are routed to the same stages of the next step. This is critical for some problems and a waste of resources for others (see the sketch after this list).
- The Flow.reduce line is exactly the same as in the lazy version, only it produces a flow and, given that it is positioned after the partitioning, specific words will always be mapped to a specific stage of those that make up this step of the flow. This means that we are guaranteed to not have any key that appears in two stages.
- The Enum.into line eagerly constructs a map from the flow. This is what drives the demand.
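To see what the partitioning buys us, here is a sketch of the same pipeline without it. Each reducer stage then builds its own counts for whatever words happen to reach it, the same word can appear in several stage accumulators, and Enum.into/2 silently keeps only one of the conflicting partial counts:
# Without partitioning the counts below may be wrong: the same word can
# be reduced in several stages, and only one partial count survives.
File.stream!(path)
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split/1)
|> Flow.reduce(fn -> %{} end, fn word, counts -> Map.update(counts, word, 1, &(&1 + 1)) end)
|> Enum.into(%{})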
Let’s take a look at the recipe part of this (before it is consumed by Enum.into):
concurrent_recipe =
  File.stream!(path)
  |> Flow.from_enumerable()
  |> Flow.flat_map(&String.split/1)
  |> Flow.partition()
  |> Flow.reduce(fn -> %{} end, fn word, counts -> Map.update(counts, word, 1, &(&1 + 1)) end)
That structure can be used to visualize the flow of data (left to right):
defmodule RecipeViz do
  @stepsize 300
  @y_offset 30
  @y_spacing 40

  # Walk the chain of recipes, collecting a name and a stage count per step.
  def analyze(%File.Stream{}) do
    [%{name: "File Stream", count: 1}]
  end

  def analyze(%Flow{} = flow) do
    %Flow{
      operations: [operation],
      options: [stages: stagecount],
      producers: {producer, [input]},
      window: _window
    } = flow

    node_name =
      case operation do
        {a, b, _} when is_atom(a) and is_atom(b) ->
          "#{producer} #{a} #{b}"

        {a, _, _} when is_atom(a) ->
          "#{producer} #{a}"
      end

    tail = analyze(input)
    [%{name: node_name, count: stagecount} | tail]
  end

  # Assign an {x, y} coordinate to each stage of each step.
  def enrich(config) do
    maxcount = config |> Enum.map(fn %{count: count} -> count end) |> Enum.max()
    height = maxcount * @y_spacing

    new_config =
      config
      |> Enum.with_index(fn %{count: count} = entry, i ->
        points =
          1..count
          |> Enum.map(fn count_i ->
            {
              (i + 0.5) * @stepsize,
              @y_offset + (0.5 + count_i) * height / (count + 2)
            }
          end)

        Map.merge(entry, %{points: points})
      end)

    {new_config, height}
  end

  # Render the analyzed recipe as an SVG image: a label per step, a circle
  # per stage and a line for each edge between consecutive steps.
  def render(recipe) do
    {config, height} =
      recipe
      |> analyze()
      |> Enum.reverse()
      |> enrich()

    lines_labels =
      config
      |> Enum.with_index(fn %{name: name}, i ->
        """
        <text x="#{(i + 0.5) * @stepsize}" y="#{@y_offset / 2}" text-anchor="middle">#{name}</text>
        """
      end)

    lines_nodes =
      config
      |> Enum.map(fn %{points: points} ->
        points
        |> Enum.map(fn {x, y} ->
          """
          <circle cx="#{x}" cy="#{y}" r="6" fill="black" />
          """
        end)
      end)
      |> List.flatten()

    lines_edges =
      config
      |> Enum.chunk_every(2, 1, :discard)
      |> Enum.map(fn [%{points: src_points}, %{points: dst_points}] ->
        for {src_x, src_y} <- src_points, {dst_x, dst_y} <- dst_points do
          """
          <line x1="#{src_x}" y1="#{src_y}" x2="#{dst_x}" y2="#{dst_y}" stroke="black" />
          """
        end
      end)
      |> List.flatten()

    xmin = 0
    xmax = length(config) * @stepsize
    ymin = 0
    ymax = @y_offset + height

    """
    <svg viewBox="#{xmin} #{ymin} #{xmax} #{ymax}" xmlns="http://www.w3.org/2000/svg">
    #{lines_labels |> Enum.map(fn line -> "  #{line}\n" end) |> Enum.join()}
    #{lines_edges |> Enum.map(fn line -> "  #{line}\n" end) |> Enum.join()}
    #{lines_nodes |> Enum.map(fn line -> "  #{line}\n" end) |> Enum.join()}
    </svg>
    """
    |> Kino.Image.new(:svg)
  end
end
RecipeViz.render(concurrent_recipe)
Note: Options are available for controlling the number of workers per layer.
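For example (a sketch; the stage counts here are arbitrary):
File.stream!(path)
|> Flow.from_enumerable(stages: 4)
|> Flow.flat_map(&String.split/1)
|> Flow.partition(stages: 8)
|> Flow.reduce(fn -> %{} end, fn word, counts -> Map.update(counts, word, 1, &(&1 + 1)) end)
|> Enum.into(%{})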
Tracing a simpler example reveals that the underlying mechanics are a bit more complicated than one might immediately imagine:
Kino.Process.render_seq_trace(fn ->
  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  |> Flow.from_enumerable(stages: 1)
  |> Flow.map(fn value -> value end)
  |> Enum.into([])
end)
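The trace shows the GenStage handshake: the consumer first subscribes and sends demand upstream, only then do events flow downstream, and they travel as batches rather than one message per element.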
Distributed Processing
The last step, obviously, is distributed processing. Unfortunately, I haven’t found an off-the-shelf solution for that. It is, however, often not necessary. And the tooling for building such an animal is growing.
I wouldn’t be surprised to see something pop up within a year or two. Until then, tools outside the BEAM can be used.
Performance Comparison
At the lower right corner of each evaluated non-stale cell you will see a green circle. Hovering the mouse over it will activate a popup that tells you how long it took to evaluate it.
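If you prefer measuring in code, Erlang’s :timer.tc/1 returns the elapsed time in microseconds (a quick sketch, not how the numbers below were taken):
# :timer.tc/1 runs the function and returns {elapsed_microseconds, result}.
{micros, _counts} =
  :timer.tc(fn ->
    File.read!(path)
    |> String.split("\n")
    |> Enum.flat_map(&String.split/1)
    |> Enum.reduce(%{}, fn word, counts -> Map.update(counts, word, 1, &(&1 + 1)) end)
  end)

IO.puts("Eager: #{div(micros, 1000)} ms")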
On a warm cache, I got the following times on my laptop (in ms):
data = [
  %{"impl" => "Eager", "time" => 57},
  %{"impl" => "Lazy", "time" => 58},
  %{"impl" => "Concurrent", "time" => 20}
]
alias VegaLite, as: Vl
Vl.new(width: 400, height: 160)
|> Vl.data_from_values(data)
|> Vl.mark(:bar)
|> Vl.encode_field(:x, "time", type: :quantitative, title: "Time / [ms]")
|> Vl.encode_field(:y, "impl", type: :nominal, title: "Implementation")
Note: There are considerable run-by-run variations.
Given the simplistic example, the takeaway here should be that eager and lazy are very comparable, and that concurrent can be a lot faster.
Ingestion-Style Problems
In ingestion-style problems there is a need for batching and supervision. For dealing with such problems there is Broadway. But that should be a different workbook …