Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Project - Building a Download Manager ๐Ÿš€

ch_6.0_project_building_a_download_manager.livemd

Project - Building a Download Manager ๐Ÿš€

Mix.install([
  {:kino, "~> 0.9.0"},
  {:elixir_uuid, "~> 1.2"},
  {:httpoison, "~> 2.1"},
  {:sizeable, "~> 1.0"}
])

Navigation

Home Scaling dynamic supervisorIntroduction to tasks

Introduction

In this chapter, we will put into practice the concepts we have learned in the previous chapters by building a project. Our project will be a simple download manager that has the capability to download multiple files simultaneously and provide status updates for each download. A key requirement is that the failure of one download should not impact the progress or completion of other ongoing downloads.

By building this download manager, we will explore the use of supervisors and dynamic supervisors to handle the concurrent downloading of files, ensuring fault tolerance and isolation between download processes. We will also dive into message passing and state management to track the progress and status of each download.

The Download struct

We represent each download using a struct that contains the following fields:

  • id: A unique identifier for the download.
  • name: The name of the download.
  • src: The source URL from where the download is initiated.
  • dest: The destination file path where the downloaded file will be saved.
  • from: The process ID (PID) of the requester process who initiated the download.
  • pid: The process ID (PID) of the worker process thatโ€™s downloading the file.
  • status: The current status of the download.
  • size: The size of the download in bytes.
  • bytes_downloaded: The number of bytes downloaded for the download.
  • exit_status: The exit status of the downloading process, if applicable.
  • error_status: The reason for a failed download, if applicable.
  • start_time: The timestamp when the download started.
  • end_time: The timestamp when the download finished.
  • resp: The response received from the source URL when downloading the file.
  • fd: The file descriptor of the downloaded file on disk.

These fields provide essential information to track and manage the progress, status, and details of each download within our download manager. By utilizing this struct, we can effectively handle multiple concurrent downloads, monitor their progress, and report their status accurately.

defmodule Download do
  @enforce_keys [:id, :src, :dest, :from]
  defstruct [
    :id,
    :name,
    :src,
    :dest,
    :from,
    :pid,
    :size,
    :status,
    :bytes_downloaded,
    :exit_status,
    :error_reason,
    :start_time,
    :end_time,
    :resp,
    :fd
  ]
end

Architecture

Lets look at the high level architecture of our application.

Our application will consist of 3 important modules.

The high-level tasks of each module are as follows:

  • DownloadManager: The DownloadManager module is a GenServer that stores and tracks the state of all download worker processes. It provides APIs to add, remove, and retrieve download information. It also periodically updates the status of downloads and handles termination messages from download workers.

  • DownloadsSupervisor: The DownloadsSupervisor module is a dynamic supervisor that allows the dynamic spawning of DownloadWorker processes. It starts and supervises individual DownloadWorker processes to handle each download. It provides functions to start and terminate child processes.

  • DownloadWorker: The DownloadWorker module is a GenServer that is responsible for performing the actual download task. It runs as an individual process and handles downloading chunks of data from a given source URL. It communicates with the DownloadManager module to update the download status and handle completion or failure of the download.

Overall, the DownloadManager orchestrates the download process, the DownloadsSupervisor manages the lifecycle of DownloadWorker processes, and the DownloadWorker handles the actual downloading task.

Now lets go through the implementation of each of these modules in detail.

The Download Worker

The DownloadWorker module is a GenServer responsible for handling individual file downloads. Each download is performed by its own process, represented by an instance of the DownloadWorker GenServer.

For downloading the file we utilize the httpoison library and its async download feature, which allows us to stream chunks of the downloaded file.

This happens using the HTTPoison.get(src, %{}, stream_to: self(), async: :once) function call. The stream_to: self() option enables the DownloadWorker process to receive each downloaded chunk via the handle_info/2 callback.

The async: :once option ensures that only one chunk at a time is sent to our DownloadWorker process. Once we have processed a chunk, we can request the next chunk by calling HTTPoison.stream_next/1.

