Streaming files
Mix.install([
{:flow, "~> 1.2"},
{:unzip, "~> 0.8.0"},
{:csv, "~> 3.0"},
{:nimble_csv, "~> 1.2"},
{:httpoison, "~> 2.0"},
{:explorer, "~> 0.5"},
{:finch, "~> 0.14"},
{:mint, "~> 1.0"},
{:ecto_sql, "~> 3.9"},
{:ecto_sqlite3, "~> 0.9.1"}
])
{:ok, conn_f} = Finch.start_link(name: MyFinch)
Introduction
These notes cover the basics of handling CSV datasets that are rather big, but this is not about Big Data! We want to handle data in CSV format, for example to seed a database. We use the Stream module to limit memory usage when the data is large. We don’t handle retries in case of connection errors, nor 206 “Partial Content” responses, nor Range requests.
This is not about parallelism since we are dealing with IO-bound tasks here. It is about “lazy” evaluation in contrast to “eager” evaluation. If you want parallelism across multiple cores, then GenStage and Flow can help, as shown at the end. The advantage of CSV files is their compatibility with databases: most of them have native tools to insert from, or convert tables to, the CSV format. CSV files can also be parsed into DataFrames; an example is given with Explorer at the end.
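As a quick illustration of lazy versus eager evaluation, here is a minimal sketch (the range is arbitrary): Enum materialises a full intermediate list at each step, whereas Stream only describes the computation until it is run.
# Eager: Enum builds a full intermediate list before taking 5 elements.
1..1_000_000 |> Enum.map(&(&1 * 3)) |> Enum.take(5)

# Lazy: Stream composes the steps; only the 5 needed elements are computed.
1..1_000_000 |> Stream.map(&(&1 * 3)) |> Enum.take(5)

# The same applies to files: File.stream!/1 reads lines on demand when the stream is run.
# File.stream!("some.csv") |> Stream.map(&String.trim/1) |> Stream.run()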
The data may be accessible from several sources:
- the CSV dataset is accessible from a remote storage, e.g. an S3 bucket.
- the CSV is accessible from an endpoint,
- the CSV is serialized in a database.
> The paragraph Higher Level Operations explains how to deal (upload, download) with large files with ExAws.S3.
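As an illustration only, a sketch of these higher-level operations with ExAws.S3 (ex_aws_s3 is not part of the setup above; the bucket and key names are placeholders):
# Hypothetical ExAws.S3 example (requires :ex_aws, :ex_aws_s3 and AWS credentials).
# Multipart upload of a local file, streamed chunk by chunk:
ExAws.S3.Upload.stream_file("airports.csv")
|> ExAws.S3.upload("my-bucket", "datasets/airports.csv")
|> ExAws.request!()

# Chunked download straight to the local disk:
ExAws.S3.download_file("my-bucket", "datasets/airports.csv", "airports.csv")
|> ExAws.request!()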
If the data is zipped, we may have an archive containing several files. Therefore, we first spool the zip onto the local disk, and then work with it using streams: unzip, transform, and decode the data into the needed form, a CSV file or an in-memory list. We will use Unzip and CSV to decode streams.
flowchart LR
Z>Zip Source] -. stream download .-> S(((spool zip)))
S -. Unzip stream > Stream.map > Stream.into .-> C[local CSV]
S -. Unzip stream > CSV.decode! > Enum.to_list .-> L[in memory List]
If the data is not zipped, we can stream it down, CSV.decode it and further process it in streams all along.
> A gist to stream from AWS S3.
HTTP clients
We showcase 2 HTTP clients to reach an endpoint and stream down the response: HTTPoison and Finch.
HTTPoison requires building a custom streaming function, coded with Stream.resource, whilst Finch is more straightforward. We can then spool this stream, if needed, into a File.stream with Stream.into.
HTTPoison
We use a Stream.resource to build a custom stream. You initiate the request with stream_to: self() and async: :once so that the next chunk of the response is requested only when the previous chunk has been processed. In this configuration, chunks of 16400 bytes are streamed.
defmodule HTTPoisonStream do
@doc """
Uses `HTTPoison` and builds a custom stream from `HTTPoison.AsyncChunk`.
"""
def download(url) do
Stream.resource(
# start_fun: stream line by line with `async: :once`
fn -> HTTPoison.get!(url, %{}, stream_to: self(), async: :once) end,
# next_fun,
fn %HTTPoison.AsyncResponse{id: id} = resp ->
receive do
%HTTPoison.AsyncStatus{id: ^id, code: _code} ->
HTTPoison.stream_next(resp)
{[], resp}
%HTTPoison.AsyncHeaders{id: ^id, headers: _headers} ->
HTTPoison.stream_next(resp)
{[], resp}
%HTTPoison.AsyncChunk{id: ^id, chunk: chunk} ->
HTTPoison.stream_next(resp)
{[chunk], resp}
%HTTPoison.AsyncEnd{id: ^id} ->
{:halt, resp}
after
5000 ->
raise "timeout"
end
end,
# end_fun
fn %HTTPoison.AsyncResponse{id: id} = _resp ->
:hackney.stop_async(id)
end
)
end
end
Finch
Alternatively, we can use the HTTP client Finch to reach an endpoint and stream the response. Finch needs to be started first; it is started dynamically in the setup with the name MyFinch.
> You need to push the received chunks into the accumulator (i.e. append at the end) for Unzip to be able to grab the EOCD (End Of Central Directory).
defmodule HTTPFinchStream do
@doc """
Streams the HTTP response, accumulating the chunks in order so the EOCD stays at the end.
"""
def download(url) do
case Finch.build(:get, url)
|> Finch.stream(MyFinch, [], fn
{:status, _status}, acc -> acc
{:headers, _headers}, acc -> acc
# push data, to put the EOCD at the end.
{:data, stream_data}, acc -> acc ++ [stream_data]
end) do
{:ok, file} -> file
_ -> :error
end
end
end
Other clients
- the package Req (see the sketch below)
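A hedged sketch with Req, which is not in the Mix.install above: its :into option streams the response body into a collectable such as a File.stream!.
# Assumes a recent version of Req is added to the setup, e.g. {:req, "~> 0.5"}.
# The URL is a placeholder; the body is streamed straight into the local file.
Req.get!("https://example.com/archive.zip", into: File.stream!("tmp.zip"))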
Datasets endpoints
You can use the endpoints below:
zipped_airports =
"https://pkgstore.datahub.io/core/airport-codes/airport-codes_zip/data/173bf3d453e2aea540dc01c3421753fe/airport-codes_zip.zip"
unzipped_airports =
"https://pkgstore.datahub.io/core/airport-codes/airport-codes_csv/data/e07739e49300d125989ee543d5598c4b/airport-codes_csv.csv"
unzipped_films = "https://perso.telecom-paristech.fr/eagan/class/igr204/data/film.csv"
Download CSV datasets
We will use the package Unzip to unzip files, and the package CSV that allows you to encode/decode streams of data.
Endpoint serving a zipped file
If we reach a zip archive, it can contain several files, so you need to first spool the zip, then read the directory and select a file.
You can run unzip -l name.zip
in a terminal to list the files contained in a zip archive. Since we are using the package Unzip, it offers us a convenient Unzip.list_entries/1 to list the file archive directory.
flowchart LR
Z>zip_endpoint] -. HTTP.stream > Stream.into .-> S(((local: tmp.zip)))
S -. Unzip.list_entries .-> S
The local zip file may be an archive with several files. You select the file you want. Then you can stream all the way: unzip, decode, and build an in-memory dataset or stream into a new file.
flowchart LR
S(((local: tmp.zip))) -. Unzip stream > CSV.decode > Stream.into .-> C(((local CSV)))
S -- Unzip stream > CSV.decode! > Enum.to_list--> L[in memory List]
defmodule HTTPStream do
def spool(stream, opts \\ []) do
dest = Keyword.get(opts, :to, "tmp.zip")
stream
|> Stream.into(File.stream!(dest))
|> Stream.run()
{:ok, dest}
end
@doc """
Read the file archive directory
"""
def check_zip(zip_arch) do
Unzip.LocalFile.open(zip_arch)
|> Unzip.new()
|> elem(1)
|> Unzip.list_entries()
|> Enum.reduce([], fn %Unzip.Entry{file_name: file_name, compressed_size: c_size}, acc ->
[{file_name, c_size} | acc]
end)
end
end
{:ok, tmp_zip} =
HTTPFinchStream.download(zipped_airports)
|> HTTPStream.spool(to: "tmp.zip")
# check the file archive directory
try do
HTTPStream.check_zip(tmp_zip)
|> IO.inspect()
rescue
e ->
%KeyError{term: msg} = e
inspect(msg)
end
{:ok, unzip} =
Unzip.LocalFile.open(tmp_zip)
|> Unzip.new()
# unzip the selected downloaded file > save decompressed into CSV
Unzip.file_stream!(unzip, "archive/airport-codes.csv")
|> Stream.map(&IO.chardata_to_string/1)
|> Stream.into(File.stream!("airports-f.csv"))
|> Stream.run()
# unzip the selected downloaded file > in-memory List
Unzip.file_stream!(unzip, "archive/airport-codes.csv")
|> Stream.map(&IO.chardata_to_string/1)
|> CSV.decode!(headers: true)
|> Enum.to_list()
Endpoint serving an unzipped CSV file
If the file is not zipped, then we can stream all along until the final format we want to use.
flowchart LR
endpoint -- HTTP.stream > CSV.decode! > Enum.to_list --> M[in memory Map]
endpoint -- HTTP.stream > Stream.into -->S[local CSV]
We can stream down and spool. Have a look at your disk to find the spooled files.
HTTPoisonStream.download(unzipped_films)
|> HTTPStream.spool(to: "films-p.csv")
You can stream down and transform into an in-memory list of maps:
HTTPoisonStream.download(unzipped_airports)
|> Stream.map(&IO.chardata_to_string/1)
|> CSV.decode!(headers: true, separator: ?,)
|> Enum.to_list()
Another example where we stream down and map into memory (the CSV used here has “;” separators).
HTTPFinchStream.download(unzipped_films)
|> Stream.map(&IO.chardata_to_string/1)
|> CSV.decode!(headers: true, separator: ?;)
|> Enum.to_list()
You can transform the dataset into a DataFrame:
Explorer.DataFrame.from_csv!("airports-f.csv")
Database setup: Ecto-SQLite
We want to be able to stream our CSV file into a database, and also back up our database into a CSV file. We set up the serverless database SQLite for simplicity.
> If we further want to use a remote S3 bucket, one may consider Litestream. See also this post: “Litestream allows us to backup our SQLite database to any S3 compatible storage after every transaction. It will also restore from that backup, meaning that we can restore the latest version of the database.”
We open a connection to a SQLite database and create a table to store each line of the CSV file. We use the CSV headers to build the SQLite table; each field is of type “string”.
Firstly, we can get the headers of the CSV file into a list:
headers =
File.stream!("airports-f.csv")
|> Enum.take(1)
|> hd()
|> String.split(",")
|> Enum.map(&String.trim/1)
We start a connection to the SQLite database, then create a table:
defmodule Conn do
# @headers headers
def start do
# Exqlite.Sqlite3.open(":memory:")
Exqlite.Sqlite3.open("test")
end
def create_table(conn, table) do
Exqlite.Sqlite3.execute(conn, "DROP TABLE IF EXISTS #{table}")
Exqlite.Sqlite3.execute(
conn,
"CREATE TABLE IF NOT EXISTS #{table} (
id integer primary key,
ident text,
elevation_ft text,
type text,
name text,
continent text,
iso_country text,
iso_region text,
municipality text,
iata_code text,
local_code text,
gps_code text,
coordinates text
)"
)
end
def reset_table(conn, table) do
Exqlite.Sqlite3.execute(conn, "DROP TABLE IF EXISTS #{table}")
create_table(conn, table)
end
end
{:ok, conn} = Conn.start()
Conn.create_table(conn, "csv")
We define our Repo and start it:
defmodule ARepo do
use Ecto.Repo, adapter: Ecto.Adapters.SQLite3, otp_app: :noop
end
case ARepo.start_link(database: "test", default_chunk_size: 100) do
{:ok, pid} -> {:ok, pid}
{:error, {_, pid}} -> {:ok, pid}
end
We use the headers of the CSV file to define a schema:
defmodule Airport do
use Ecto.Schema
import Ecto.Changeset
@headers headers
@columns Enum.map(@headers, &String.to_atom/1)
schema "csv" do
Enum.each(@columns, &field(&1, :string))
# timestamps()
end
def changeset(attrs \\ %{}) do
%Airport{}
|> cast(attrs, @columns)
|> validate_required(@columns -- [:iata_code])
end
end
We check that our schema is correctly built and that the database and the table “csv” are ready:
Airport.__schema__(:fields) |> IO.inspect()
ARepo.all(Airport)
From CSV to DB
The easiest way is to use the database itself. With Postgres, you have the COPY command (note that the path to the file must be absolute). This tutorial explains COPY FROM for Postgres, and this one for SQLite.
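For illustration only, a hedged sketch of the Postgres route; it assumes a Postgres-backed repo named PgRepo and an airports table matching the CSV columns, so it does not apply to the SQLite setup used below.
# Hypothetical Postgres example: COPY FROM STDIN fed by a local file stream.
# Ecto.Adapters.SQL.stream/2 returns a stream that is also collectable,
# so we can pour the CSV file into it inside a transaction.
PgRepo.transaction(fn ->
  copy =
    Ecto.Adapters.SQL.stream(
      PgRepo,
      "COPY airports FROM STDIN WITH (FORMAT csv, HEADER true)"
    )

  Enum.into(File.stream!("airports-f.csv"), copy)
end)
Everything runs inside a single transaction, so a failure leaves the table untouched.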
Since we may want to transform the data, we set up an Elixir pipeline instead.
flowchart
subgraph transaction
F(((local zip))) -. Unzip.file_stream! > CSV.decode! .- S[ ..file stream..]
S -. Stream.chunk_every > Stream Repo.insert_all .-> D[(Database)]
end
We will parse the data into a map for the Ecto insertion.
defmodule AirportDataset do
@moduledoc """
Provides mapping for the Airport dataset:
"""
def map(row) do
%{
ident: Enum.at(row, 0),
type: Enum.at(row, 1),
name: Enum.at(row, 2),
elevation_ft: Enum.at(row, 3),
continent: Enum.at(row, 4),
iso_country: Enum.at(row, 5),
iso_region: Enum.at(row, 6),
municipality: Enum.at(row, 7),
gps_code: Enum.at(row, 8),
iata_code: Enum.at(row, 9),
local_code: Enum.at(row, 10),
coordinates: Enum.at(row, 11)
}
end
end
We compose the streams and run the insertion within a transaction.
defmodule Import2DB do
def zip_to_db(select: source) do
unzipped_stream =
Unzip.LocalFile.open("tmp.zip")
|> Unzip.new()
|> elem(1)
|> Unzip.file_stream!(source)
|> Stream.map(&IO.chardata_to_string/1)
|> CSV.decode!(headers: false)
|> Stream.map(&AirportDataset.map/1)
# clean up the spooled zip file
|> tap(fn _ -> File.rm("tmp.zip") end)
# serialize in the database within a transaction
ARepo.transaction(fn ->
unzipped_stream
# insert by groups of 1000 rows
|> Stream.chunk_every(1000)
|> Stream.each(&ARepo.insert_all(Airport, &1))
|> Stream.run()
end)
end
end
We use the module above to stream down a zipped file, spool it, and then run a transaction to stream into the database:
Conn.reset_table(conn, "csv")
{:ok, file} =
HTTPoisonStream.download(zipped_airports)
|> HTTPStream.spool()
Import2DB.zip_to_db(select: "archive/airport-codes.csv")
We check if the data is inserted:
ARepo.aggregate(Airport, :count)
We check for one row in particular:
ARepo.get(Airport, 3)
From DB to CSV
You can use the database directly to perform this transformation. This SQL tutorial and this Postgres tutorial explain how to export a table into a CSV file with COPY TO (the counterpart of the COPY FROM used to import with Postgres).
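As a hedged illustration of that direct route, again assuming a hypothetical Postgres-backed repo named PgRepo rather than the SQLite setup used here:
# Hypothetical Postgres example: COPY ... TO STDOUT streamed into a local file.
# Each streamed result is assumed to carry the raw CSV chunks in its :rows field.
PgRepo.transaction(fn ->
  Ecto.Adapters.SQL.stream(
    PgRepo,
    "COPY (SELECT * FROM airports) TO STDOUT WITH (FORMAT csv, HEADER true)"
  )
  |> Stream.flat_map(fn result -> result.rows end)
  |> Stream.into(File.stream!("airports-copy.csv"))
  |> Stream.run()
end)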
Since we (potentially) want to transform the data, we will stream all the way from the database to a CSV file. We first need to query the data from the database: we use Repo.stream within a transaction to generate a stream of query results, then transform the data and encode the stream into CSV format.
flowchart
subgraph Transaction
D[(Database)] -. Repo.stream > Stream.map > CSV.encode! > Stream.into .-> F(((CSV file)))
end
The module below runs a Repo.stream within a transaction and streams the result into a file:
defmodule ExportFromDB do
# `Repo.stream` must be run inside a transaction.
# Its default chunk size is 500 rows.
# The default transaction timeout is 15s; pass `timeout: :infinity` for large exports.
def spool_db(query, to: file_name) do
ARepo.transaction(fn ->
query
|> ARepo.stream()
|> Stream.map(&Map.values(&1))
|> CSV.encode()
|> Stream.into(File.stream!(file_name, [:write, :utf8]))
|> Stream.run()
end)
end
end
We use the module above to export the database into a CSV file:
import Ecto.Query
query =
from(a in Airport,
limit: 10,
select: map(a, ~w(id type name iso_country coordinates continent iso_region municipality)a)
)
ExportFromDB.spool_db(query, to: "save-airport.csv")
We check the CSV file:
File.stat!("save-airport.csv").size
Exqlite.Sqlite3.execute(conn, "DROP TABLE IF EXISTS csv")
Exqlite.Sqlite3.close(conn)
Aggregation: concurrency with Flow
Streams do not bring concurrency, only lazy evaluation. We will use the package Flow to run code concurrently. We also use the package NimbleCSV to decode line by line.
The data here is small and bounded (as opposed to “unbounded”, a continuous flow of data), so the advantage of using Flow is not clear-cut. Nevertheless, you can compose aggregations when you introduce partitions, as showcased here.
defmodule UseFlow do
alias NimbleCSV.RFC4180, as: CSV2
def airports do
File.stream!("airports-f.csv")
|> Flow.from_enumerable()
|> Flow.map(fn row ->
[row] = CSV2.parse_string(row, skip_headers: false)
%{
id: Enum.at(row, 0),
type: Enum.at(row, 2),
name: Enum.at(row, 3),
country: Enum.at(row, 5)
}
end)
|> Flow.reject(&(&1.type == "closed"))
|> Flow.partition(key: {:key, :country})
|> Flow.reduce(fn -> %{} end, fn item, acc ->
Map.update(acc, item.country, 1, &(&1 + 1))
end)
|> Flow.take_sort(10, fn {_, a}, {_, b} -> a > b end)
|> Enum.to_list()
end
end
:timer.tc(fn -> UseFlow.airports() end)
Aggregation: Explorer.DataFrame
Since the data can be presented as a DataFrame, a two-dimensional tabular dataset, we can use the library Explorer.DataFrame.
The library has a nice introductory Livebook:
require Explorer.DataFrame, as: DF
df = DF.from_csv!("airports-f.csv")
DF.table(df)
DF.group_by(df, ["iso_country"])
|> DF.summarise(count: count(iso_country))
|> DF.table()
:timer.tc(fn ->
DF.from_csv!("airports-f.csv")
|> DF.group_by(["iso_country"])
|> DF.summarise(count: count(iso_country))
end)