PollutionDataLoader in Elixir
Section
Erlang dependencies
# Put the compiled `mylib` ebin directory on the code path so that its modules
# can be loaded from this session.
# NOTE(review): this is an absolute, machine-specific path - adjust it per machine.
Code.append_path("/home/wiktor/university/sem4/erlang_elixir/lab4/mylib/_build/default/lib/mylib/ebin")
Import data
# Location of the exported measurements file (one measurement per line).
path = "/home/wiktor/university/sem4/erlang_elixir/lab5/AirlyData-ALL-50k.csv"

# Read the whole file, drop trailing whitespace (so the final newline does not
# produce an empty last entry) and split the content into individual lines.
reads =
  path
  |> File.read!()
  |> String.trim_trailing()
  |> String.split("\n")

# Number of lines read.
length(reads)
PollutionDataLoader module
defmodule PollutionDataLoader do
  @moduledoc """
  Utility functions for loading and manipulating the measurement data that is
  later fed to the pollution server.
  """

  @doc """
  Parses a single line of data from the `.csv` file.

  The line must consist of exactly six `;`-separated fields: an ISO 8601
  timestamp with an offset, the pollution type, the pollution level, the
  station id, the station name and the comma-separated coordinates.

  The timestamp is converted to UTC and returned as an Erlang
  `{{year, month, day}, {hour, minute, second}}` tuple. The pollution level and
  the coordinates are returned as floats and may be written with or without a
  fractional part (`"17"` and `"17.0"` are both accepted).

  Raises `MatchError` when the line does not have six fields or the timestamp
  is malformed, and `ArgumentError` when a numeric field cannot be parsed.

  ## Example:

      iex> PollutionDataLoader.parse_line("2024-02-10T09:00:00.000Z;PM1;17.07;57570;Polska, Kraków, Floriana Straszewskiego;50.057224,19.933157")
      %{
        location: {50.057224, 19.933157},
        datetime: {{2024, 2, 10}, {9, 0, 0}},
        pollutionType: "PM1",
        pollutionLevel: 17.07,
        stationId: 57570,
        stationName: "Polska, Kraków, Floriana Straszewskiego"
      }

  """
  @spec parse_line(String.t()) :: map()
  def parse_line(line) do
    [datetime, pollutionType, pollutionLevel, stationId, stationName, location] =
      String.split(line, ";")

    location =
      location
      |> String.split(",")
      |> Enum.map(&parse_float!/1)
      |> List.to_tuple()

    # from_iso8601/1 applies the offset and yields a UTC datetime; the offset
    # itself is not needed afterwards.
    {:ok, utc_datetime, _offset} = DateTime.from_iso8601(datetime)

    %{
      datetime: utc_datetime |> DateTime.to_naive() |> NaiveDateTime.to_erl(),
      location: location,
      stationId: String.to_integer(stationId),
      stationName: stationName,
      pollutionType: pollutionType,
      pollutionLevel: parse_float!(pollutionLevel)
    }
  end

  @doc """
  Identifies unique stations from pollution data.

  Entries are de-duplicated successively by station name, then by location and
  finally by station id, so every returned station has a name, a location and
  an id that no other returned station shares. The first occurrence wins and
  the input order is preserved.

  Returns an `{:error, reason}` tuple when the argument is not a list.

  ## Example:

      iex> PollutionDataLoader.identify_stations([
      ...>   %{
      ...>     location: {50.057224, 19.933157},
      ...>     datetime: {{2024, 2, 10}, {9, 0, 0}},
      ...>     pollutionType: "PM1",
      ...>     pollutionLevel: 17.07,
      ...>     stationId: 57570,
      ...>     stationName: "Polska, Kraków, Floriana Straszewskiego"
      ...>   },
      ...>   %{
      ...>     location: {50.057224, 19.933157},
      ...>     datetime: {{2024, 2, 10}, {9, 0, 0}},
      ...>     pollutionType: "PM25",
      ...>     pollutionLevel: 27.92,
      ...>     stationId: 57570,
      ...>     stationName: "Polska, Kraków, Floriana Straszewskiego"
      ...>   },
      ...>   %{
      ...>     location: {0.0, 0.0},
      ...>     datetime: {{2024, 2, 10}, {9, 0, 0}},
      ...>     pollutionType: "PM25",
      ...>     pollutionLevel: 27.92,
      ...>     stationId: 1,
      ...>     stationName: "Polska, Warszawa, Praga"
      ...>   }
      ...> ])
      [
        %{
          location: {50.057224, 19.933157},
          stationId: 57570,
          stationName: "Polska, Kraków, Floriana Straszewskiego"
        },
        %{
          location: {0.0, 0.0},
          stationId: 1,
          stationName: "Polska, Warszawa, Praga"
        }
      ]

  """
  @spec identify_stations(term()) :: [map()] | {:error, String.t()}
  def identify_stations(data_lines) when is_list(data_lines) do
    data_lines
    |> Enum.uniq_by(& &1.stationName)
    |> Enum.uniq_by(& &1.location)
    |> Enum.uniq_by(& &1.stationId)
    |> Enum.map(fn data_map ->
      # Keep only the fields that identify a station; `.` access raises if a
      # measurement is missing one of them.
      %{
        location: data_map.location,
        stationName: data_map.stationName,
        stationId: data_map.stationId
      }
    end)
  end

  def identify_stations(_), do: {:error, "Ivalid arguments"}

  @doc """
  Adds values to the monitor server.

  For every measurement in `data_lines`, calls
  `:pollution_gen_server.add_value/4` with the measurement's location,
  datetime, pollution type and pollution level. The individual results of
  those calls are ignored; the function itself returns `:ok`.
  """
  @spec add_values([map()]) :: :ok
  def add_values(data_lines) when is_list(data_lines) do
    Enum.each(data_lines, fn data_map ->
      :pollution_gen_server.add_value(
        data_map.location,
        data_map.datetime,
        data_map.pollutionType,
        data_map.pollutionLevel
      )
    end)
  end

  # Strictly parses a decimal number into a float. Unlike `String.to_float/1`
  # it also accepts values without a fractional part (e.g. "17") and tolerates
  # surrounding whitespace; anything else raises `ArgumentError`.
  defp parse_float!(text) do
    case Float.parse(String.trim(text)) do
      {value, ""} -> value
      _ -> raise ArgumentError, "cannot parse #{inspect(text)} as a number"
    end
  end