The lifecycle of the DownloadWorker process is as follows:

  • init/1: In the init callback, we prepare the download struct, initialize the process, and kick off the download by returning {:ok, new_download, {:continue, :kickoff}}. This ensures that the handle_continue/2 callback will be called immediately after the init/1 callback.

  • handle_continue/2: After the initialization, in the handle_continue callback, we open the file where the download will be saved and initiate the download using HTTPoison.get/3. We save the file descriptor in the download struct as the GenServer state. If any errors occur during the download, we update the download status accordingly and return {:stop, reason, failed_download} to stop the GenServer with the appropriate failure reason.

  • handle_info/2: This callback is responsible for handling various events and actions during the download process. Here are the different scenarios:

    • handle_info(%HTTPoison.AsyncHeaders{headers: headers}, download): Invoked when the download begins, providing headers containing metadata about the download, such as the file size. We save this information in the download struct as the GenServer state and request the first chunk using HTTPoison.stream_next/1.

    • handle_info(%HTTPoison.AsyncStatus{}, download): Requests the next chunk of the download by calling HTTPoison.stream_next/1.

    • handle_info(%HTTPoison.AsyncChunk{chunk: chunk}, download): Saves a received chunk by appending it to the file on disk. We then request the next chunk using HTTPoison.stream_next/1.

    • handle_info(%HTTPoison.AsyncEnd{}, download): Triggered when the download is completed. We close the file descriptor, update the download status, and return {:stop, :finish, finished_download} to gracefully terminate the download worker process.

    • The remaining handle_info/2 callbacks handle errors that occur during the download process, such as status code errors or HTTPoison errors. In such cases, we update the download status accordingly and terminate the download worker process by returning {:stop, message, failed_download}.

  • handle_call(:status, _from, download): This callback handles the :status call and simply returns the download struct, which represents the current state of the download. It can be used to inform the caller about the download status.

  • terminate/2: This callback is invoked when the download worker process is about to exit. By enabling the :trap_exit flag in the init/1 callback, we can trap exits and perform cleanup operations if the download process stops either due to failures or when a download finishes successfully. In this callback we inform the caller process (identified by the :from property in the download struct) by sending a {:terminating, id, download} message. We then close the file descriptor and gracefully shut down the process.

Lastly, note that the DownloadWorker GenServer has the restart: :temporary option set, which means that any failed process wonโ€™t be automatically restarted by the supervisor. In our case this is the expected behaviour since we already handle failed downloads and donโ€™t want to retry them.

