Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

MET-124 Convert CSV to stream of erlang terms.

livebooks/MET-1246-optimize-io.livemd

MET-124 Convert CSV to stream of erlang terms.

Section

Reading and converting the CSV is a tad slow, so here we read it once and write it back in a binary format that is hopefully quicker to process.

# Convert the sorted CSV into a file of length-prefixed Erlang terms.
# Each output record is a 4-byte big-endian size followed by the
# `:erlang.term_to_binary/1` encoding of {dt, id, check, instance, value}.
input = "/home/eng/dev/backend/livebooks/data/bulk/telemetry_added.csv"
sorted = String.replace(input, ".csv", ".srt")
binary_sorted = String.replace(sorted, ".srt", ".srt.bin")

# Buffer writes: flush once 10 MiB are pending or after 10_000 ms.
out_file =
  File.open!(binary_sorted, [:write, :binary, {:delayed_write, 10 * 1024 * 1024, 10_000}])

sorted
|> File.stream!()
|> Task.async_stream(
  fn line ->
    [dtstring, id, check, instance, valuestring] = String.split(line, ",")
    {dt, _} = Integer.parse(dtstring)
    # The last field still carries the line terminator; Float.parse/1 hands it
    # back as the remainder, which is discarded.
    {value, _} = Float.parse(valuestring)
    binary = :erlang.term_to_binary({dt, id, check, instance, value})
    # Frame the term with its byte size so the reader knows how much to fetch.
    <<byte_size(binary)::32, binary::binary>>
  end,
  # Keep the rows in their sorted order in the output file.
  ordered: true,
  timeout: 100_000
)
# Stream.each/Stream.run avoids accumulating one `:ok` per row in memory.
|> Stream.each(fn {:ok, binary} -> :ok = IO.binwrite(out_file, binary) end)
|> Stream.run()

# With :delayed_write, an earlier write failure may only surface here.
:ok = File.close(out_file)
defmodule Binfile do
  @moduledoc """
  Reads files made of length-prefixed Erlang terms.

  Each record is a 4-byte big-endian unsigned size followed by that many bytes
  produced by `:erlang.term_to_binary/1`.
  """

  @doc """
  Returns a lazy stream of the terms stored in `filename`.

  The file is opened when the stream starts being consumed and closed once the
  stream halts. Raises if the file cannot be opened.
  """
  @spec stream_from(Path.t()) :: Enumerable.t()
  def stream_from(filename) do
    Stream.resource(
      fn -> File.open!(filename, [:read, :binary]) end,
      fn handle ->
        case record(handle) do
          nil -> {:halt, handle}
          record -> {[record], handle}
        end
      end,
      fn handle -> File.close(handle) end
    )
  end

  @doc """
  Reads and decodes the next record from the open file `handle`.

  Returns the decoded term, or `nil` once the end of the file is reached.
  Because `nil` signals end of file, a stored term that is itself `nil` is
  indistinguishable from it. A record whose body cannot be read raises a
  `MatchError`.
  """
  @spec record(:file.io_device()) :: term() | nil
  def record(handle) do
    case :file.read(handle, 4) do
      :eof ->
        nil

      # 4-byte header: size in bytes of the encoded term that follows.
      {:ok, <<size::32>>} ->
        {:ok, binary} = :file.read(handle, size)
        :erlang.binary_to_term(binary)
    end
  end
end

# Sanity check: decode every record from the file that was just written.
Enum.to_list(Binfile.stream_from(binary_sorted))