Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Basic DataFrame Manipulation

1_Basic_DataFrame_Manipulation.livemd

Basic DataFrame Manipulation

# Notebook dependencies. The `config:` option makes EXLA the default Nx backend.
# Note: Elixir does not allow a trailing comma after the last list element.
Mix.install(
  [
    {:kino_explorer, "~> 0.1.20"},
    {:explorer, "~> 0.10.0"},
    {:httpoison, "~> 1.8"},
    {:jason, "~> 1.4"},
    {:vega_lite, "~> 0.1.7"},
    {:exla, ">= 0.0.0"},
    {:kino_vega_lite, "~> 0.1.11"}
  ],
  config: [nx: [default_backend: EXLA.Backend]]
)

Section

Basic DataFrame Manipulation

Let’s dive into a simple data manipulation task. In Python, you might double the values in a column with df["feature"] = df["feature"] * 2. Here’s how we achieve the same in Elixir:

alias Explorer.{DataFrame, Series}

# Requiring the module makes the macros in Explorer.DataFrame available
require DataFrame

# Source data: a single integer column named "feature"
data = %{feature: [1, 2, 3, 4, 5]}

df = DataFrame.new(data)

# Pull the "feature" column out as a series and multiply every value by 2.
# The step-by-step equivalent would be:
#   feature_series = DataFrame.pull(df, "feature")
#   doubled_feature = Series.multiply(feature_series, 2)
doubled_feature =
  df
  |> DataFrame.pull("feature")
  |> Series.multiply(2)

# Data frames are immutable: `put/3` returns a new frame with the column replaced
df = DataFrame.put(df, "feature", doubled_feature)

df

Advanced Data Processing

Handling missing values is a common data science task, e.g. s.interpolate(method='linear') in Python. Let’s perform linear interpolation, which is a bit more involved in Elixir:

defmodule DataFrameProcessor do
  @moduledoc """
  Fills gaps (`nil` values) in series and joins two single-feature data frames by date.
  """

  require Logger
  alias Explorer.DataFrame
  alias Explorer.Series

  @doc """
  Returns a new series in which every `nil` of `series` has been filled.

  Each `nil` is replaced by the midpoint between the previous value of the
  output (which may itself be a filled value) and the next non-nil value of
  the input:

    * a leading `nil` uses `0.0` as the previous value;
    * a `nil` with no later non-nil value repeats the previous value.

  For an isolated `nil` between two known values this equals linear
  interpolation. For runs of consecutive `nil`s it is a repeated midpoint
  fill, not an evenly spaced linear interpolation.
  """
  def interpolate(series) do
    values = Series.to_list(series)
    Logger.info("Original series: #{inspect(values)}")

    interpolated = fill_gaps(values, 0, [])

    Logger.info("Interpolated series: #{inspect(interpolated)}")
    Series.from_list(interpolated)
  end

  # Walks the input once. `filled` holds the output so far in reverse order, so
  # the most recent value is its head and each step prepends instead of appending.
  defp fill_gaps([], _index, filled), do: Enum.reverse(filled)

  defp fill_gaps([nil | rest], index, filled) do
    last_value = previous_value(filled)

    Logger.info("Interpolating. Last known: #{inspect(last_value)}, Index: #{inspect(index)}")

    # `rest` is exactly the part of the input that follows the current gap.
    interpolated_value =
      case Enum.find(rest, &(not is_nil(&1))) do
        nil -> last_value
        next_value -> (last_value + next_value) / 2
      end

    Logger.info("Interpolated value: #{inspect(interpolated_value)}")

    fill_gaps(rest, index + 1, [interpolated_value | filled])
  end

  defp fill_gaps([value | rest], index, filled) do
    fill_gaps(rest, index + 1, [value | filled])
  end

  # Most recent output value, or 0.0 when nothing has been produced yet.
  defp previous_value([]), do: 0.0
  defp previous_value([latest | _]), do: latest

  @doc """
  Builds one data frame per feature list (each paired with `date_list`), fills
  the gaps in both feature columns with `interpolate/1`, and left-joins the two
  frames on `"date"`.

  Returns the joined data frame. Intermediate frames are logged at info level.
  """
  def process_and_join(date_list, feature1, feature2) do
    df1 = DataFrame.new(%{"feature1" => feature1, "date" => date_list})
    df2 = DataFrame.new(%{"feature2" => feature2, "date" => date_list})

    Logger.info("Initial DataFrame df1: #{inspect(df1)}")
    Logger.info("Initial DataFrame df2: #{inspect(df2)}")

    df1 = DataFrame.put(df1, "feature1", interpolate(DataFrame.pull(df1, "feature1")))
    df2 = DataFrame.put(df2, "feature2", interpolate(DataFrame.pull(df2, "feature2")))

    Logger.info("DataFrame df1 after interpolation: #{inspect(df1)}")
    Logger.info("DataFrame df2 after interpolation: #{inspect(df2)}")

    joined_df = DataFrame.join(df1, df2, on: "date", how: :left)

    Logger.info("Joined DataFrame: #{inspect(joined_df)}")
    joined_df
  end