defmodule DownloadWorker do
  @moduledoc """
  Worker Genserver for downloading a file
  """
  use GenServer, restart: :temporary
  alias Download
  require Logger

  def start_link(args, opts \\ []), do: GenServer.start_link(__MODULE__, args, opts)

  # Callbacks

  @impl GenServer
  def init(%Download{} = download) do
    Logger.info("Start new download worker: #{download.id}")

    # Makes your process call terminate/2 upon exit.
    Process.flag(:trap_exit, true)

    # Prepare the new download struct
    new_download = %Download{
      id: download.id,
      src: download.src,
      dest: download.dest,
      from: download.from,
      size: 0,
      status: :initiate,
      bytes_downloaded: 0,
      exit_status: nil,
      error_reason: nil,
      start_time: nil,
      end_time: nil,
      resp: nil,
      fd: nil
    }

    # Return the download struct as the GenServer state
    # Also return {:continue, :kickoff} to immediately execute the handle_continue/2
    # callback to kickoff the download
    {:ok, new_download, {:continue, :kickoff}}
  end

  @impl GenServer
  def handle_continue(:kickoff, %Download{src: src, dest: dest} = download) do
    # Open up a file to save the download, this returns a file descriptor
    {:ok, fd} = File.open(dest, [:write, :binary])

    # Kick off the download
    case HTTPoison.get(src, %{}, stream_to: self(), async: :once) do
      {:ok, resp} ->
        download = %Download{download | resp: resp, fd: fd}
        {:noreply, download}

      # In case of errors update the download struct and return
      # {:stop, reason, state} to stop the GenServer process
      {:error, %HTTPoison.Error{reason: reason}} ->
        failed_download = %Download{
          download
          | status: :error,
            error_reason: reason,
            exit_status: :error
        }

        {:stop, reason, failed_download}
    end
  end

  @impl GenServer
  def terminate(reason, %Download{id: id, from: from} = download) do
    # Inform parent process about download finish or failure.
    # The parent process pid is available in the `:from` property of the download struct
    Process.send(from, {:terminating, id, download}, [])

    Logger.info(
      "Terminate download-worker #{id}: reason=#{inspect(reason)} download=#{inspect(download)}"
    )

    # Close the file descriptor
    download
    |> Map.get(download, :fd)
    |> File.close()

    # Gracefully stop the GenServer process
    :normal
  end

  @impl GenServer
  def handle_call(:status, _from, download), do: {:reply, download, download}

  @impl GenServer
  def handle_info(%HTTPoison.AsyncStatus{code: code}, download) when code >= 400 do
    message = "Failed with code: #{code}"

    failed_download = %Download{
      download
      | status: :error,
        error_reason: message,
        exit_status: :error
    }

    {:stop, message, failed_download}
  end

  @impl GenServer
  def handle_info(%HTTPoison.AsyncStatus{}, download) do
    HTTPoison.stream_next(download.resp)
    {:noreply, download}
  end

  @impl GenServer
  def handle_info(%HTTPoison.Error{reason: reason}, download) do
    message = inspect(reason)

    failed_download = %Download{
      download
      | status: :error,
        error_reason: message,
        exit_status: :error
    }

    {:stop, message, failed_download}
  end

  @impl GenServer
  def handle_info(%HTTPoison.AsyncHeaders{headers: headers}, download) do
    # Get the "Content-Length" header from the list of headers returned in the response
    content_length_header =
      Enum.find(headers, fn
        {"Content-Length", _length} -> true
        _ -> false
      end)

    # Get the download file size from the "Content-Length" header
    size =
      case content_length_header do
        {"Content-Length", length} -> length || 0
        nil -> 0
      end

    # Ask for the next chunk of download
    HTTPoison.stream_next(download.resp)

    # Save the download size and other meta data in the download struct
    download = %Download{download | size: size, status: :active, start_time: DateTime.utc_now()}
    {:noreply, download}
  end

  @impl GenServer
  def handle_info(%HTTPoison.AsyncChunk{chunk: chunk}, download) do
    # Append the new chunk of data in the file using the file descriptor
    IO.binwrite(download.fd, chunk)
    HTTPoison.stream_next(download.resp)

    # Update the bytes downloaded information in the download struct
    bytes_downloaded = download.bytes_downloaded + byte_size(chunk)
    download = %Download{download | bytes_downloaded: bytes_downloaded}
    {:noreply, download}
  end

  @impl GenServer
  def handle_info(%HTTPoison.AsyncEnd{}, download) do
    File.close(download.fd)

    finished_download = %Download{
      download
      | status: :finish,
        exit_status: :normal,
        end_time: DateTime.utc_now()
    }

    # Since the download is finished we are returning {:stop, .., ..}
    # which will invoke the `terminate/2` callback
    {:stop, :finish, finished_download}
  end
end

The DownloadSupervisor

The DownloadsSupervisor module serves as a dynamic supervisor, enabling us to dynamically spawn and manage DownloadWorker processes. Additionally, it provides the ability to stop any active download process as needed.

By utilizing a dynamic supervisor, we gain visibility into the active downloads by examining the child processes maintained under this supervisor. This allows us to easily track and manage the ongoing download operations.

defmodule DownloadsSupervisor do
  use DynamicSupervisor
  require Logger

  # Public functions the interact with the supervisor

  @doc """
  Start the supervisor process
  """
  def start_link(_) do
    DynamicSupervisor.start_link(__MODULE__, :no_args, name: __MODULE__)
  end

  @doc """
  Start a new download under the supervisor
  """
  def add(args) do
    {:ok, pid} = DynamicSupervisor.start_child(__MODULE__, {DownloadWorker, args})

    pid
  end

  @doc """
  Stop an existing download process under the supervisor
  """
  def remove(child_pid) do
    DynamicSupervisor.terminate_child(__MODULE__, child_pid)
  end

  # Callbacks

  @impl true
  def init(:no_args) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end

The DownloadManager

The DownloadManager module serves as a GenServer module that aggregates and manages the state of all active, completed and failed downloads. It offers a convenient API to interact with the underlying DownloadsSupervisor and DownloadWorker processes.

The DownloadManager module has public functions to perform common tasks. By exposing these functions, we can interact with the DownloadManager without directly making calls to the GenServer module.

