
Streaming files


Mix.install([
  {:flow, "~> 1.2"},
  {:unzip, "~> 0.8.0"},
  {:csv, "~> 3.0"},
  {:nimble_csv, "~> 1.2"},
  {:httpoison, "~> 2.0"},
  {:explorer, "~> 0.5"},
  {:finch, "~> 0.14"},
  {:mint, "~> 1.0"},
  {:ecto_sql, "~> 3.9"},
  {:ecto_sqlite3, "~> 0.9.1"}
])

{:ok, conn_f} = Finch.start_link(name: MyFinch)

Introduction

These notes cover the basics of handling CSV datasets that are fairly big, but this is not about Big Data! We want to handle data in CSV format, for example to seed a database. We use the Stream module to limit memory usage when the data is large. We don't handle retries on connection errors, nor 206 "Partial Content" responses or Range requests.

Reading: here and there

This is not about parallelism, since we are dealing with IO-bound tasks here. It is about "lazy" evaluation in contrast to "eager" evaluation. If you want parallelism across multiple cores, then GenStage and Flow can help, as shown at the end. The advantage of CSV files is their compatibility with databases: most of them have native tools to insert from, or export tables to, the CSV format. CSV files can also be parsed into DataFrames; an example is given with Explorer at the end.

The data may be accessible from several sources:

  1. the CSV dataset is accessible from remote storage, e.g. an S3 bucket,
  2. the CSV is accessible from an HTTP endpoint,
  3. the CSV is serialized in a database.

> The paragraph Higher Level Operations explains how to handle (upload, download) large files with ExAws.S3.
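
As a hedged sketch of what this looks like (ExAws and ExAws.S3 are not part of this notebook's setup, and the bucket and key names are made up), a large file can be streamed up to and down from a bucket:

# Sketch only: assumes {:ex_aws, "~> 2.4"} and {:ex_aws_s3, "~> 2.4"} are added
# to the setup, with valid AWS credentials and region configured.

# Multipart upload: the local file is read in 5 MB chunks and uploaded part by part.
"airports.csv"
|> File.stream!([], 5 * 1024 * 1024)
|> ExAws.S3.upload("my-bucket", "datasets/airports.csv")
|> ExAws.request!()

# Chunked download: the object is written to disk without being held in memory.
ExAws.S3.download_file("my-bucket", "datasets/airports.csv", "airports-from-s3.csv")
|> ExAws.request!()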

If the data is zipped, we may have an archive containing several files. Therefore, we first spool the zip to local disk and then work with it using streams: unzip, transform, and decode the data into the needed form, whether a CSV file or an in-memory list. We will use Unzip and CSV to decode streams.

flowchart LR
Z>Zip Source] -. stream download .-> S(((spool zip)))

S -. Unzip stream > Stream.map > Stream.into .-> C[local CSV]
S -. Unzip stream > CSV.decode! > Enum.to_list .-> L[in memory List]

If the data is not zipped, we can stream it down, CSV.decode it, and further process it in streams all along.

> A gist to stream from AWS S3.

HTTP clients

We showcase 2 HTTP clients to reach an endpoint and stream down the response: HTTPoison and Finch.

HTTPoison requires building a custom streaming function with Stream.resource, whilst Finch is more straightforward. We can then spool (if needed) this stream with Stream.into a File.stream!.

HTTPoison

We use Stream.resource to build a custom stream. You initiate the request with stream_to: self() and async: :once so that the next chunk of the response is requested only once the previous chunk has been processed. With this configuration, you receive chunks of about 16400 bytes.

defmodule HTTPoisonStream do
  @doc """
  Uses `HTTPoison` and builds a custom stream from `HTTPoison.AsyncChunk`.
  """
  def download(url) do
    Stream.resource(
      # start_fun: start an async request that delivers one chunk at a time (`async: :once`)
      fn -> HTTPoison.get!(url, %{}, stream_to: self(), async: :once) end,

      # next_fun,
      fn %HTTPoison.AsyncResponse{id: id} = resp ->
        receive do
          %HTTPoison.AsyncStatus{id: ^id, code: _code} ->
            HTTPoison.stream_next(resp)
            {[], resp}

          %HTTPoison.AsyncHeaders{id: ^id, headers: _headers} ->
            HTTPoison.stream_next(resp)
            {[], resp}

          %HTTPoison.AsyncChunk{id: ^id, chunk: chunk} ->
            HTTPoison.stream_next(resp)
            {[chunk], resp}

          %HTTPoison.AsyncEnd{id: ^id} ->
            {:halt, resp}
        after
          5000 ->
            raise "timeout"
        end
      end,
      # end_fun
      fn %HTTPoison.AsyncResponse{id: id} = _resp ->
        :hackney.stop_async(id)
      end
    )
  end
end

Finch

Alternatively, we can use the HTTP client Finch to reach an endpoint and stream the response. Finch needs to be started first: it is started dynamically in the setup with the name MyFinch.

> You need to push the received chunks into the accumulator (i.e. append at the end) for Unzip to be able to find the EOCD (End Of Central Directory).

defmodule HTTPFinchStream do
  @doc """
  Downloads the HTTP response body as a list of binary chunks, appending each chunk so that the EOCD stays at the end.
  """
  def download(url) do
    case Finch.build(:get, url)
         |> Finch.stream(MyFinch, [], fn
           {:status, _status}, acc -> acc
           {:headers, _headers}, acc -> acc
           # push data, to put the EOCD at the end.
           {:data, stream_data}, acc -> acc ++ [stream_data]
         end) do
      {:ok, file} -> file
      _ -> :error
    end
  end
end

Other clients
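
Other HTTP clients, such as Req or Mint, can also stream a response body. Below is a minimal hedged sketch with Req (not part of this notebook's setup: you would add {:req, "~> 0.4"} to Mix.install, and the URL is a placeholder). Its into: option accepts a Collectable, so the body can be written to disk chunk by chunk:

# Sketch only: assumes {:req, "~> 0.4"} is added to the setup and the URL exists.
Req.get!("https://example.com/data.csv", into: File.stream!("data-req.csv"))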

Datasets endpoints

You can use the endpoints below:

zipped_airports =
  "https://pkgstore.datahub.io/core/airport-codes/airport-codes_zip/data/173bf3d453e2aea540dc01c3421753fe/airport-codes_zip.zip"

unzipped_airports =
  "https://pkgstore.datahub.io/core/airport-codes/airport-codes_csv/data/e07739e49300d125989ee543d5598c4b/airport-codes_csv.csv"

unzipped_films = "https://perso.telecom-paristech.fr/eagan/class/igr204/data/film.csv"

Download CSV datasets

We will use the package Unzip to unzip files, and the package CSV, which lets you encode/decode streams of data.

Endpoint serving a zipped file

If we reach a zip archive, it can contain several files, so you first need to spool the zip, then read its directory and select a file.

You can run unzip -l name.zip in a terminal to list the files contained in a zip archive. Since we use the package Unzip, we can rely on its convenient Unzip.list_entries/1 to list the file archive directory.

flowchart LR
  Z>zip_endpoint] -. HTTP.stream > Stream.into .-> S(((local: tmp.zip)))
  S -. Unzip.list_entries .-> S

The local zip file may be an archive with several files. You select the file you want. Then you can stream all along the way: unzip, decode, and build an in-memory dataset or stream into a new file.

flowchart LR
S(((local: tmp.zip))) -. Unzip stream > CSV.decode > Stream.into .-> C(((local CSV)))
S -- Unzip stream > CSV.decode! > Enum.to_list--> L[in memory List]

defmodule HTTPStream do
  def spool(stream, opts \\ []) do
    dest = Keyword.get(opts, :to, "tmp.zip")

    stream
    |> Stream.into(File.stream!(dest))
    |> Stream.run()

    {:ok, dest}
  end

  @doc """
  Read the file archive directory
  """
  def check_zip(zip_arch) do
    Unzip.LocalFile.open(zip_arch)
    |> Unzip.new()
    |> elem(1)
    |> Unzip.list_entries()
    |> Enum.reduce([], fn %Unzip.Entry{file_name: file_name, compressed_size: c_size}, acc ->
      [{file_name, c_size} | acc]
    end)
  end
end

{:ok, tmp_zip} =
  HTTPFinchStream.download(zipped_airports)
  |> HTTPStream.spool(to: "tmp.zip")

# check the file archive directory
try do
  HTTPStream.check_zip(tmp_zip)
  |> IO.inspect()
rescue
  e ->
    %KeyError{term: msg} = e
    inspect(msg)
end

{:ok, unzip} =
  Unzip.LocalFile.open(tmp_zip)
  |> Unzip.new()

# unzip the selected downloaded file > save decompressed into CSV
Unzip.file_stream!(unzip, "archive/airport-codes.csv")
|> Stream.map(&IO.chardata_to_string/1)
|> Stream.into(File.stream!("airports-f.csv"))
|> Stream.run()

# unzip the selected downloaded file > in-memory List
Unzip.file_stream!(unzip, "archive/airport-codes.csv")
|> Stream.map(&IO.chardata_to_string/1)
|> CSV.decode!(headers: true)
|> Enum.to_list()

Endpoint serving an unzipped CSV file

If the file is not zipped, then we can stream all along until the final format we want to use.

flowchart LR
endpoint -- HTTP.stream > CSV.decode! > Enum.to_list --> M[in memory Map]
endpoint -- HTTP.stream > Stream.into -->S[local CSV]

We can stream down and spool. Have a look at your disk to find the files.

HTTPoisonStream.download(unzipped_films)
|> HTTPStream.spool(to: "films-p.csv")

You can stream down and transform the data into an in-memory list of maps:

HTTPoisonStream.download(unzipped_airports)
|> Stream.map(&IO.chardata_to_string/1)
|> CSV.decode!(headers: true, separator: ?,)
|> Enum.to_list()

Another example where we stream down and map into memory (the CSV used here has “;” separators).

HTTPFinchStream.download(unzipped_films)
|> Stream.map(&IO.chardata_to_string/1)
|> CSV.decode!(headers: true, separator: ?;)
|> Enum.to_list()

You can transform the dataset into a DataFrame:

Explorer.DataFrame.from_csv!("airports-f.csv")

Database setup: Ecto-SQLite

We want to be able to stream our CSV file into a database, and also back up our database into a CSV file. We set up the serverless database SQLite for simplicity.

> If we further want to use a remote S3 bucket, one may consider Litestream. See also this post: “Litestream allows us to backup our SQLite database to any S3 compatible storage after every transaction. It will also restore from that backup, meaning that we can restore the latest version of the database.”

We open a connection to a SQLite database and create a table to store each line of the CSV file. We use the CSV headers to build the SQLite table; each column is a plain text field.

Firstly, we can get the headers of the CSV file into a list:

headers =
  File.stream!("airports-f.csv")
  |> Enum.take(1)
  |> hd()
  |> String.split(",")
  |> Enum.map(&String.trim/1)

We start a connection to an SQLite database, then create a table:

defmodule Conn do
  # @headers headers
  def start do
    # Exqlite.Sqlite3.open(":memory:")
    Exqlite.Sqlite3.open("test")
  end

  def create_table(conn, table) do
    Exqlite.Sqlite3.execute(conn, "DROP TABLE IF EXISTS #{table}")

    Exqlite.Sqlite3.execute(
      conn,
      "CREATE TABLE IF NOT EXISTS #{table} (
        id integer primary key,
        ident text,
        elevation_ft text,
        type text,
        name text,
        continent text,
        iso_country text,
        iso_region text,
        municipality text,
        iata_code text,
        local_code text,
        gps_code text,
        coordinates text
        )"
    )
  end

  def reset_table(conn, table) do
    Exqlite.Sqlite3.execute(conn, "DROP TABLE IF EXISTS #{table}")
    create_table(conn, table)
  end
end

{:ok, conn} = Conn.start()
Conn.create_table(conn, "csv")

We define our Repo and start it:

defmodule ARepo do
  use Ecto.Repo, adapter: Ecto.Adapters.SQLite3, otp_app: :noop
end

case ARepo.start_link(database: "test", default_chunk_size: 100) do
  {:ok, pid} -> {:ok, pid}
  {:error, {_, pid}} -> {:ok, pid}
end

We use the headers of the CSV file to define a schema:

defmodule Airport do
  use Ecto.Schema
  import Ecto.Changeset
  @headers headers
  @columns Enum.map(@headers, &String.to_atom/1)

  schema "csv" do
    Enum.each(@columns, &field(&1, :string))
    # timestamps()
  end

  def changeset(attrs \\ %{}) do
    %Airport{}
    |> cast(attrs, @columns)
    |> validate_required(@columns -- [:iata_code])
  end
end

We check that our schema is correctly built and that the database and the "csv" table are ready:

Airport.__schema__(:fields) |> IO.inspect()
ARepo.all(Airport)

From CSV to DB

The easiest way is to use the database directly. With Postgres, you have the COPY command; note that the path to the file must be absolute. This tutorial explains COPY FROM for Postgres, and this one for SQLite.
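
For illustration, here is a hedged sketch of running COPY FROM through Ecto. It assumes a Postgres repo named Repo and an existing airports table, neither of which is part of the SQLite setup used in this notebook:

# Sketch only: Postgres, not the SQLite repo used in this notebook.
# The CSV path must be absolute and readable by the Postgres server process.
Ecto.Adapters.SQL.query!(
  Repo,
  "COPY airports FROM '/tmp/airports.csv' WITH (FORMAT csv, HEADER true)"
)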

Since we may want to transform the data, we set up an Elixir pipeline.

flowchart
subgraph transaction
F(((local zip))) -. Unzip.file_stream!  > CSV.decode! .- S[ ..file stream..]
S -. Stream.chunk_every > Stream Repo.insert_all .-> D[(Database)]
end

We will parse each row into a map for the Ecto insertion.

defmodule AirportDataset do
  @moduledoc """
  Provides mapping for the Airport dataset: 
  """
  def map(row) do
    %{
      ident: Enum.at(row, 0),
      type: Enum.at(row, 1),
      name: Enum.at(row, 2),
      elevation_ft: Enum.at(row, 3),
      continent: Enum.at(row, 4),
      iso_country: Enum.at(row, 5),
      iso_region: Enum.at(row, 6),
      municipality: Enum.at(row, 7),
      gps_code: Enum.at(row, 8),
      iata_code: Enum.at(row, 9),
      local_code: Enum.at(row, 10),
      coordinates: Enum.at(row, 11)
    }
  end
end

We compose the streams and run the insertion within a transaction.

defmodule Import2DB do
  def zip_to_db(select: source) do
    unzipped_stream =
      Unzip.LocalFile.open("tmp.zip")
      |> Unzip.new()
      |> elem(1)
      |> Unzip.file_stream!(source)
      |> Stream.map(&IO.chardata_to_string/1)
      |> CSV.decode!(headers: false)
      |> Stream.map(&AirportDataset.map/1)

    # serialize in the database within a transaction
    result =
      ARepo.transaction(fn ->
        unzipped_stream
        |> Stream.chunk_every(1000)
        # insert by groups of 1000 rows
        |> Stream.each(&ARepo.insert_all(Airport, &1))
        |> Stream.run()
      end)

    # cleanup the spooled file once the insertion has run
    File.rm("tmp.zip")
    result
  end
end

We use the module above to stream down a zipped file, spool it, and then run a transaction to stream into the database:

Conn.reset_table(conn, "csv")

{:ok, file} =
  HTTPoisonStream.download(zipped_airports)
  |> HTTPStream.spool()

Import2DB.zip_to_db(select: "archive/airport-codes.csv")

We check if the data is inserted:

ARepo.aggregate(Airport, :count)

We check for one row in particular:

ARepo.get(Airport, 3)

From DB to CSV

You can use the database directly to perform this transformation. A SQL tutorial and a Postgres tutorial explain how to export a table into a CSV file with COPY TO, the counterpart of the COPY FROM used above for imports.
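
The export direction works the same way; a hedged Postgres sketch with the same hypothetical Repo and table as above, not the SQLite setup used here:

# Sketch only: Postgres COPY TO, writing the table out to a server-side CSV file.
Ecto.Adapters.SQL.query!(
  Repo,
  "COPY airports TO '/tmp/airports-export.csv' WITH (FORMAT csv, HEADER true)"
)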

Since we (potentially) want to transform the data, we will stream all the way from the database to a CSV file. We first need to query the data from the database: we use Repo.stream within a transaction to generate a stream of results, then transform the data and encode the stream into CSV format.

flowchart
subgraph Transaction
D[(Database)] -. Repo.stream > Stream.map > CSV.encode! > Stream.into .-> F(((CSV file)))
end

The module below runs a Repo.stream within a transaction and streams into a file:

defmodule ExportFromDB do
  # `Repo.stream` must be run inside a transaction.
  # Its default chunk size is 500 rows.
  # The default transaction timeout is 15s, so you can pass `timeout: :infinity` for long exports.
  def spool_db(query, to: file_name) do
    ARepo.transaction(fn ->
      query
      |> ARepo.stream()
      |> Stream.map(&Map.values(&1))
      |> CSV.encode()
      |> Stream.into(File.stream!(file_name, [:write, :utf8]))
      |> Stream.run()
    end)
  end
end

We use the module above to export the database into a CSV file:

import Ecto.Query

query =
  from(a in Airport,
    limit: 10,
    select: map(a, ~w(id type name iso_country coordinates continent iso_region municipality)a)
  )

ExportFromDB.spool_db(query, to: "save-airport.csv")

We check the CSV file:

File.stat!("save-airport.csv").size
Exqlite.Sqlite3.execute(conn, "DROP TABLE IF EXISTS csv")
Exqlite.Sqlite3.close(conn)

Aggregation: concurrency with Flow

Streams do not bring concurrency, only lazy evaluation. We will use the package Flow to run code concurrently. We also use the package NimbleCSV to decode the CSV line by line.

The data here is small and bounded (as opposed to "unbounded", a continuous flow of data), so the advantage of using Flow is not obvious. Nevertheless, you can compose aggregations when you introduce partitions, as showcased here.

defmodule UseFlow do
  alias NimbleCSV.RFC4180, as: CSV2

  def airports do
    File.stream!("airports-f.csv")
    |> Flow.from_enumerable()
    |> Flow.map(fn row ->
      [row] = CSV2.parse_string(row, skip_headers: false)

      %{
        id: Enum.at(row, 0),
        type: Enum.at(row, 2),
        name: Enum.at(row, 3),
        country: Enum.at(row, 5)
      }
    end)
    |> Flow.reject(&(&1.type == "closed"))
    |> Flow.partition(key: {:key, :country})
    |> Flow.reduce(fn -> %{} end, fn item, acc ->
      Map.update(acc, item.country, 1, &(&1 + 1))
    end)
    |> Flow.take_sort(10, fn {_, a}, {_, b} -> a > b end)
    |> Enum.to_list()
  end
end

:timer.tc(fn -> UseFlow.airports() end)

Aggregation: Explorer.DataFrame

Since the data can be presented as a DataFrame, a two-dimensional tabular dataset, we can use the library Explorer.DataFrame.

The library has a nice introductory Livebook.

require Explorer.DataFrame, as: DF

df = DF.from_csv!("airports-f.csv")

DF.table(df)

DF.group_by(df, ["iso_country"])
|> DF.summarise(count: count(iso_country))
|> DF.table()

:timer.tc(fn ->
  DF.from_csv!("airports-f.csv")
  |> DF.group_by(["iso_country"])
  |> DF.summarise(count: count(iso_country))
end)