CSV JSON Data Exploring
Mix.install([
{:kino, "~> 0.11"},
{:explorer, "~> 0.7"},
{:kino_explorer, "~> 0.1"},
{:kino_vega_lite, "~> 0.1"},
{:nimble_csv, "~> 1.1"},
{:jason, "~> 1.4"}
])
Section
defmodule Explore do
alias NimbleCSV.RFC4180, as: CSV
alias Explorer.DataFrame
def to_date_string(nil) do
nil
end
def to_date_string(d) do
{:ok, dt, 0} = DateTime.from_iso8601(d)
DateTime.to_iso8601(dt)
end
def merge_lists(list1, list2) do
# Find the union of all keys across both lists
all_keys =
(list1 ++ list2)
|> Enum.flat_map(&Map.keys(&1))
|> Enum.uniq()
# Ensure a map contains all keys, setting missing ones to nil
ensure_all_keys = fn map, keys ->
Enum.reduce(keys, %{}, fn key, acc ->
Map.put(acc, key, Map.get(map, key, nil))
end)
end
# Ensure each map in both lists contains all keys
list1 = Enum.map(list1, &ensure_all_keys.(&1, all_keys))
list2 = Enum.map(list2, &ensure_all_keys.(&1, all_keys))
# Combine the two lists
list1 ++ list2
end
def clean_map(%{} = map) do
Enum.reduce(map, %{}, fn
{_key, %{}}, acc -> acc
{_key, nil}, acc -> acc
{key, value}, acc when is_map(value) -> Map.put(acc, key, clean_map(value))
{key, value}, acc -> Map.put(acc, key, value)
end)
end
def do_thing() do
ocpp_calls = "/Users/noah/Desktop/cassandra-ocpp-calls.csv"
lm_logs = "/Users/noah/Desktop/be-lm-logs.csv"
output = "/Users/noah/Desktop/correlated_lm_logs.csv"
# Because Cassandra itself doesn't know about BE charger IDs
iot_to_charger = %{
"0ab0d60a-8851-41f0-ad5a-e19c8bdcd44b" => {"TACW1943022P4249", "5874"},
"12232621-bc7d-44a5-bd66-1ad79ebd60fc" => {"TACW1944622P0099", "5882"},
"eaf27aab-e506-48b0-9bcb-0ec7300fde88" => {"TACW1942522P6252", "5912"},
"695e61fa-cf1b-4bb5-8ffb-3624c4096d21" => {"TACW1943022P4248", "5870"}
}
ocpp_calls
|> File.stream!()
|> CSV.parse_stream()
# |> Stream.filter(fn line ->
# [_charger_uuid, _tsm, _timestamp, action, _conn_uid, direction, _error_c, _error_d, _error_det, _exact_timestamp, _match, _request_payload, _response_payload, _uid] = line
# action == "SetChargingProfile"
# end)
# |> Stream.take(1)
|> Enum.map(fn line ->
[
charger_uuid,
_tsm,
timestamp,
action,
_conn_uid,
direction,
_error_c,
_error_d,
_error_det,
_exact_timestamp,
_match,
request_payload,
response_payload,
_uid
] = line
request_json =
case Jason.decode(request_payload) do
{:ok, json} -> json
_ -> %{}
end
type = "DM_OCPP_LOG"
{cid, request_json} = pop_in(request_json, ["connectorId"])
{kind, request_json} = pop_in(request_json, ["csChargingProfiles", "chargingProfileKind"])
{purpose, request_json} =
pop_in(request_json, ["csChargingProfiles", "chargingProfilePurpose"])
{unit, request_json} =
pop_in(request_json, ["csChargingProfiles", "chargingSchedule", "chargingRateUnit"])
{period, request_json} =
pop_in(request_json, ["csChargingProfiles", "chargingSchedule", "chargingSchedulePeriod"])
{start, request_json} =
pop_in(request_json, ["csChargingProfiles", "chargingSchedule", "startSchedule"])
# request_json = request_json |> clean_map()
# response_json = request_json |> clean_map()
{_serial, id} = iot_to_charger[charger_uuid]
# [%{"limit" => limit, "startPeriod" => sp}] = period
{limit, sp} =
case period do
%{"limit" => limit, "startPeriod" => sp} -> {limit, sp}
%{"limit" => limit} -> {limit, nil}
%{"startPeriod" => sp} -> {nil, sp}
_ -> {nil, nil}
end
response_json =
case Jason.decode(response_payload) do
{:ok, json} -> json
_ -> %{}
end
%{
log_timestamp: to_date_string(timestamp),
charger_id: id,
type: type,
action: action,
direction: direction,
connector: cid,
purpose: purpose,
kind: kind,
limit: to_string(limit),
unit: unit,
start_period: sp,
start_time: to_date_string(start),
request_json: request_json |> Jason.encode!(),
response_json: response_json |> Jason.encode!()
}
end)
|> then(fn csv ->
lm_logs
|> File.stream!()
|> CSV.parse_stream()
# |> Stream.take(5)
|> Enum.map(fn line ->
type = "BE_LM_LOG"
[
inserted_at,
_rule_id,
_type,
odl,
dl,
tl,
tuid,
id,
_serial_number,
_name,
_enabled,
start,
_stop,
_repeat_start,
_repeat_stop,
_duration_type,
_trigger_timestamp
] = line
[dt, ms] = String.split(inserted_at, ".")
ms = String.slice(ms, 0, 3)
dt = "#{dt}.#{ms}+0000"
{:ok, d, 0} = DateTime.from_iso8601(dt)
%{
log_timestamp: DateTime.to_iso8601(d),
charger_id: id,
type: type,
old_default_limit: odl,
default_limit: dl,
trans_id: tuid,
trans_limit: tl,
start_time: start
}
end)
|> then(fn new_csv -> merge_lists(new_csv, csv) end)
end)
|> then(fn csv ->
Enum.sort_by(csv, fn line -> line.log_timestamp end, :desc)
end)
|> then(fn csv ->
headers = [
:log_timestamp,
:type,
:action,
:direction,
:charger_id,
:connector,
:purpose,
:kind,
:limit,
:unit,
:old_default_limit,
:default_limit,
:trans_id,
:trans_limit,
:start_time,
:request_json,
:response_json
]
values =
Enum.map(csv, fn row ->
Enum.map(headers, fn header ->
Map.get(row, header)
end)
end)
headers = Enum.map(headers, &Atom.to_string/1)
File.write!(output, CSV.dump_to_iodata([headers | values]))
DataFrame.new(csv)
|> DataFrame.select(headers)
end)
end
end
Explore.do_thing()