The public API functions provide a clean and concise interface for managing downloads, while the internal logic handles the complexities of interacting with the supervisor and worker processes.

To provide more context and important details:

  • The DownloadManager maintains a map of all downloads in its state, and keeps this map updated with the lastest download statuses by querying the download workers periodically.

  • In the init/1 callback we schedule periodic updates to refresh the state of active downloads by sending a :fetch_all message to itself.

  • The DownloadManager allows adding a new download, removing an existing download, retrieving the status of a specific download, listing all downloads, and clearing all downloaded files.

  • When a download is added, the DownloadManager creates a new Download struct, starts a DownloadWorker process via the DownloadsSupervisor, and stores the download information in its state.

  • The DownloadManager can receive a {:terminating, id, last_child_state} message from the DownloadWorker processes. This message serves as a notification to the download manager about finished downloads or download failures, enabling it to update the state accordingly. By handling this message, the DownloadManager can track the final state of a download.

defmodule DownloadManager do
  @moduledoc """
  GenServer which stores aggregates and stores state for all download worker processes
  Exposes APIs to add, delete, list downloads.

  Examples:

  {:ok, id} = DownloadManager.add("https://file-examples-com.github.io/uploads/2017/04/file_example_MP4_1920_18MG.mp4")
  DownloadManager.list()
  DownloadManager.get(id)
  DownloadManager.remove(id)
  """
  use GenServer

  require Logger

  @update_interval 1000
  @base_download_path "/tmp/async_elixir_temp_downloads"

  # Public API

  @doc "Start the download manager process"
  def start_link(_opts), do: GenServer.start_link(__MODULE__, %{}, name: __MODULE__)

  @doc "Add a new download"
  def add(src), do: GenServer.call(__MODULE__, {:add, src})

  @doc "Remove an existing download"
  def remove(id), do: GenServer.call(__MODULE__, {:remove, id})

  @doc "Get the lastest information about a download"
  def get(id), do: GenServer.call(__MODULE__, {:get, id})

  @doc "Get the most updated list of downloads"
  def list(), do: GenServer.call(__MODULE__, :list)

  @doc "Clear all downloaded data"
  def clear_all_downloads(), do: File.rm_rf!(@base_download_path)

  # Callbacks

  @impl GenServer
  def init(_args) do
    # Send a message to itself the begin aggregating lastest download statuses
    Process.send_after(self(), :fetch_all, @update_interval)
    {:ok, %{}}
  end

  # Callback to get the status of a given download using it download id
  @impl GenServer
  def handle_call({:get, id}, _from, state) do
    case Map.get(state, id) do
      nil ->
        {
          :reply,
          {:error, :not_found},
          state
        }

      download ->
        {
          :reply,
          {:ok, download},
          state
        }
    end
  end

  # Callback to add a new download
  @impl GenServer
  def handle_call({:add, src}, _from, state) do
    id = UUID.uuid1()
    File.mkdir_p!(@base_download_path)
    download_destination = "#{@base_download_path}/#{id}"

    download = %Download{
      id: id,
      src: src,
      dest: download_destination,
      # This `from` parameter allows the download worker to send a message back to the
      # Download manager when the download finishes or in case of failures
      from: self(),
      name: guess_filename(src)
    }

    # Call the Download manager to start a new download worker process and initaite the download
    pid = DownloadsSupervisor.add(download)
    download = %Download{download | pid: pid}

    # Save the download in a map that is the GenServer state
    {:reply, {:ok, id}, Map.put(state, id, download)}
  end

  @impl GenServer
  def handle_call({:remove, id}, _from, state) do
    case Map.get(state, id) do
      %Download{pid: pid, dest: dest} ->
        # Send a message to the Download supervisor to terminate the Download Worker Process
        # that is downloading the file
        DownloadsSupervisor.remove(pid)

        # Delete the file from disk
        res = File.rm(dest)
        Logger.info("Remove left over file: #{inspect(res)}")

        # Remove the download from the downloads map that is the GenServer state
        {:reply, {:ok, id}, Map.delete(state, id)}

      _ ->
        {:reply, {:error, :not_found}, state}
    end
  end

  # Get the list of all downloads
  @impl GenServer
  def handle_call(:list, _from, state) do
    {:reply, Map.values(state), state}
  end

  # Used by a Download Worker to inform the Download manager when when the download ends
  @impl GenServer
  def handle_info({:terminating, id, last_child_state}, state) do
    {_old_value, state} =
      Map.get_and_update(state, id, fn
        current_value when is_nil(current_value) -> :pop
        current_value -> {current_value, merge_with_old_state(current_value, last_child_state)}
      end)

    Logger.info("Recieved last state from child: #{id}, new_state: #{inspect(state)}")
    {:noreply, state}
  end

  # Every 1 second (1000ms) we will query each of the active download worker
  # process to refresh the state of the running downloads
  @impl GenServer
  def handle_info(:fetch_all, state) do
    new_state =
      state
      |> Enum.map(fn
        {id, %Download{status: status} = download} when status in [:finish, :error, :cancel] ->
          {id, download}

        {id, %Download{pid: pid} = download} ->
          {id, fetch_status(pid, download)}
      end)
      |> Enum.into(%{})

    Process.send_after(self(), :fetch_all, @update_interval)
    {:noreply, new_state}
  end

  # Private helpers

  # Call the download worker process to fetch the lastest status of the Download
  defp fetch_status(pid, download) do
    if Process.alive?(pid) do
      new_download_state = GenServer.call(pid, :status)
      merge_with_old_state(download, new_download_state)
    else
      %Download{download | status: :error, error_reason: "Killed"}
    end
  end

  # Helper function to merge an old download struct with a new download struct
  defp merge_with_old_state(old_download, new_download) do
    %Download{
      old_download
      | size: new_download.size,
        status: new_download.status,
        bytes_downloaded: new_download.bytes_downloaded,
        start_time: new_download.start_time,
        end_time: new_download.end_time,
        error_reason: new_download.error_reason
    }
  end

  # Helper function that attempts to guess the name of the download from the download URL path
  # In case of failures it assigns a UUID as the download name
  defp guess_filename(url) do
    path =
      url
      |> URI.parse()
      |> Map.fetch!(:path)

    if(is_nil(path), do: UUID.uuid1(), else: path |> Path.basename() |> String.trim())
  end
