
Elixir Streams Playground


Section

Let’s define a couple of utility functions before we start:

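defmodule MemoryHelpers do
  # Prints the memory (in bytes) used by the given process, then returns
  # `input` unchanged so the call can sit in the middle of a pipeline.
  def check_process_memory(input, self_pid) do
    IO.inspect(:erlang.process_info(self_pid, :memory), label: "erl process mem")
    input
  end

  # Prints the total memory (in bytes) allocated by the Erlang VM, then
  # returns `input` unchanged.
  def check_erlang_memory(input) do
    IO.inspect(:erlang.memory(:total), label: "erl mem")
    input
  end
end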

defmodule MyFunc do
  @moduledoc """
  Toy transformation used to give the pipelines some work to do.
  """

  @doc """
  Maps every item of the chunk to a nested list derived from it.
  """
  def manage_chunk(chunk) do
    Enum.map(chunk, fn item ->
      [item + 1, item + 2, item + 3, [item + 4, [item + 5, item + 6]]]
    end)
  end
end

:ok
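To make the output of the helpers easier to read, here is a minimal sketch of what the two Erlang calls return when used on their own. The variable names are only illustrative.

```elixir
# :erlang.process_info/2 returns a {key, value} tuple; the value is in bytes.
{:memory, process_bytes} = :erlang.process_info(self(), :memory)

# :erlang.memory/1 with :total returns a plain integer: the bytes currently
# allocated by the whole VM, across all processes.
total_bytes = :erlang.memory(:total)

IO.puts("process: #{process_bytes} bytes, VM total: #{total_bytes} bytes")
```

The first reading covers a single process, the second covers the whole VM, so the two helpers answer slightly different questions.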

Let’s see how our flow behaves using the plain Enum module:

defmodule EnumTest do
  def run(memory_checker) do
    1..1_000_000
    |> memory_checker.()
    |> Enum.chunk_every(500)
    |> memory_checker.()
    |> Enum.flat_map(&MyFunc.manage_chunk/1)
    |> memory_checker.()
  end
end

EnumTest.run(&MemoryHelpers.check_erlang_memory/1)

:ok
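On a much smaller range it is easier to see what each Enum step produces. The sketch below uses an illustrative anonymous function in place of the notebook's chunk handler; the point is that every Enum call returns a fully built list, and that `Enum.flat_map/2` removes only one level of nesting.

```elixir
# Enum.chunk_every/2 is eager: the whole list of chunks exists right away.
chunks = Enum.chunk_every(1..10, 4)
IO.inspect(chunks, label: "chunks")
# chunks == [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]

# The function returns one list per chunk; flat_map concatenates those lists,
# so the inner nesting of each item is preserved.
nested =
  Enum.flat_map(chunks, fn chunk ->
    Enum.map(chunk, fn item -> [item, [item * 10]] end)
  end)

IO.inspect(length(nested), label: "items after flat_map")
IO.inspect(hd(nested), label: "first item")
```

With one million items, each of these intermediate lists has to fit in memory at once.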

Let’s see how this compares with the Stream module now:

defmodule StreamTest do
  def run(memory_checker) do
    1..1_000_000
    |> memory_checker.()
    |> Stream.chunk_every(500)
    |> memory_checker.()
    |> Stream.flat_map(&MyFunc.manage_chunk/1)
    |> memory_checker.()
    # We need one more step to "run" the stream
    |> Enum.to_list()
    |> memory_checker.()
  end
end

StreamTest.run(&MemoryHelpers.check_erlang_memory/1)

:ok
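A small sketch of why the extra step is needed: a stream only describes the work, and nothing is computed until an Enum function pulls elements out of it. The range and the summing function here are illustrative, not taken from the notebook.

```elixir
stream =
  1..10
  |> Stream.chunk_every(4)
  |> Stream.map(fn chunk ->
    # This line runs only when something consumes the stream.
    IO.inspect(chunk, label: "processing")
    Enum.sum(chunk)
  end)

# Nothing has been printed so far: `stream` is not a list of results.
IO.inspect(is_list(stream), label: "is a list?")

# Enum.take/2 pulls only the elements it needs and then stops.
first_two = Enum.take(stream, 2)
IO.inspect(first_two, label: "first two sums")
# first_two == [10, 26]
```

Ending the pipeline with `Enum.to_list/1`, as in `StreamTest`, still builds the complete result in memory; the laziness only avoids the intermediate lists.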

Finally, let’s look at memory usage in more detail using Task. We run the two tasks one after the other rather than concurrently, so there is no doubt about side effects from parallelism (which should not occur in any case, since each Erlang process has its own heap).

IO.puts("Enum task")

task_enum =
  Task.async(fn ->
    pid = self()

    EnumTest.run(&MemoryHelpers.check_process_memory(&1, pid))
  end)

Task.await(task_enum)

IO.puts("Stream task")

task_stream =
  Task.async(fn ->
    pid = self()

    StreamTest.run(&MemoryHelpers.check_process_memory(&1, pid))
  end)

Task.await(task_stream)
:ok
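The same idea can be wrapped in a small helper. This is only a sketch: `TaskMemory.measure/1` is a hypothetical name, not part of the notebook above, and it samples the process memory once, right after the work is done. The absolute numbers vary between runs and VM versions, so treat them as a rough indication only.

```elixir
defmodule TaskMemory do
  # Hypothetical helper: runs `fun` in a fresh process and returns its result
  # together with that process's memory (in bytes), sampled after `fun` returns.
  def measure(fun) do
    task =
      Task.async(fn ->
        result = fun.()
        {:memory, bytes} = :erlang.process_info(self(), :memory)
        {result, bytes}
      end)

    # :infinity avoids the default 5 second timeout on slow machines.
    Task.await(task, :infinity)
  end
end

# Eager version: Enum.map/2 builds a 100_000 element list before summing.
{eager_sum, eager_bytes} =
  TaskMemory.measure(fn ->
    1..100_000 |> Enum.map(&(&1 * 2)) |> Enum.sum()
  end)

# Lazy version: elements flow through Stream.map/2 one at a time.
{lazy_sum, lazy_bytes} =
  TaskMemory.measure(fn ->
    1..100_000 |> Stream.map(&(&1 * 2)) |> Enum.sum()
  end)

IO.inspect({eager_sum, eager_bytes}, label: "Enum")
IO.inspect({lazy_sum, lazy_bytes}, label: "Stream")
```

Because each measurement runs in its own process, the reading for one pipeline is not affected by data left over from the other.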