Powered by AppSignal & Oban Pro

Measurements Loader

MeasurmentsLoader.livemd

Measurements Loader

Modules

 defmodule Parser do
  @moduledoc """
  Parses a single semicolon-separated measurement line into a map.

  A line is expected to hold exactly six fields, in this order:

      datetime;type;value;station id;station name;location

  where `datetime` is `YYYY-MM-DDTHH:MM:SS...` and `location` is two
  comma-separated numbers.
  """

  @doc """
  Converts one measurement line into a map.

  The returned map has these keys:

    * `:datetime` - `{{year, month, day}, {hour, minute, second}}`
    * `:location` - tuple of floats built from the comma-separated location field
    * `:stationId` - integer
    * `:stationName` - charlist
    * `:pollutionType` - charlist
    * `:pollutionLevel` - float

  Surrounding whitespace (including a trailing `"\\r"`) is ignored, and numeric
  fields may be written with or without a decimal point (`"54"` or `"54.0"`).

  Raises `MatchError` when the line does not have six fields or the datetime has
  no single `"T"` separator, and `ArgumentError` when a numeric field is not a
  number.
  """
  @spec parse_line(String.t()) :: map()
  def parse_line(line) do
    [datetime, type, val, station_id, station_name, location] =
      line |> String.trim() |> String.split(";")

    [date, time] = String.split(datetime, "T")

    %{
      datetime: {parse_date(date), parse_time(time)},
      location: parse_location(location),
      stationId: String.to_integer(station_id),
      stationName: String.to_charlist(station_name),
      pollutionType: String.to_charlist(type),
      pollutionLevel: parse_float(val)
    }
  end

  # "2024-02-10" -> {2024, 2, 10}
  defp parse_date(date) do
    date
    |> String.split("-")
    |> Enum.map(&String.to_integer/1)
    |> List.to_tuple()
  end

  # "09:05:00.000Z" -> {9, 5, 0}
  # Only the first two characters of every part are kept, so anything that
  # follows the two-digit seconds (e.g. ".000Z") is dropped before conversion.
  defp parse_time(time) do
    time
    |> String.split(":")
    |> Enum.map(fn part -> part |> String.slice(0..1) |> String.to_integer() end)
    |> List.to_tuple()
  end

  # "50.06,19.93" -> {50.06, 19.93}
  defp parse_location(location) do
    location
    |> String.split(",")
    |> Enum.map(&parse_float/1)
    |> List.to_tuple()
  end

  # Strict float conversion that, unlike String.to_float/1, also accepts
  # integer-formatted text such as "54". The whole field must be numeric.
  defp parse_float(text) do
    case text |> String.trim() |> Float.parse() do
      {value, ""} -> value
      _other -> raise ArgumentError, "not a number: #{inspect(text)}"
    end
  end
end
defmodule Measurments do
  @moduledoc """
  Loads measurements from a CSV file and pushes them to the Erlang
  `:pollution_gen_server` through its `addStation/2` and `addValue/4` functions.

  Two loaders are provided that differ only in how the file is read:
  `load_with_enum/1` reads it whole, `load_with_stream/1` reads it line by line.
  Both presumably require `:pollution_gen_server` to be running already.
  """

  @file_path "/home/typsoo/Projects/ElexirLiveBooks/pollution_50k.csv/AirlyData-ALL-50k.csv"

  @doc """
  Reads the whole file at `path` into memory, parses every non-blank line with
  `Parser.parse_line/1`, registers each distinct station once and then adds
  every measurement.

  `path` defaults to the notebook's data file. Returns `:ok`; the results of the
  individual server calls are ignored. Raises if the file cannot be read or a
  line cannot be parsed.
  """
  @spec load_with_enum(Path.t()) :: :ok
  def load_with_enum(path \\ @file_path) do
    measurements =
      path
      |> File.read!()
      |> String.split("\n")
      # Trimming per line also removes the "\r" of CRLF files.
      |> Enum.map(&String.trim/1)
      |> Enum.reject(&(&1 == ""))
      |> Enum.map(&Parser.parse_line/1)

    measurements
    |> Enum.uniq_by(& &1.stationId)
    |> Enum.each(&add_station/1)

    Enum.each(measurements, &add_value/1)
  end

  @doc """
  Same as `load_with_enum/1`, but reads the file lazily with `File.stream!/1`.

  `path` defaults to the notebook's data file. Returns `:ok`.
  """
  @spec load_with_stream(Path.t()) :: :ok
  def load_with_stream(path \\ @file_path) do
    measurements =
      path
      |> File.stream!()
      |> Stream.map(&String.trim/1)
      |> Stream.reject(&(&1 == ""))
      # Materialised here on purpose: the parsed rows are traversed twice below,
      # and keeping a stream would read and parse the file twice.
      |> Enum.map(&Parser.parse_line/1)

    measurements
    |> Stream.uniq_by(& &1.stationId)
    |> Enum.each(&add_station/1)

    Enum.each(measurements, &add_value/1)
  end

  # Stations are identified on the server by the charlist "<id> <name>".
  defp station_key(%{stationId: id, stationName: name}) do
    Integer.to_charlist(id) ++ ~c" " ++ name
  end

  # Registers the station a measurement belongs to.
  defp add_station(measurement) do
    :pollution_gen_server.addStation(station_key(measurement), measurement.location)
  end

  # Sends one measurement value to the server.
  defp add_value(measurement) do
    :pollution_gen_server.addValue(
      station_key(measurement),
      measurement.datetime,
      measurement.pollutionType,
      measurement.pollutionLevel
    )
  end
end

Tests

# Make the compiled Erlang application loadable, then start it.
# ensure_all_started/1 also succeeds when the application is already running,
# so this section can be evaluated again without a silent {:error, ...}.
Code.append_path("/home/typsoo/IdeaProjects/Erlang/my_app/_build/default/lib/my_app/ebin")
{:ok, _started} = Application.ensure_all_started(:my_app)

# Time the full load; :timer.tc/1 returns {microseconds, result}.
{time, _result} = :timer.tc(fn -> Measurments.load_with_enum() end)

time / 1_000_000

# Time a single station insert for comparison.
{time, _result} = :timer.tc(fn -> :pollution_gen_server.addStation(~c"test", {1, 2}) end)

time / 1_000_000
:sys.get_state(:pollution_gen_server)
# :pollution_gen_server.getStationMin(~c"Polska, Kraków, Floriana Straszewskiego", ~c"PM10")

# Measurments registers stations under "<id> <name>", so the id prefix has to be
# part of the name that is queried (and timed).
{time, _result} =
  :timer.tc(fn ->
    :pollution_gen_server.getStationMin(~c"9910 Polska, Kraków, Studencka", ~c"PM10")
  end)

time / 1_000_000

# Take the value and the elapsed time from the same call so that the two
# printed lines describe one and the same query.
{time, value} =
  :timer.tc(fn ->
    :pollution_gen_server.getStationMin(~c"9910 Polska, Kraków, Studencka", ~c"PM10")
  end)

IO.inspect(value, label: "Query Result")
IO.puts("Elapsed time: #{time} µs (#{time / 1_000_000} s)")

{time, value} = :timer.tc(fn -> :pollution_gen_server.getDailyMean(~c"PM25", {2024, 2, 10}) end)

IO.inspect(value, label: "Query Result")
IO.puts("Elapsed time: #{time} µs (#{time / 1_000_000} s)")
:pollution_gen_server.getDailyMean(~c"PM25", {2024, 2, 10})
Application.stop(:my_app)