Task.async_stream/3

Mix.install([
  {:httpoison, "~> 2.1"},
  {:jason, "1.4.0"},
  {:nimble_csv, "~> 1.2"}
])

Introduction

Streams are a valuable feature in Elixir that allow elements to be emitted lazily. Any enumerable that generates its elements one by one during enumeration is considered a stream. Streams are particularly useful for large datasets that would consume excessive memory if loaded all at once: with a stream, we process elements only as they are needed. To learn more, see the documentation for the Stream module.
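To make the laziness concrete, here is a minimal sketch that uses only the standard library. The range size and the two transformations are arbitrary choices for illustration; the point is that building the pipeline computes nothing, and Enum.take/2 pulls only as many elements as it needs.

```elixir
# Build a lazy pipeline; nothing is computed at this point.
stream =
  1..1_000_000
  |> Stream.map(fn n -> n * 2 end)
  |> Stream.filter(fn n -> rem(n, 3) == 0 end)

# Only enough elements are processed to produce the first three results.
first_three = Enum.take(stream, 3)
IO.inspect(first_three, label: "first three")
```

Replacing Stream with Enum in the pipeline would build two full intermediate lists before taking the first three elements.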

Now that we have a basic understanding of streams, let’s explore the Task.async_stream/3 function. It processes the elements of an enumerable concurrently, each in its own task, which opens up significant potential for parallel execution.

Since Task.async_stream/3 works on any enumerable, it accepts both lazy streams and eager collections such as lists and ranges.

Let’s look at an example…

# A function that fetches a random Chuck Norris joke from an API
get_chuknorris_joke = fn ->
  HTTPoison.get!("https://api.chucknorris.io/jokes/random")
  |> Map.get(:body)
  |> Jason.decode!()
  |> Map.get("value")
end

Let’s see how long it takes to make 10 API calls one by one:

Enum.map(1..10, fn _ -> get_chuknorris_joke.() end)

Now let’s try the same using Task.async_stream/3:

1..10
|> Task.async_stream(fn _ -> get_chuknorris_joke.() end)
|> Enum.to_list()

Observe the significant improvement in execution time. This is because Task.async_stream/3 runs each item of the enumerable in a separate process, so in our case several API calls are in flight at once instead of waiting on one another. Note that each result comes back wrapped in an {:ok, result} tuple.
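To measure the difference without relying on the network, the sketch below swaps the API call for a hypothetical slow_call function that just sleeps for 100 ms. :timer.tc/1 returns the elapsed time in microseconds together with the result. The :max_concurrency option is set explicitly here only so that the timing does not depend on how many cores the machine has.

```elixir
# Hypothetical stand-in for a slow API call: sleeps 100 ms, then returns n.
slow_call = fn n ->
  Process.sleep(100)
  n
end

# :timer.tc/1 returns {elapsed_microseconds, result}.
{sequential_us, sequential} = :timer.tc(fn -> Enum.map(1..8, slow_call) end)

{concurrent_us, concurrent} =
  :timer.tc(fn ->
    1..8
    |> Task.async_stream(slow_call, max_concurrency: 8)
    # Unwrap the {:ok, value} tuples emitted by async_stream.
    |> Enum.map(fn {:ok, value} -> value end)
  end)

IO.puts("sequential: #{div(sequential_us, 1000)} ms")
IO.puts("concurrent: #{div(concurrent_us, 1000)} ms")
```

The sequential run has to wait for eight sleeps in a row, while the concurrent run waits for roughly one.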

It’s important to note that Task.async_stream/3 returns a stream, which is lazy and won’t execute until we consume it. A common way to consume a stream is by using one of the Enum functions, such as Enum.to_list/1 in this case, or by invoking Stream.run/1.
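The laziness can be checked directly. In this sketch each task sends a message to the calling process when it runs; the message shape {:ran, n} and the 50 ms wait are arbitrary choices for illustration. Before the stream is consumed, no message has arrived.

```elixir
parent = self()

# Building the stream does not start any task.
stream =
  Task.async_stream(1..3, fn n ->
    send(parent, {:ran, n})
    n * n
  end)

# Wait briefly for a message; none arrives because nothing has run yet.
started_before =
  receive do
    {:ran, _} -> true
  after
    50 -> false
  end

# Consuming the stream is what actually runs the tasks.
results = Enum.to_list(stream)

IO.inspect(started_before, label: "started before consuming")
IO.inspect(results, label: "results")
```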

Task.async_stream/3 also provides various options to customize its behavior. One such option is :max_concurrency, which sets the maximum number of tasks running at the same time. It defaults to System.schedulers_online/0, which usually matches the number of CPU cores available to the VM.
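The effect of :max_concurrency shows up in how long a batch takes. In the sketch below, work is a hypothetical 100 ms job and time_ms is a small helper written for this example; four items run two at a time need two rounds, while four at a time need one.

```elixir
# Hypothetical job that takes about 100 ms.
work = fn n ->
  Process.sleep(100)
  n
end

# Helper for this example: run four jobs with the given limit and
# return the elapsed time in milliseconds.
time_ms = fn max ->
  {micros, _result} =
    :timer.tc(fn ->
      1..4
      |> Task.async_stream(work, max_concurrency: max)
      |> Stream.run()
    end)

  div(micros, 1000)
end

two_at_a_time = time_ms.(2)
four_at_a_time = time_ms.(4)

IO.puts("max_concurrency: 2 took #{two_at_a_time} ms")
IO.puts("max_concurrency: 4 took #{four_at_a_time} ms")
```

A lower limit is often the right choice when the work hits an external service that should not be flooded with requests.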

Another consideration is the ordering of results from Task.async_stream/3. By default, results are buffered and emitted in the same order as the input, even though the spawned processes may finish in any order. Setting the :ordered option to false removes the need for buffering, at the cost of results arriving in completion order instead.
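A small sketch of the difference, using a hypothetical sleep_then_return function in which larger inputs take longer to finish:

```elixir
# Larger numbers sleep longer: 3 sleeps 300 ms, 1 sleeps 100 ms.
sleep_then_return = fn n ->
  Process.sleep(n * 100)
  n
end

# Default (ordered: true): results follow the input order.
ordered =
  [3, 2, 1]
  |> Task.async_stream(sleep_then_return, max_concurrency: 3)
  |> Enum.map(fn {:ok, n} -> n end)

# ordered: false: results are emitted as tasks finish,
# so the fastest task is most likely to come first.
unordered =
  [3, 2, 1]
  |> Task.async_stream(sleep_then_return, max_concurrency: 3, ordered: false)
  |> Enum.map(fn {:ok, n} -> n end)

IO.inspect(ordered, label: "ordered")
IO.inspect(unordered, label: "unordered")
```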

For a complete list of options, refer to the documentation.
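Two further options worth knowing are :timeout and :on_timeout. Each task gets 5000 ms by default, and a task that runs longer makes the calling process exit. Passing on_timeout: :kill_task instead kills the slow task and emits {:exit, :timeout} in its place. The sleep durations and the 200 ms timeout below are arbitrary values chosen for illustration.

```elixir
# Each input is a sleep duration in milliseconds.
sleep_for = fn ms ->
  Process.sleep(ms)
  ms
end

results =
  [50, 500]
  |> Task.async_stream(sleep_for, timeout: 200, on_timeout: :kill_task)
  |> Enum.to_list()

# The 50 ms task finishes in time; the 500 ms task is killed after 200 ms.
IO.inspect(results, label: "results")
```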

A practical example

Now, let’s explore another practical example of using Task.async_stream/3. In this example, we’ll read a CSV file containing the top 100 websites.

The CSV file has data in the following format:

1,"fonts.googleapis.com",10
2,"facebook.com",10
3,"twitter.com",10
4,"google.com",10
5,"youtube.com",10
...

Our goal is to check the reachability of each website by sending an HTTP request to it.

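Path.join(__DIR__, "sample_data/top_websites.csv")
|> File.stream!()
# parse_stream skips the first row as a header by default;
# pass skip_headers: false if the file has no header row
|> NimbleCSV.RFC4180.parse_stream()
# Extract the website from every row in the CSV file
|> Stream.map(fn [_, website, _] -> website end)
|> Task.async_stream(&HTTPoison.get/1, timeout: :infinity, ordered: false, max_concurrency: 4)
# Keep only reachable websites: async_stream wraps each task result in {:ok, _},
# and HTTPoison.get/1 itself returns {:ok, response} or {:error, reason}
|> Stream.filter(fn
  {:ok, {:ok, _response}} -> true
  _ -> false
end)
|> Enum.count()
|> IO.inspect(label: "Reachable website count")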

Here’s a breakdown of the code:

First, we use File.stream!/1 to read the CSV file. This function provides a stream that allows us to access the file lazily, avoiding the need to load the entire file into memory.

Next, we parse the file using the parse_stream/1 function from the nimble_csv library. This gives us a parsed stream of the CSV data.

We then leverage Task.async_stream/3 to make a GET request to each website concurrently. Since the order of the responses doesn’t matter, we specify ordered: false. We limit the concurrency to 4 requests at a time with max_concurrency: 4, and pass timeout: :infinity so that a slow website does not hit the default 5-second task timeout, which would otherwise make the calling process exit.

Finally, we keep only the websites that responded and count them, consuming the stream with the Enum.count/1 function.
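One detail is easy to miss when filtering: every element emitted by Task.async_stream/3 is the task's return value wrapped in {:ok, ...}. A function that itself returns {:ok, _} or {:error, _} therefore produces nested tuples. The sketch below shows this without touching the network; fake_get is a hypothetical stand-in for HTTPoison.get/1, and the host names and response maps are made up.

```elixir
# Hypothetical stand-in for HTTPoison.get/1: hosts starting with "up."
# succeed, everything else fails.
fake_get = fn
  "up." <> _ = host -> {:ok, %{host: host, status_code: 200}}
  host -> {:error, %{host: host, reason: :nxdomain}}
end

results =
  ["up.example", "down.example", "up.test"]
  |> Task.async_stream(fake_get, ordered: false)
  |> Enum.to_list()

# Every element is {:ok, _} at the outer level, including the failed
# request, so the inner tuple has to be matched as well.
reachable =
  Enum.count(results, fn
    {:ok, {:ok, _response}} -> true
    _ -> false
  end)

IO.inspect(results, label: "results")
IO.inspect(reachable, label: "reachable")
```

Matching only on the outer {:ok, _} would count all three hosts as reachable.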

With Task.async_stream/3, Elixir lets us perform concurrent data processing in just a few lines of code. The simplicity and power of concurrent programming in Elixir is truly amazing 🚀