end

At this point our download manager is ready.

Lets test it out! ๐Ÿš€๐Ÿš€๐Ÿš€

The Runner module

The Runner module allows us to test the functionality of our download manager and display the status of the downloads in a markdown table format. It uses the Kino library to render and update the table periodically.

The render_downloads_list/0 function continuously updates the downloads table using the Kino.animate/2 function. It retrieves the latest list of downloads from the DownloadManager and constructs the table data by formatting the relevant fields. The markdown table is rendered using Kino.Markdown.new/1.

defmodule Runner do
  # 1 second (1000ms)
  @refresh_rate 1000

  def render_downloads_list do
    # Every 1 second we refresh the downloads list
    Kino.animate(@refresh_rate, fn _ ->
      # Get the lastest downloads list from the DownloadManager
      downloads = DownloadManager.list()

      unless downloads == [] do
        data =
          downloads
          |> Enum.map(fn download ->
            data =
              [
                download.id,
                download.name,
                download.status,
                percentage_progresss(download),
                progress(download),
                get_speed(download),
                download.src,
                download.dest,
                download.start_time,
                download.end_time || "NA",
                download.error_reason || "NA"
              ]
              |> Enum.join("|")

            "|" <> data <> "|"
          end)
          |> Enum.join("\n")

        # The headers for the downloads table
        headers = """
        |ID|Name|Status|Percentage Completed|Progress|Speed|Source URL|Destination|Started At|Ended At|Error Reason|
        |--|----|------|--------------------|--------|-----|----------|-----------|----------|--------|------------|
        """

        # Render the downloads table in markdown format
        Kino.Markdown.new("#{headers}#{data}")
      end
    end)
  end

  # Private helper functions

  # Calculate the progress percentage of a download from the download size
  defp percentage_progresss(download) do
    if download.status != :initiate &amp;&amp; to_int(download.size) != 0 do
      percentage = download.bytes_downloaded / to_int(download.size) * 100
      "#{trunc(percentage)}%"
    else
      "NA"
    end
  end

  # Use the Sizeable library to show the size of the download in a human readable way
  defp progress(download) do
    if download.status != :initiate &amp;&amp; to_int(download.size) != 0 do
      "#{Sizeable.filesize(download.bytes_downloaded)} / #{Sizeable.filesize(download.size)}"
    else
      "NA"
    end
  end

  # Calculate the download speed using the download start time and data downloaded
  defp get_speed(download) when download.status == :active,
    do: "#{get_speed_in_bytes(download) |> Sizeable.filesize()}/sec"

  defp get_speed(_download), do: "NA"

  defp get_speed_in_bytes(%Download{bytes_downloaded: bytes_downloaded, start_time: start_time})
       when is_nil(start_time) or bytes_downloaded == 0,
       do: 0

  defp get_speed_in_bytes(download) do
    elapsed_time = DateTime.diff(DateTime.utc_now(), download.start_time)
    if elapsed_time == 0, do: 0, else: download.bytes_downloaded / elapsed_time
  end

  defp to_int(num) when is_binary(num) do
    case Integer.parse(num) do
      {num, _} -> num
      :error -> 0
    end
  end

  defp to_int(num), do: num