end

# Sample data for testing
# Sample data for testing: five consecutive days, 2023-01-01 through 2023-01-05
date_list = Enum.to_list(Date.range(~D[2023-01-01], ~D[2023-01-05]))

# Two feature lists with gaps (nil) at different positions
feature1 = [1.0, nil, 3.0, nil, 5.0]
feature2 = [nil, 8.0, nil, 16.0, 20.0]

# Fill the gaps in both features and join the results on the date column
DataFrameProcessor.process_and_join(date_list, feature1, feature2)

Calculating Moving Averages Moving averages offer insights into data trends.

In Python/pandas this is again a one-liner: df['feature'].rolling(9).mean().

Thankfully, there’s some support in Series in Elixir. Check this link. Here’s how to calculate them in Elixir:

defmodule TemperatureAnalysis do
  @moduledoc """
  Moving-average calculation over a dated temperature series.
  """

  alias Explorer.DataFrame
  alias Explorer.Series

  @doc """
  Builds a data frame with `"date"` and `"temperature"` columns and adds a
  `"moving_average"` column computed with `Explorer.Series.window_mean/3`.

  ## Parameters

    * `date_list` - values for the `"date"` column
    * `temperature_list` - values for the `"temperature"` column
    * `window_size` - number of values in each window
    * `weights` - optional per-position weights for the window. `nil` or `[]`
      (the default) means no weights are applied.

  The window uses `min_periods: 1`. The resulting data frame is printed with
  `IO.inspect/2` and returned.
  """
  def moving_average(date_list, temperature_list, window_size, weights \\ []) do
    df = DataFrame.new(%{"date" => date_list, "temperature" => temperature_list})

    moving_avg_series =
      df
      |> DataFrame.pull("temperature")
      |> Series.window_mean(window_size,
        weights: normalize_weights(weights),
        min_periods: 1
      )

    # IO.inspect/2 returns its argument, so the frame is both printed and returned
    df
    |> DataFrame.put("moving_average", moving_avg_series)
    |> IO.inspect(label: "DataFrame with Moving Average")
  end

  # An empty list means "no weights were given". It is forwarded as nil, which is
  # how Explorer's window functions express unweighted windows (per the Explorer
  # docs — confirm against the installed version).
  defp normalize_weights([]), do: nil
  defp normalize_weights(weights), do: weights
end

