Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

PollutionDataLoader in Elixir

lab5/pollution_data_loader.livemd

PollutionDataLoader in Elixir

Section

Erlang dependencies

Code.append_path("/home/wiktor/university/sem4/erlang_elixir/lab4/mylib/_build/default/lib/mylib/ebin")

Import data

path = "/home/wiktor/university/sem4/erlang_elixir/lab5/AirlyData-ALL-50k.csv"
reads = File.read!(path) |> String.trim_trailing |> String.split("\n")
reads |> length

PollutionDataLoader module

defmodule PollutionDataLoader do
  @moduledoc """
  Module providing utility methods for loading and manipulationg data used later
  on by pollution server.
  """

  @doc """
  Parses single line of data from .csv file

  ## Example:

      iex> PollutionDataLoader.parse_line("2024-02-10T09:00:00.000Z;PM1;17.07;57570;Polska, Kraków, Floriana Straszewskiego;50.057224,19.933157")
      %{
        location: {50.057224, 19.933157},
        datetime: {{2024, 2, 10}, {9, 0, 0}},
        pollutionType: "PM1",
        pollutionLevel: 17.07,
        stationId: 57570,
        stationName: "Polska, Kraków, Floriana Straszewskiego"
      }
  """
  def parse_line(line) do
    line |> String.split(";")
    [datetime, pollutionType, pollutionLevel, stationId, stationName, location] = line |> String.split(";")
    location = location |>
               String.split(",") |>
               Enum.map(&String.to_float/1) |>
               List.to_tuple
    pollutionLevel = String.to_float(pollutionLevel)
    stationId = String.to_integer(stationId)
    {:ok, datetime, _offset} = DateTime.from_iso8601(datetime)
    datetime = DateTime.to_naive(datetime) |>
               NaiveDateTime.to_erl
    %{
      datetime: datetime,
      location: location,
      stationId: stationId,
      stationName: stationName,
      pollutionType: pollutionType,
      pollutionLevel: pollutionLevel
    }
  end

  @doc """
  Identifies unique stations from pollution data.

  ## Example:

      iex> PollutionDataLoader.identify_stations([
      ...>   %{
      ...>     location: {50.057224, 19.933157},
      ...>     datetime: {{2024, 2, 10}, {9, 0, 0}},
      ...>     pollutionType: "PM1",
      ...>     pollutionLevel: 17.07,
      ...>     stationId: 57570,
      ...>     stationName: "Polska, Kraków, Floriana Straszewskiego"
      ...>   },
      ...>   %{
      ...>     location: {50.057224, 19.933157},
      ...>     datetime: {{2024, 2, 10}, {9, 0, 0}},
      ...>     pollutionType: "PM25",
      ...>     pollutionLevel: 27.92,
      ...>     stationId: 57570,
      ...>     stationName: "Polska, Kraków, Floriana Straszewskiego"
      ...>   },
      ...>   %{
      ...>     location: {0.0, 0.0},
      ...>     datetime: {{2024, 2, 10}, {9, 0, 0}},
      ...>     pollutionType: "PM25",
      ...>     pollutionLevel: 27.92,
      ...>     stationId: 1,
      ...>     stationName: "Polska, Warszawa, Praga"
      ...>   }
      ...> ])
      [
        %{
          location: {50.057224, 19.933157},
          stationId: 57570,
          stationName: "Polska, Kraków, Floriana Straszewskiego"
        },
        %{
          location: {0.0, 0.0},
          stationId: 1,
          stationName: "Polska, Warszawa, Praga"
        }
      ]
  """
  def identify_stations(data_lines) when is_list(data_lines) do
    data_lines |>
    Enum.uniq_by(fn (data_map) -> data_map.stationName end) |>
    Enum.uniq_by(fn (data_map) -> data_map.location end) |>
    Enum.uniq_by(fn (data_map) -> data_map.stationId end) |>
    Enum.map(
      fn (data_map) ->
        %{location: data_map.location,
          stationName: data_map.stationName,
          stationId: data_map.stationId}
      end
      )
  end

  def identify_stations(_), do: {:error, "Ivalid arguments"}

  @doc """
  Adds values to monitor server
  """
  def add_values(data_lines) when is_list(data_lines) do
    data_lines |>
    Enum.each(
      fn (data_map) ->
        :pollution_gen_server.add_value(
          data_map.location,
          data_map.datetime,
          data_map.pollutionType,
          data_map.pollutionLevel
        )
      end
    )
  end
end

Parse Lines

data_lines = reads |> Enum.map(&PollutionDataLoader.parse_line/1)

Identyfie stations

stations_list = data_lines |> PollutionDataLoader.identify_stations

Number of unique stations

stations_list |> length

Integration with pollution_app

Application.start(:mylib)

0. Check whether integration works

:pollution_gen_server.get_monitor()

1. Create Stations

time_us =
fn () -> 
  stations_list |>
  Enum.each(
    fn (station_map_data) ->
      :pollution_gen_server.add_station(
        "#{station_map_data.stationName} #{station_map_data.stationId}",
        station_map_data.location
      )
    end
  ) 
end |>
:timer.tc |>
elem(0)

time_ms = time_us  / 1_000_000

2. Filter measurements from invalid stations

data_lines |> length
{:monitor, stations_map, _} = :pollution_gen_server.get_monitor()
data_lines = data_lines |> Enum.filter(&(Map.has_key?(stations_map, &1.location)))
data_lines |> length

3. Load data

pm10_measurements = data_lines |>
Enum.filter(&(&1.pollutionType == "PM10")) 
pm10_measurements |> length
time_us = :timer.tc(&PollutionDataLoader.add_values/1, [pm10_measurements]) |> elem(0)

time_s = time_us / 1_000_000
time_us = :timer.tc(&PollutionDataLoader.add_values/1, [data_lines]) |> elem(0)

time_s = time_us / 1_000_000

4. Analyze data

{time_us, result} = 
:timer.tc(&:pollution_gen_server.get_station_min/2, ["Polska, Kraków, Studencka 9910", "PM10"])
IO.puts("exec time: #{time_us / 1_000_000} [s]")
IO.puts("result: #{result}")
{time_us, result} = 
:timer.tc(&:pollution_gen_server.get_daily_mean/2, ["PM25", {2024, 2, 10}])
IO.puts("exec time: #{time_us / 1_000_000} [s]")
IO.puts("result: #{result}")