Powered by AppSignal & Oban Pro

Tasks for Async Computations

books/dataprocessing/02-task.livemd

Tasks for Async Computations

# Notebook dependencies: Req performs the HTTP requests in the currency example
# (Req.get!/1), and kino_vega_lite provides the VegaLite / Kino.VegaLite modules
# that Helpers.plot_currencies/1 uses to render its chart.
Mix.install([
  {:req, "~> 0.3.6"},
  {:kino_vega_lite, "~> 0.1.8"}
])

Setup: Helpers and a First Task

defmodule Helpers do
  @moduledoc """
  Small utilities shared by the cells of this notebook: timing a function,
  building currency-API URLs, and charting exchange-rate series.
  """

  alias VegaLite, as: VL

  @currency_api_base "https://cdn.jsdelivr.net/gh/fawazahmed0/currency-api@1"

  @doc """
  Runs the zero-arity `fun`, prints how long it took (rounded to whole
  milliseconds), and returns whatever `fun` returned.
  """
  def instrument(fun) do
    {micros, result} = :timer.tc(fun)
    millis = round(micros / 1000)
    IO.puts("Execution took #{millis}ms")
    result
  end

  @doc """
  Builds the jsDelivr URL for the `source_currency` -> `target_currency` rate.

  `date` is interpolated into the path, so anything implementing `String.Chars`
  (e.g. a `Date`) works; it defaults to `"latest"`.
  """
  def currency_conversion_url(source_currency, target_currency, date \\ "latest") do
    "#{@currency_api_base}/#{date}/currencies/#{source_currency}/#{target_currency}.json"
  end

  @doc """
  Renders `{date, currency, value}` triples as a VegaLite line chart with one
  colored line per currency, wrapped in a `Kino.VegaLite` widget.

  Raises `FunctionClauseError` for any entry that is not a `%Date{}` paired with
  a numeric value.
  """
  def plot_currencies(data) do
    rows = Enum.map(data, &to_row/1)

    VL.new(width: 500)
    |> VL.mark(:line, point: true, tooltip: true)
    |> VL.encode_field(:x, "day", type: :temporal, time_unit: "yearmonthdate")
    |> VL.encode_field(:y, "value", type: :quantitative, title: "Value for €1")
    |> VL.encode_field(:color, "currency", type: :nominal, title: "Currency")
    |> VL.data_from_values(rows)
    |> Kino.VegaLite.new()
  end

  # Turns one data point into the string-keyed row map handed to
  # VL.data_from_values/2. The single guarded clause makes malformed
  # entries crash loudly instead of being plotted or skipped.
  defp to_row({%Date{} = day, currency, value}) when is_number(value) do
    %{"day" => day, "currency" => currency, "value" => value}
  end
end
# Start the "expensive" work in a separate process. Task.async/1 returns a
# handle immediately, so the caller keeps running while the task sleeps.
task =
  Task.async(fn ->
    IO.puts("👷‍♀️ Starting intensive task to generate a random number")
    Process.sleep(3000)
    Enum.random(1..100)
  end)

# Meanwhile, the caller does its own (shorter) piece of work...
Process.sleep(1000)
IO.puts("😴 Oops, dozed off in the main process...")

# ...and then blocks until the task replies. 5000 ms is Task.await/2's default
# timeout, which comfortably covers the task's 3000 ms of work.
random_number = Task.await(task, 5000)
IO.puts("💡 Here's the random number that the task returned: #{random_number}")

Computation Bound by Time

# The task sleeps a random 500..1500 ms, so it misses the 1000 ms deadline
# below roughly half of the time.
weird_task =
  Task.async(fn ->
    nap_ms = Enum.random(500..1500)
    IO.puts("💤 Sleeping for #{nap_ms}ms")
    Process.sleep(nap_ms)
    Enum.random([:foo, :bar, :baz])
  end)

# Wait up to 1000 ms for a reply; if none arrived, shut the task down.
# Task.shutdown/2 still returns {:ok, result} when the reply slipped in just
# before shutdown, and nil when the task was stopped without replying. The
# nil case is reported as :timeout.
case Task.yield(weird_task, 1000) || Task.shutdown(weird_task) do
  {:ok, result} -> result
  nil -> :timeout
end

Parallel Mapping