defmodule DateGenerator do
  @moduledoc """
  Generates runs of consecutive dates.
  """

  @doc """
  Returns a list of `days` consecutive dates beginning at `start_date`.

  Returns `[]` when `days` is zero or negative.

  ## Examples

      iex> DateGenerator.generate_dates(~D[2023-01-01], 3)
      [~D[2023-01-01], ~D[2023-01-02], ~D[2023-01-03]]

      iex> DateGenerator.generate_dates(~D[2023-01-01], 0)
      []

  """
  def generate_dates(start_date, days) do
    # The explicit `//1` step keeps the range ascending: without it, `0..-1`
    # (days == 0) is a descending range and would yield two dates.
    Enum.map(0..(days - 1)//1, fn offset -> Date.add(start_date, offset) end)
  end
end

# Sample Input
# Sample input: 21 consecutive days starting on 2023-01-01
date_list = DateGenerator.generate_dates(~D[2023-01-01], 21)

# One temperature per day: a sine wave around 20 plus random integer noise in -2..2
temperature_list =
  for day <- 1..21 do
    10.0 * :math.sin(day / 2) + Enum.random(-2..2) + 20
  end

window_size = 3

# Run the moving-average calculation without weights
temperature =
  TemperatureAnalysis.moving_average(date_list, temperature_list, window_size, nil)

Visualization is apparently weak compared to Python, but it provides the basics.

df.plot()

alias VegaLite, as: Vl

# Moving average drawn as a line over time
moving_average_layer =
  Vl.new()
  |> Vl.mark(:line)
  |> Vl.encode_field(:x, "date", type: :temporal)
  |> Vl.encode_field(:y, "moving_average", type: :quantitative)

# Raw temperature readings drawn as points over time
temperature_layer =
  Vl.new()
  |> Vl.mark(:point)
  |> Vl.encode_field(:x, "date", type: :temporal)
  |> Vl.encode_field(:y, "temperature", type: :quantitative)

# Both layers share the data taken from the `temperature` data frame
Vl.new(width: 1080, title: "case study")
|> Vl.data_from_values(temperature, only: ["date", "moving_average", "temperature"])
|> Vl.layers([moving_average_layer, temperature_layer])

Let’s dive deeper! Download the sample dataset using the command below and attach it to your Livebook.

# Download the household electricity consumption dataset as a zip into ~/Downloads.
# -L follows redirects; -o sets the output file.
curl -L -o ~/Downloads/240000-household-electricity-consumption-records.zip\
  https://www.kaggle.com/api/v1/datasets/download/thedevastator/240000-household-electricity-consumption-records

Or, if you would rather work with sensor data, use the command below.

#!/bin/bash
# Download the solar power generation dataset as a zip into ~/Downloads.
# -L follows redirects; -o sets the output file.
curl -L -o ~/Downloads/solar-power-generation-data.zip\
  https://www.kaggle.com/api/v1/datasets/download/anikannal/solar-power-generation-data
csv_path = "/data/files/household_power_consumption.csv"

# Columns that must be read as floats
float_columns = [
  "Global_active_power",
  "Global_reactive_power",
  "Voltage",
  "Global_intensity",
  "Sub_metering_1",
  "Sub_metering_2",
  "Sub_metering_3"
]

# CSV loading options: explicit dtypes for the numeric columns, and "?" read as nil.
# The option is named `nil_values` (not `null_values`).
options = [
  dtypes: Map.new(float_columns, fn column -> {column, :float} end),
  nil_values: ["?"]
]

# Read the CSV into a DataFrame; raises if loading fails
df = DataFrame.from_csv!(csv_path, options)

Let’s use the dataset below, because I want to explore datetime handling.

# Path to the weather sensor CSV
csv_path = "/data/files/Plant_1_Weather_Sensor_Data.csv"

# Load the CSV into a DataFrame. No options are passed, so Explorer's defaults apply.
# `df` is rebound here and is the frame used by the cells that follow.
df = DataFrame.from_csv!(csv_path)

Datetime handling

# Short alias for Explorer.DataFrame, used by the remaining cells
alias Explorer.DataFrame, as: DF
# Show the dtype of every column in `df`
DF.dtypes(df)

DATE_TIME column is recognized as :string, so let’s handle this better!

# Try to parse the first DATE_TIME value as a basic-format ISO 8601 datetime
df
|> DF.pull("DATE_TIME")
|> Series.first()
|> DateTime.from_iso8601(:basic)

It seems the format is not Elixir-friendly, so we need to handle it ourselves.

defmodule DateTimeParser do
  @moduledoc """
  Converts `"YYYY-MM-DD HH:MM:SS"` strings into `DateTime` structs.

  The input is rewritten into ISO 8601 form (`T` separator, trailing `Z`), so
  the resulting datetimes are in UTC.
  """

  @doc """
  Parses a `"YYYY-MM-DD HH:MM:SS"` string.

  Returns `{:ok, datetime}` on success, or `{:error, reason}` with the reason
  reported by `DateTime.from_iso8601/1`.

  ## Examples

      iex> DateTimeParser.parse_to_datetime("2020-05-15 00:00:00")
      {:ok, ~U[2020-05-15 00:00:00Z]}

      iex> DateTimeParser.parse_to_datetime("invalid-string")
      {:error, :invalid_format}

  """
  def parse_to_datetime(datetime_string) when is_binary(datetime_string) do
    iso8601_text = String.replace(datetime_string, " ", "T") <> "Z"

    case DateTime.from_iso8601(iso8601_text) do
      # The UTC offset returned alongside the datetime is not needed by callers
      {:ok, parsed, _utc_offset} -> {:ok, parsed}
      {:error, reason} -> {:error, reason}
    end
  end

  @doc """
  Same as `parse_to_datetime/1`, but returns the bare `DateTime` and raises a
  `RuntimeError` when the string cannot be parsed.
  """
  def parse_to_datetime!(datetime_string) when is_binary(datetime_string) do
    datetime_string
    |> parse_to_datetime()
    |> unwrap!()
  end

  # Unwraps a successful result; any parse error becomes a raise.
  defp unwrap!({:ok, datetime}), do: datetime
  defp unwrap!({:error, _reason}), do: raise("Invalid datetime format!")
end

# Usage example
# Usage example
datetime_string = "1993-05-17 00:00:00"

case DateTimeParser.parse_to_datetime(datetime_string) do
  {:ok, datetime} -> datetime
  {:error, reason} -> IO.puts("Failed to parse: #{reason}")
end

# Parse just the first DATE_TIME value of the data frame
df
|> DF.pull("DATE_TIME")
|> Series.first()
|> DateTimeParser.parse_to_datetime!()

# `DF.pull/2` returns the DATE_TIME column as an `Explorer.Series`
date_time_series = DF.pull(df, "DATE_TIME")

# Convert the series to a list and parse every element
datetime_list =
  date_time_series
  |> Explorer.Series.to_list()
  |> Enum.map(&DateTimeParser.parse_to_datetime!/1)

# Optionally, convert the list back to an Explorer.Series
new_date_time_series = Explorer.Series.from_list(datetime_list)

Above works, but below is more beautiful. Data friendly way :)

