Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

CSV JSON Data Exploring

csv_json_parse.livemd

CSV JSON Data Exploring

Mix.install([
  {:kino, "~> 0.11"},
  {:explorer, "~> 0.7"},
  {:kino_explorer, "~> 0.1"},
  {:kino_vega_lite, "~> 0.1"},
  {:nimble_csv, "~> 1.1"},
  {:jason, "~> 1.4"}
])

Section

defmodule Explore do
  alias NimbleCSV.RFC4180, as: CSV
  alias Explorer.DataFrame

  def to_date_string(nil) do
    nil
  end

  def to_date_string(d) do
    {:ok, dt, 0} = DateTime.from_iso8601(d)
    DateTime.to_iso8601(dt)
  end

  def merge_lists(list1, list2) do
    # Find the union of all keys across both lists
    all_keys =
      (list1 ++ list2)
      |> Enum.flat_map(&Map.keys(&1))
      |> Enum.uniq()

    # Ensure a map contains all keys, setting missing ones to nil
    ensure_all_keys = fn map, keys ->
      Enum.reduce(keys, %{}, fn key, acc ->
        Map.put(acc, key, Map.get(map, key, nil))
      end)
    end

    # Ensure each map in both lists contains all keys
    list1 = Enum.map(list1, &ensure_all_keys.(&1, all_keys))
    list2 = Enum.map(list2, &ensure_all_keys.(&1, all_keys))

    # Combine the two lists
    list1 ++ list2
  end

  def clean_map(%{} = map) do
    Enum.reduce(map, %{}, fn
      {_key, %{}}, acc -> acc
      {_key, nil}, acc -> acc
      {key, value}, acc when is_map(value) -> Map.put(acc, key, clean_map(value))
      {key, value}, acc -> Map.put(acc, key, value)
    end)
  end

  def do_thing() do
    ocpp_calls = "/Users/noah/Desktop/cassandra-ocpp-calls.csv"
    lm_logs = "/Users/noah/Desktop/be-lm-logs.csv"
    output = "/Users/noah/Desktop/correlated_lm_logs.csv"

    # Because Cassandra itself doesn't know about BE charger IDs
    iot_to_charger = %{
      "0ab0d60a-8851-41f0-ad5a-e19c8bdcd44b" => {"TACW1943022P4249", "5874"},
      "12232621-bc7d-44a5-bd66-1ad79ebd60fc" => {"TACW1944622P0099", "5882"},
      "eaf27aab-e506-48b0-9bcb-0ec7300fde88" => {"TACW1942522P6252", "5912"},
      "695e61fa-cf1b-4bb5-8ffb-3624c4096d21" => {"TACW1943022P4248", "5870"}
    }

    ocpp_calls
    |> File.stream!()
    |> CSV.parse_stream()
    # |> Stream.filter(fn line ->
    #   [_charger_uuid, _tsm, _timestamp, action, _conn_uid, direction, _error_c, _error_d, _error_det, _exact_timestamp, _match, _request_payload, _response_payload, _uid] = line

    #   action == "SetChargingProfile"
    # end)
    # |> Stream.take(1)
    |> Enum.map(fn line ->
      [
        charger_uuid,
        _tsm,
        timestamp,
        action,
        _conn_uid,
        direction,
        _error_c,
        _error_d,
        _error_det,
        _exact_timestamp,
        _match,
        request_payload,
        response_payload,
        _uid
      ] = line

      request_json =
        case Jason.decode(request_payload) do
          {:ok, json} -> json
          _ -> %{}
        end

      type = "DM_OCPP_LOG"
      {cid, request_json} = pop_in(request_json, ["connectorId"])
      {kind, request_json} = pop_in(request_json, ["csChargingProfiles", "chargingProfileKind"])

      {purpose, request_json} =
        pop_in(request_json, ["csChargingProfiles", "chargingProfilePurpose"])

      {unit, request_json} =
        pop_in(request_json, ["csChargingProfiles", "chargingSchedule", "chargingRateUnit"])

      {period, request_json} =
        pop_in(request_json, ["csChargingProfiles", "chargingSchedule", "chargingSchedulePeriod"])

      {start, request_json} =
        pop_in(request_json, ["csChargingProfiles", "chargingSchedule", "startSchedule"])

      # request_json = request_json |> clean_map()
      # response_json = request_json |> clean_map()

      {_serial, id} = iot_to_charger[charger_uuid]

      # [%{"limit" => limit, "startPeriod" => sp}] = period
      {limit, sp} =
        case period do
          %{"limit" => limit, "startPeriod" => sp} -> {limit, sp}
          %{"limit" => limit} -> {limit, nil}
          %{"startPeriod" => sp} -> {nil, sp}
          _ -> {nil, nil}
        end

      response_json =
        case Jason.decode(response_payload) do
          {:ok, json} -> json
          _ -> %{}
        end

      %{
        log_timestamp: to_date_string(timestamp),
        charger_id: id,
        type: type,
        action: action,
        direction: direction,
        connector: cid,
        purpose: purpose,
        kind: kind,
        limit: to_string(limit),
        unit: unit,
        start_period: sp,
        start_time: to_date_string(start),
        request_json: request_json |> Jason.encode!(),
        response_json: response_json |> Jason.encode!()
      }
    end)
    |> then(fn csv ->
      lm_logs
      |> File.stream!()
      |> CSV.parse_stream()
      # |> Stream.take(5)
      |> Enum.map(fn line ->
        type = "BE_LM_LOG"

        [
          inserted_at,
          _rule_id,
          _type,
          odl,
          dl,
          tl,
          tuid,
          id,
          _serial_number,
          _name,
          _enabled,
          start,
          _stop,
          _repeat_start,
          _repeat_stop,
          _duration_type,
          _trigger_timestamp
        ] = line

        [dt, ms] = String.split(inserted_at, ".")
        ms = String.slice(ms, 0, 3)
        dt = "#{dt}.#{ms}+0000"
        {:ok, d, 0} = DateTime.from_iso8601(dt)

        %{
          log_timestamp: DateTime.to_iso8601(d),
          charger_id: id,
          type: type,
          old_default_limit: odl,
          default_limit: dl,
          trans_id: tuid,
          trans_limit: tl,
          start_time: start
        }
      end)
      |> then(fn new_csv -> merge_lists(new_csv, csv) end)
    end)
    |> then(fn csv ->
      Enum.sort_by(csv, fn line -> line.log_timestamp end, :desc)
    end)
    |> then(fn csv ->
      headers = [
        :log_timestamp,
        :type,
        :action,
        :direction,
        :charger_id,
        :connector,
        :purpose,
        :kind,
        :limit,
        :unit,
        :old_default_limit,
        :default_limit,
        :trans_id,
        :trans_limit,
        :start_time,
        :request_json,
        :response_json
      ]

      values =
        Enum.map(csv, fn row ->
          Enum.map(headers, fn header ->
            Map.get(row, header)
          end)
        end)

      headers = Enum.map(headers, &Atom.to_string/1)
      File.write!(output, CSV.dump_to_iodata([headers | values]))

      DataFrame.new(csv)
      |> DataFrame.select(headers)
    end)
  end
end

Explore.do_thing()