# An endless, lazy source of random sleep durations in milliseconds (0..1000).
# Nothing is generated until the stream is consumed, e.g. by Enum.take/2 below.
random_sleep_ms = fn -> Enum.random(0..1000) end
infinite_stream = Stream.repeatedly(random_sleep_ms)

Enum.take(infinite_stream, 5)
# Run each sleep in its own task. With the default options, results are
# emitted in input order.
parallel_stream =
  Task.async_stream(infinite_stream, fn timeout ->
    Process.sleep(timeout)
    "Slept for #{timeout} ms"
  end)

# Elapsed time is measured with the monotonic clock: System.system_time/1 is
# wall-clock time and may jump (e.g. NTP corrections), which would corrupt
# the measurement.
start_time = System.monotonic_time(:millisecond)

# Streams are lazy: the tasks only start once Enum.each/2 pulls from the
# pipeline, and Stream.take/2 stops it after five results.
parallel_stream
|> Stream.take(5)
|> Enum.each(fn result ->
  IO.puts("Task finished with result: #{inspect(result)}")
end)

IO.puts("Finished in #{System.monotonic_time(:millisecond) - start_time}ms")
# Number of schedulers currently online. Task.async_stream/3 uses this as its
# default :max_concurrency, i.e. how many tasks the previous cell ran at once.
System.schedulers_online()
# Same pipeline, but with `ordered: false`. Each result is emitted as soon as
# its task finishes, rather than waiting behind slower tasks that were started
# earlier. (This replaces a `raise("missing option")` placeholder that crashed
# the cell; `ordered: false` matches the option used in the exchange-rate
# example.)
parallel_stream =
  Task.async_stream(
    infinite_stream,
    fn timeout ->
      Process.sleep(timeout)
      "Slept for #{timeout} ms"
    end,
    [ordered: false]
  )

# Monotonic clock: wall-clock time can jump and skew an elapsed-time measurement.
start_time = System.monotonic_time(:millisecond)

parallel_stream
|> Stream.take(5)
|> Enum.each(fn result ->
  IO.puts("Task finished with result: #{inspect(result)}")
end)

IO.puts("Finished in #{System.monotonic_time(:millisecond) - start_time}ms")
# Unordered again, but with at most 3 tasks in flight regardless of the
# scheduler count. This shows how :max_concurrency affects the time needed to
# collect five results. (This replaces a `raise("missing option")` placeholder
# that crashed the cell.)
parallel_stream =
  Task.async_stream(
    infinite_stream,
    fn timeout ->
      Process.sleep(timeout)
      "Slept for #{timeout} ms"
    end,
    [ordered: false, max_concurrency: 3]
  )

# Monotonic clock: wall-clock time can jump and skew an elapsed-time measurement.
start_time = System.monotonic_time(:millisecond)

parallel_stream
|> Stream.take(5)
|> Enum.each(fn result ->
  IO.puts("Task finished with result: #{inspect(result)}")
end)

IO.puts("Finished in #{System.monotonic_time(:millisecond) - start_time}ms")

Fetching Exchange Rates Concurrently

currencies = ["mxn", "gbp", "usd", "czk", "ron", "pln"]

# Read the clock once so both ends of the 31-day range agree, even if the
# cell happens to run across midnight UTC.
today = Date.utc_today()
date_range = Date.range(Date.add(today, -30), today)

# A lazy stream of {date, currency} pairs: every date in the range, for each currency.
stream = Stream.flat_map(currencies, fn currency -> Enum.map(date_range, &{&1, currency}) end)

Helpers.instrument(fn ->
  # Fetch one EUR -> `currency` rate for one day. Map.fetch!/2 raises if the
  # response body has no entry for `currency`.
  fetch_rate = fn {date, currency} ->
    response = Req.get!(Helpers.currency_conversion_url("eur", currency, date))
    {date, currency, Map.fetch!(response.body, currency)}
  end

  # Up to 20 requests in flight. Result order is irrelevant because each point
  # carries its own date, so `ordered: false` avoids head-of-line blocking.
  stream
  |> Task.async_stream(fetch_rate, ordered: false, max_concurrency: 20)
  |> Stream.map(fn {:ok, result} -> result end)
  |> Helpers.plot_currencies()
end)