Elixir Streams Playground
Section
Let’s define a couple of utility functions before starting
defmodule MemoryHelpers do
def check_process_memory(input, self_pid) do
IO.inspect(:erlang.process_info(self_pid, :memory), label: "erl process mem")
input
end
def check_erlang_memory(input) do
IO.inspect(:erlang.memory(:total), label: "erl mem")
input
end
end
defmodule MyFunc do
@moduledoc """
Just simple confusion :)
"""
@doc """
Simply add some nesting
"""
def manage_chunk(chunk) do
Enum.map(chunk, fn item ->
[item + 1, item + 2, item + 3, [item + 4, [item + 5, item + 6]]]
end)
end
end
:ok
Let’s see how our flow goes using plain Enum module
defmodule EnumTest do
def run(memory_checker) do
1..1_000_000
|> memory_checker.()
|> Enum.chunk_every(500)
|> memory_checker.()
|> Enum.flat_map(&MyFunc.manage_chunk/1)
|> memory_checker.()
end
end
EnumTest.run(&MemoryHelpers.check_erlang_memory/1)
:ok
Let’s see how this compare with Stream module now
defmodule StreamTest do
def run(memory_checker) do
1..1_000_000
|> memory_checker.()
|> Stream.chunk_every(500)
|> memory_checker.()
|> Stream.flat_map(&MyFunc.manage_chunk/1)
|> memory_checker.()
# We need one more step to "run" the stream
|> Enum.to_list()
|> memory_checker.()
end
end
StreamTest.run(&MemoryHelpers.check_erlang_memory/1)
:ok
Finally, let’s see more in detail some memory usage using Task. We run two different tasks in different times, so we will not have doubt about parallelism side effects (which shouldn’t be present in any case, since Erlang Processes’ memory is not shared)
IO.puts("Enum task")
task_enum =
Task.async(fn ->
pid = self()
EnumTest.run(&MemoryHelpers.check_process_memory(&1, pid))
end)
Task.await(task_enum)
IO.puts("Stream task")
task_stream =
Task.async(fn ->
pid = self()
StreamTest.run(&MemoryHelpers.check_process_memory(&1, pid))
end)
Task.await(task_stream)
:ok