Tasks for Async Computations
Mix.install([
{:req, "~> 0.3.6"},
{:kino_vega_lite, "~> 0.1.8"}
])
Section
defmodule Helpers do
alias VegaLite, as: VL
def instrument(fun) do
{elapsed, value} = :timer.tc(fun)
IO.puts("Execution took #{round(elapsed / 1000)}ms")
value
end
def currency_conversion_url(source_currency, target_currency, date \\ "latest") do
"https://cdn.jsdelivr.net/gh/fawazahmed0/currency-api@1/#{date}/currencies/#{source_currency}/#{target_currency}.json"
end
def plot_currencies(data) do
values =
Enum.map(data, fn {%Date{} = day, currency, value} when is_number(value) ->
%{"day" => day, "currency" => currency, "value" => value}
end)
VL.new(width: 500)
|> VL.mark(:line, point: true, tooltip: true)
|> VL.encode_field(:x, "day", type: :temporal, time_unit: "yearmonthdate")
|> VL.encode_field(:y, "value", type: :quantitative, title: "Value for €1")
|> VL.encode_field(:color, "currency", type: :nominal, title: "Currency")
|> VL.data_from_values(values)
|> Kino.VegaLite.new()
end
end
task =
Task.async(fn ->
IO.puts("👷♀️ Starting intensive task to generate a random number")
Process.sleep(3000)
Enum.random(1..100)
end)
Process.sleep(1000)
IO.puts("😴 Oops, dozed off in the main process...")
random_number = Task.await(task)
IO.puts("💡 Here's the random number that the task returned: #{random_number}")
Computation Bound by Time
weird_task =
Task.async(fn ->
sleep_time = Enum.random(500..1500)
IO.puts("💤 Sleeping for #{sleep_time}ms")
Process.sleep(sleep_time)
Enum.random([:foo, :bar, :baz])
end)
case Task.yield(weird_task, _timeout = 1000) do
nil ->
case Task.shutdown(weird_task) do
{:ok, result} -> result
nil -> :timeout
end
{:ok, result} ->
result
end
Parallel Mapping
infinite_stream = Stream.repeatedly(fn -> Enum.random(0..1000) end)
Enum.take(infinite_stream, 5)
parallel_stream =
Task.async_stream(infinite_stream, fn timeout ->
Process.sleep(timeout)
"Slept for #{timeout} ms"
end)
start_time = System.system_time(:millisecond)
parallel_stream
|> Stream.take(5)
|> Enum.each(fn result ->
IO.puts("Task finished with result: #{inspect(result)}")
end)
IO.puts("Finished in #{System.system_time(:millisecond) - start_time}ms")
System.schedulers_online()
parallel_stream =
Task.async_stream(
infinite_stream,
fn timeout ->
Process.sleep(timeout)
"Slept for #{timeout} ms"
end,
[raise("missing option")]
)
start_time = System.system_time(:millisecond)
parallel_stream
|> Stream.take(5)
|> Enum.each(fn result ->
IO.puts("Task finished with result: #{inspect(result)}")
end)
IO.puts("Finished in #{System.system_time(:millisecond) - start_time}ms")
parallel_stream =
Task.async_stream(
infinite_stream,
fn timeout ->
Process.sleep(timeout)
"Slept for #{timeout} ms"
end,
[raise("missing option"), max_concurrency: 3]
)
start_time = System.system_time(:millisecond)
parallel_stream
|> Stream.take(5)
|> Enum.each(fn result ->
IO.puts("Task finished with result: #{inspect(result)}")
end)
IO.puts("Finished in #{System.system_time(:millisecond) - start_time}ms")
Section
currencies = ["mxn", "gbp", "usd", "czk", "ron", "pln"]
date_range = Date.range(Date.add(Date.utc_today(), -30), Date.utc_today())
# A stream of (potentially many) {date, currency} tuples.
stream = Stream.flat_map(currencies, fn currency -> Enum.map(date_range, &{&1, currency}) end)
Helpers.instrument(fn ->
fun = fn {date, currency} ->
response = Req.get!(Helpers.currency_conversion_url("eur", currency, date))
{date, currency, Map.fetch!(response.body, currency)}
end
Task.async_stream(stream, fun, ordered: false, max_concurrency: 20)
|> Stream.map(fn {:ok, result} -> result end)
|> Helpers.plot_currencies()
end)