end

Now lets start our Download manager and Download supervisor.

# Stop any existing Download manager or Download supervisor processes
if Process.whereis(DownloadsSupervisor), do: DynamicSupervisor.stop(DownloadsSupervisor)
if Process.whereis(DownloadManager), do: GenServer.stop(DownloadManager)

# Clear any previously downloaded data
DownloadManager.clear_all_downloads() |> IO.inspect()

# Start the Download manager and Download supervisor processes
{:ok, download_sup_pid} = DownloadsSupervisor.start_link(:noop)
{:ok, download_manager_pid} = DownloadManager.start_link(:noop)
# Call the runner module to render the downloads list as a markdown table
Runner.render_downloads_list()
# Start 4 downloads of different sizes
{:ok, first_id} = DownloadManager.add("https://speed.hetzner.de/100MB.bin")
{:ok, second_id} = DownloadManager.add("https://speed.hetzner.de/1GB.bin")
{:ok, third_id} = DownloadManager.add("https://speed.hetzner.de/10GB.bin")

# This download will fail since the download url does not exists
{:ok, fourth_id} = DownloadManager.add("https://speed.hetzner.de/bad_file.bin")

Notice how the downloads in progress are being updated in real-time. It is worth noting that while one of the downloads encountered a failure, the remaining downloads continued unaffected.

This showcases the concept of process isolation, where failures in one process do not impact others. Furthermore, we receive informative notifications about the download failure, including the reason for the failure, this works because we are trapping exits in the DownloadWorker processes to update the final state of a download.

Now lets visualize the supervision tree of the Dynamic supervisor that is the DownloadsSupervisor module. As the downloads finish if we refresh the supervision tree we can notice how the worker processes are stopped and removed.

Kino.Process.render_sup_tree(download_sup_pid)

Remove the fourth download that failed, notice how the download entry is removed from the downloads table after this code is executed.

DownloadManager.remove(fourth_id)

Finally, we can trace the message flow between different processes when starting a new download. To achieve this, we utilize the Kino.Process.render_seq_trace/2 function. In this case, we provide the PID of the download manager processes to the function, ensuring that only the messages sent to and from the download manager can be traced.

pids_to_trace = [download_manager_pid]

# Trace and inspect messages being sent in between the processes
Kino.Process.render_seq_trace(pids_to_trace, fn ->
  {:ok, _first_id} = DownloadManager.add("https://speed.hetzner.de/100MB.bin")
  # Sleep to enable catching all messages between the processes
  :timer.sleep(1000)
end)

Congratulations on successfully building a download manager from scratch! ๐ŸŽ‰๐ŸŽ‰๐ŸŽ‰

Throughout this process, we applied various concepts that we have learned in previous chapters, reinforcing our understanding of Elixirโ€™s key features.

In summary, we utilized GenServer to manage the state of downloads, DynamicSupervisor to dynamically spawn and terminate download worker processes, trapping exits to handle failures gracefully, message passing between processes to communicate and coordinate activities, and finally, we learned how to write and organize code that utilizes multiple processes in Elixir.

Well done! ๐Ÿฅณ

Navigation

Home Scaling dynamic supervisorIntroduction to tasks