MET-124 Convert CSV to stream of erlang terms.
Section
Reading and converting the CSV is a tad slow, so here we read it once and then write back a hopefully quicker to process binary format.
input = "/home/eng/dev/backend/livebooks/data/bulk/telemetry_added.csv"
sorted = String.replace(input, ".csv", ".srt")
binary_sorted = String.replace(sorted, ".srt", ".srt.bin")
out_file =
File.open!(binary_sorted, [:write, :binary, {:delayed_write, 10 * 1024 * 1024, 10_000}])
sorted
|> File.stream!()
|> Task.async_stream(
fn line ->
[dtstring, id, check, instance, valuestring] = String.split(line, ",")
{dt, _} = Integer.parse(dtstring)
{value, _} = Float.parse(valuestring)
binary = :erlang.term_to_binary({dt, id, check, instance, value})
size = :erlang.size(binary)
<>
end,
ordered: true,
timeout: 100_000
)
|> Stream.map(fn {:ok, binary} -> IO.binwrite(out_file, binary) end)
|> Enum.to_list()
File.close(out_file)
defmodule Binfile do
def stream_from(filename) do
Stream.resource(
fn -> File.open!(filename, [:read, :binary]) end,
fn handle ->
case record(handle) do
nil -> {:halt, handle}
record -> {[record], handle}
end
end,
fn handle -> File.close(handle) end
)
end
def record(handle) do
case :file.read(handle, 4) do
:eof ->
nil
{:ok, <>} ->
{:ok, binary} = :file.read(handle, size)
:erlang.binary_to_term(binary)
end
end
end
# Test reading it back
binary_sorted
|> Binfile.stream_from()
|> Enum.to_list()