end
Parse Lines
# Turn every raw CSV line into a measurement map.
data_lines = Enum.map(reads, &PollutionDataLoader.parse_line/1)
Identify stations
# Reduce the measurements to the list of distinct stations they come from.
stations_list = PollutionDataLoader.identify_stations(data_lines)
Number of unique stations
# How many distinct stations were found.
length(stations_list)
Integration with pollution_app
# Start the :mylib application together with any applications it depends on.
# Unlike Application.start/1 this is safe to re-evaluate (an already running
# application still yields {:ok, _}), and the match makes a failed start
# visible immediately instead of silently discarding the error tuple.
{:ok, _started_apps} = Application.ensure_all_started(:mylib)
0. Check whether integration works
# Smoke test: ask the pollution server for its current monitor state to confirm
# that the Erlang application is reachable from this session.
:pollution_gen_server.get_monitor()
1. Create Stations
# Register every identified station with the pollution server and measure how
# long the whole batch takes. Each station is registered under the name
# "<stationName> <stationId>" together with its location.
{time_us, _result} =
  :timer.tc(fn ->
    Enum.each(stations_list, fn station_map_data ->
      :pollution_gen_server.add_station(
        "#{station_map_data.stationName} #{station_map_data.stationId}",
        station_map_data.location
      )
    end)
  end)

# :timer.tc reports microseconds; there are 1_000 microseconds in a millisecond.
time_ms = time_us / 1_000
2. Filter measurements from invalid stations
# Number of measurements before filtering.
length(data_lines)

# Fetch the server state; its second element is used below as a map that is
# looked up by measurement location.
# NOTE(review): presumably this map is keyed by station coordinates - confirm
# against the monitor definition in :mylib.
{:monitor, stations_map, _} = :pollution_gen_server.get_monitor()

# Keep only the measurements whose location is present in that map.
data_lines =
  Enum.filter(data_lines, fn measurement ->
    Map.has_key?(stations_map, measurement.location)
  end)

# Number of measurements after filtering.
length(data_lines)
3. Load data
# Select only the PM10 measurements.
pm10_measurements =
  Enum.filter(data_lines, fn measurement -> measurement.pollutionType == "PM10" end)

length(pm10_measurements)

# Time loading just the PM10 subset (microseconds, then converted to seconds).
{time_us, _} = :timer.tc(&PollutionDataLoader.add_values/1, [pm10_measurements])
time_s = time_us / 1_000_000

# Time loading the complete filtered data set.
{time_us, _} = :timer.tc(&PollutionDataLoader.add_values/1, [data_lines])
time_s = time_us / 1_000_000
4. Analyze data
# Time a get_station_min/2 query for one station and pollution type.
{time_us, result} =
  :timer.tc(&:pollution_gen_server.get_station_min/2, ["Polska, Kraków, Studencka 9910", "PM10"])

IO.puts("exec time: #{time_us / 1_000_000} [s]")
# inspect/1 is used because plain interpolation raises Protocol.UndefinedError
# when the server answers with a tuple (for example an error tuple); numbers
# are rendered exactly as before.
IO.puts("result: #{inspect(result)}")

# Time a get_daily_mean/2 query for one pollution type and date.
{time_us, result} =
  :timer.tc(&:pollution_gen_server.get_daily_mean/2, ["PM25", {2024, 2, 10}])

IO.puts("exec time: #{time_us / 1_000_000} [s]")
IO.puts("result: #{inspect(result)}")