# Element-wise parse of the DATE_TIME series
DF.pull(df, "DATE_TIME") |> Series.transform(&DateTimeParser.parse_to_datetime!/1)

# Row-wise attempt with the parser captured directly. The notebook text reports
# that this form does not work; presumably the callback receives a whole row
# rather than a string (the working version reads `row["DATE_TIME"]`) — confirm.
DataFrame.transform(df, [names: ["DATE_TIME"]], &DateTimeParser.parse_to_datetime!(&1))

The above does not work because the format does not match exactly. So let’s change the syntax.

# Row-wise transform: read DATE_TIME from each row, parse it, and return it
# under the same column name
df2 =
  DataFrame.transform(df, [names: ["DATE_TIME"]], fn row ->
    %{"DATE_TIME" => DateTimeParser.parse_to_datetime!(row["DATE_TIME"])}
  end)

DF.dtypes(df2)

Awesome! It now uses the datetime format :) However, it does not overwrite the existing column.

# Attempt to apply the parser inside a `mutate` query. The notebook text that
# follows refers to the error message this produces and switches to `put`.
Explorer.DataFrame.mutate(df,
  new_column: transform(col("DATE_TIME"), &DateTimeParser.parse_to_datetime!/1)
)

As you notice from the error message above, let’s use put

# Parse the DATE_TIME series element-wise, then replace the column with the result
parsed_date_time = Series.transform(df["DATE_TIME"], &DateTimeParser.parse_to_datetime!/1)
df2 = DataFrame.put(df, "DATE_TIME", parsed_date_time)

Simply convert into :date

# Cast the datetime series to a date series
Series.cast(df2["DATE_TIME"], :date)

# Add the result as a DATE column
df2 = DF.mutate_with(df2, &[DATE: Series.cast(&1["DATE_TIME"], :date)])
# or below works, too
# df2 = DataFrame.put(df2, "DATE", Series.cast(df2["DATE_TIME"], :date))

A `week` column (the day of the week) can be introduced just as simply!

# Add a `week` column holding the day of the week of each DATE value.
# A function capture needs `mutate_with/2`; the `mutate/2` macro expects a
# keyword query instead (e.g. `DF.mutate(df2, week: day_of_week(DATE))`).
DF.mutate_with(df2, &[week: Series.day_of_week(&1["DATE"])])

Aggregation

# Mean ambient temperature per DATE, with the summary given as a map
df2
|> DF.group_by("DATE")
|> DF.summarise_with(
  &%{"AMBIENT_TEMPERATURE_MEAN" => Explorer.Series.mean(&1["AMBIENT_TEMPERATURE"])}
)

# Same aggregation, with the summary given as a keyword list
df2
|> DF.group_by("DATE")
|> DF.summarise_with(
  &[AMBIENT_TEMPERATURE_MEAN: Explorer.Series.mean(&1["AMBIENT_TEMPERATURE"])]
)