Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Preparing Financial Time Series for ML in Elixir

financial_data_preparation.livemd

Preparing Financial Time Series for ML in Elixir

# Livebook dependencies: Explorer for dataframes, VegaLite (+ Kino integration)
# for plotting, Jason for JSON encoding used by VegaLite.
Mix.install([
  {:explorer, "~> 0.9.2"},
  {:vega_lite, "~> 0.1.10"},
  {:jason, "~> 1.4.4"},
  {:kino_vega_lite, "~> 0.1.13"}
])

Our Helper Function to Parse Durations:

A few declarations needed for this Livebook.

alias Explorer.DataFrame, as: DF
alias Explorer.Series
alias VegaLite, as: Vl

require Explorer.DataFrame # Needed so we can use lazy functions like DF.mutate_with
require Explorer.Series # Needed so we can use lazy functions like Series.mutate_with

This is a script to convert “duration text” to seconds/milliseconds. All credit goes to AlloyCI. The script was made by them and can be found at: https://github.com/AlloyCI/alloy_ci/blob/89a6c4e8b93b58ca5c0c52e67ada6c9ebf4f894c/lib/alloy_ci/lib/time_convert.ex

defmodule ConvertDuration do
  @moduledoc """
  Module to convert seconds to compound time (e.g. "1 d, 2 hr"), or duration
  notation such as "1 hour 30 minutes" to seconds/milliseconds.
  """
  @second 1
  @minute 60
  @hour @minute * 60
  @day @hour * 24
  @week @day * 7
  @divisor [@week, @day, @hour, @minute, 1]

  # Canonical unit names produced by mappings/0 and accepted by
  # duration_units_seconds_multiplier/1.
  @units ~w(seconds minutes hours days weeks months years)

  @spec to_compound(non_neg_integer()) :: binary()
  def to_compound(sec) do
    # Successively divide by each unit size, carrying the remainder down to
    # the next (smaller) unit; prepending means the list ends up ordered
    # seconds-first, which the pattern below unpacks.
    {_, [s, m, h, d, w]} =
      Enum.reduce(@divisor, {sec, []}, fn divisor, {n, acc} ->
        {rem(n, divisor), [div(n, divisor) | acc]}
      end)

    # Drop zero-valued components ("0 wk" etc.); non-zero numbers never start
    # with the character "0", so the prefix check is safe.
    ["#{w} wk", "#{d} d", "#{h} hr", "#{m} min", "#{s} sec"]
    |> Enum.reject(fn str -> String.starts_with?(str, "0") end)
    |> Enum.join(", ")
  end

  # Returns 0 when the string contains no recognizable duration, hence
  # non_neg_integer rather than pos_integer.
  @spec to_seconds(binary()) :: non_neg_integer()
  def to_seconds(cron_string) do
    cron_string
    |> cleanup()
    |> calculate_from_words()
  end

  @spec to_milliseconds(binary()) :: non_neg_integer()
  def to_milliseconds(cron_string) do
    cron_string
    |> to_seconds()
    |> Kernel.*(1000)
  end

  ###################
  # Private functions
  ###################

  # Normalizes the input to a list of alternating number/unit tokens.
  defp cleanup(string) do
    string
    |> String.downcase()
    # Surround every number with spaces so "2h30m" splits cleanly
    |> String.replace(number_matcher(), " \\0 ")
    |> String.trim(" ")
    |> filter_through_white_list()
  end

  # Walks the token list; every number token is multiplied by the unit token
  # that follows it (multiplier is 0 when the unit is missing or unknown).
  defp calculate_from_words(words) do
    words
    |> Enum.with_index()
    |> Enum.reduce(0, fn {value, index}, acc ->
      if Regex.match?(number_matcher(), value) do
        # number_matcher also matches decimals like "1.5" or ".5", which
        # String.to_integer/1 would crash on - parse as float instead.
        # Prepending "0" makes ".5" parseable and is harmless otherwise.
        {number, _rest} = Float.parse("0" <> value)
        multiplier = words |> Enum.at(index + 1) |> duration_units_seconds_multiplier()
        # round/1 keeps the integer return contract even for fractional input
        acc + round(number * multiplier)
      else
        acc
      end
    end)
  end

  # Seconds-per-unit; unknown units contribute nothing.
  defp duration_units_seconds_multiplier(unit) do
    case unit do
      "years" -> 31_557_600
      "months" -> @day * 30
      "weeks" -> @week
      "days" -> @day
      "hours" -> @hour
      "minutes" -> @minute
      "seconds" -> @second
      _ -> 0
    end
  end

  # Keeps only number tokens and recognized unit tokens (canonicalized via
  # mappings/0); everything else is dropped.
  defp filter_through_white_list(string) do
    string
    |> String.split(" ")
    |> Enum.map(fn word ->
      cond do
        Regex.match?(number_matcher(), word) -> String.trim(word)
        mappings()[word] in @units -> String.trim(mappings()[word])
        true -> nil
      end
    end)
    |> Enum.reject(&is_nil/1)
  end

  # ~r sigils are compiled at compile time, unlike Regex.compile!/1 which
  # would recompile the pattern on every call.
  defp number_matcher, do: ~r/[0-9]*\.?[0-9]+/

  # Maps unit abbreviations to their canonical plural form.
  defp mappings do
    %{
      "seconds" => "seconds",
      "second" => "seconds",
      "secs" => "seconds",
      "sec" => "seconds",
      "s" => "seconds",
      "minutes" => "minutes",
      "minute" => "minutes",
      "mins" => "minutes",
      "min" => "minutes",
      "m" => "minutes",
      "hours" => "hours",
      "hour" => "hours",
      "hrs" => "hours",
      "hr" => "hours",
      "h" => "hours",
      "days" => "days",
      "day" => "days",
      "dy" => "days",
      "d" => "days",
      "weeks" => "weeks",
      "week" => "weeks",
      "wks" => "weeks",
      "wk" => "weeks",
      "w" => "weeks",
      "months" => "months",
      "mo" => "months",
      "mos" => "months",
      "month" => "months",
      "years" => "years",
      "year" => "years",
      "yrs" => "years",
      "yr" => "years",
      "y" => "years"
    }
  end
end

Time Bars

defmodule TimeBars do
  
  # Raw Binance aggTrades CSV used as input.
  @input_csv_file "c:/Users/USERNAME/files/BTCUSDT-aggTrades-2024-09.csv"

  # Builds OHLCV "time bars" from raw trades: each `timeframe` slice of trades
  # (a duration string understood by ConvertDuration, e.g. "1 day") becomes
  # one row with open/high/low/close/volume statistics.
  def get_time_bars(timeframe \\ "1 day") do
    # Getting the timeframe in milliseconds. Can also just use an int directly.
    timeframe_in_ms = ConvertDuration.to_milliseconds(timeframe)

    # Loading the file in Lazy mode
    df = DF.from_csv!(@input_csv_file, [lazy: true])
      # Renaming some columns for simplicity. is_buyer_maker tells us the direction: if True, seller is the "taker" which makes this trade a sell.
      |> DF.rename(transact_time: "timestamp", quantity: "volume", is_buyer_maker: "is_sell")
      # Selecting only the columns we care about
      |> DF.select(["timestamp", "price", "volume", "is_sell"])
      # Adding some extra data: a column for dollar_value = price * volume
      |> DF.mutate_with(&[dollar_value: Series.multiply(&1[:price], &1[:volume])])
      # Assigning bar numbers to each row
      |> DF.mutate_with(fn df ->
        # Getting the first timestamp for first bar
        first_time_stamp = df["timestamp"] |> Series.first
        # Creating a series of elapsed time since the first timestamp (current_trade_timestamp - first_timestamp)
        elapsed_ms = df["timestamp"] |> Series.subtract(first_time_stamp)
        # Dividing our new series by the duration we want and casting back to int - which will remove any decimal point values
        bar_numbers = elapsed_ms |> Series.divide(timeframe_in_ms) |> Series.cast({:s, 64})
        # And then returning the bars, which will add them as a series to our DataFrame
        [bar_number: bar_numbers]
      end)
      # Converting our DataFrame from Lazy to Eager - executing the pipeline built so far
      |> DF.collect()

    # First timestamp of the dataframe - so we can use it later
    first_time_stamp = df["timestamp"] |> Series.first

    # Grouping our data frame by the bar numbers column we created
    df |> DF.group_by(:bar_number)
      # Summarizing the results to create a new DF with our bars - each group will be a new row in our new DF
      |> DF.summarise(
          # Time of the bar. We calculate it according to the timeframe and first_time_stamp.
          # In this case, bar_number series all rows are the same so first/last/min/max all would work.
          date_time: ^timeframe_in_ms * Series.first(bar_number) + ^first_time_stamp,
          open: Series.first(price), # First price in the group - the open price
          high: Series.max(price), # Highest price in the group - the high price
          low: Series.min(price), # Lowest price in the group - the low price
          close: Series.last(price), # Last price in the group - the close price
          volume: Series.sum(volume), # Total volume traded in the group
          buy_volume: Series.sum(Series.select(is_sell, 0, volume)), # Volume of only the market buy orders
          num_of_trades: Series.count(price), # Number of trades (rows) in the group
          dollar_value: Series.sum(dollar_value) # Total dollar value traded in the group
      )
      # Getting rid of the bar_number column as it is really our row number now
      |> DF.discard("bar_number")

  end

end

time_bars = TimeBars.get_time_bars()

Visualize Bars with VegaLite

defmodule VlHelper do

  # Renders a candlestick (OHLC) chart for a bars dataframe.
  # Expects the time column to be called "date_time" and the price columns
  # "open"/"high"/"low"/"close".
  def get_bars_plot(df, title \\ "", height \\ 600, width \\ 700) do

    # Initiating the VegaLite instance with width and height
    Vl.new(width: width, height: height)
      |> Vl.data_from_values(DF.to_rows(df)) # Converting the DF to rows
      # Encoding our x/time axis. :temporal tells VL this data contains time units.
      # label_angle (snake_case, converted to camelCase by the library) tilts the labels.
      |> Vl.encode_field(:x, "date_time", title: title, type: :temporal, format: "%m/%d", label_angle: -45)
      # Encoding the y/price axis. :quantitative tells VL this data is numeric.
      # scale: [zero: false] means the axis need not start at 0 (prices are ~60,000).
      |> Vl.encode(:y, field: "close", title: "Price", type: :quantitative, scale: [zero: false])
      # Bar colors: green when the close is above the open, red otherwise
      |> Vl.encode(:color, condition: [test: "datum.open < datum.close", value: "green"], value: "red")
      # Adding the bar layers. Note: Elixir lists do not allow a trailing comma
      # after the last element.
      |> Vl.layers([
        # The bodies (:bar type)
        Vl.new()
        |> Vl.mark(:bar)
        |> Vl.encode_field(:y, "open")
        |> Vl.encode_field(:y2, "close"),
        # The wicks (:rule type)
        Vl.new()
        |> Vl.mark(:rule)
        |> Vl.encode_field(:y, "low")
        |> Vl.encode_field(:y2, "high")
      ])
  end
end

And visualize the result

VlHelper.get_bars_plot(time_bars)

Tick Bars, Volume Bars and Dollar Bars

We can find the threshold using the following function:

defmodule Utils do
  # Derives a per-bar threshold from a dataframe: the mean of `column`
  # divided by `divisor` (roughly "divisor bars per period").
  def get_threshold(df, column, divisor \\ 50) do
    df[column]
    |> Series.mean()
    |> Kernel./(divisor)
  end
end

And our function to generate the bars:

defmodule Bars do
  @input_csv_file "c:/Users/USERNAME/files/BTCUSDT-aggTrades-2024-09.csv"
  @previous_month_daily_csv_file "c:/Users/USERNAME/files/BTCUSDT-1d-2024-08.csv"

  # Builds "tick", "volume" or "dollar" bars from raw trades: rows accumulate
  # until a threshold (derived from the previous month's daily data) is
  # crossed, and each such batch becomes one OHLCV bar row.
  def get_standard_bars_reduce(bar_type) do
    # Load previous month df to get a threshold number
    previous_month_df = DF.from_csv!(@previous_month_daily_csv_file)

    # Changing type "dollar" to "dollar_value" to keep in line with previous
    # examples, while keeping the "bar_type" options short
    bar_type = if bar_type == "dollar", do: "dollar_value", else: bar_type

    # Getting the threshold according to bar_type and raising an error if
    # bar_type is not what we expect
    threshold =
      case bar_type do
        "tick" -> Utils.get_threshold(previous_month_df, "count")
        "volume" -> Utils.get_threshold(previous_month_df, "volume")
        "dollar_value" -> Utils.get_threshold(previous_month_df, "quote_volume")
        _ -> raise("Bar type can only be: tick, volume or dollar")
      end

    # Same lazy load / rename / select / mutate pipeline as the time-bars example.
    df =
      DF.from_csv!(@input_csv_file, lazy: true)
      |> DF.rename(transact_time: "timestamp", quantity: "volume", is_buyer_maker: "is_sell")
      |> DF.select(["timestamp", "price", "volume", "is_sell"])
      |> DF.mutate_with(&[
          dollar_value: Series.multiply(&1[:price], &1[:volume]),
          # Adding a 'dummy' column with only ones (1.0) so trades/ticks can be counted by summing
          tick: 1.0])
      |> DF.collect()

    # Let's calculate the bars. We start with the column holding the data we need.
    bars =
      df[bar_type]
      # Converting to list so we can enumerate over it
      |> Series.to_list()
      # Using reduce we keep track of the current_bar (starting at 1), cum_value
      # (starting at 0) and our bar-number list (starting empty: [])
      |> Enum.reduce({1, 0, []}, fn value, {current_bar, cum_value, bars_array} ->
        # For each row we add the current bar to our list - prepending is O(1) in Elixir
        new_bars_array = [current_bar | bars_array]
        # Updating the cumulative value
        new_cum_value = cum_value + value

        # If new_cum_value reached the threshold we move to the next bar number
        # and reset the cumulative counter
        if new_cum_value >= threshold do
          {current_bar + 1, 0, new_bars_array}
        # Otherwise we carry the updated cum_value and list forward unchanged
        else
          {current_bar, new_cum_value, new_bars_array}
        end
      end)
      # Then we take the 3rd element of the tuple (the bar-number list)
      |> elem(2)
      # And reverse it, since we were prepending (much faster than appending in Elixir)
      |> Enum.reverse()

    df
    # Now we can create a new series from our bar numbers and add it to our DF
    |> DF.put(:bar_number, Series.from_list(bars))
    # And summarise as before
    |> DF.group_by(:bar_number)
    # Summarizing the results to create a new DF with our bars - each group becomes one row
    |> DF.summarise(
        date_time: Series.first(timestamp), # Time of the first trade in the bar
        open: Series.first(price), # First price in the group - the open price
        high: Series.max(price), # Highest price in the group - the high price
        low: Series.min(price), # Lowest price in the group - the low price
        close: Series.last(price), # Last price in the group - the close price
        volume: Series.sum(volume), # Volume in units of the currency
        buy_volume: Series.sum(Series.select(is_sell, 0, volume)), # Volume of only the market buy orders
        dollar_value: Series.sum(dollar_value), # Volume in dollar value
        num_of_trades: Series.count(price) # Number of trades in the bar
    )
    # Getting rid of the bar_number column as it is really our row number now
    |> DF.discard("bar_number")
  end
end

Now we can create our new bars dataframes:

# Build each alternative bar type from the same trades file.
tick_bars = Bars.get_standard_bars_reduce("tick")
volume_bars = Bars.get_standard_bars_reduce("volume")
dollar_bars = Bars.get_standard_bars_reduce("dollar")

We can now visualize the first 100 bars of each of them:

# Plot the first 100 bars of each type.
VlHelper.get_bars_plot(tick_bars |> DF.head(100), "Tick Bars")
VlHelper.get_bars_plot(volume_bars |> DF.head(100), "Volume Bars")
VlHelper.get_bars_plot(dollar_bars |> DF.head(100), "Dollar Bars")

Plotting The Number of Bars Over Time

We can visualize the number of bars each method produced over the span of this month. First we create the new Vl Helper Function:

defmodule VlBarCount do

  # Casts a millisecond-timestamp series to naive datetimes truncated to the
  # beginning of their day, so rows can later be grouped per day.
  defp truncate_timestamp(series) do
    Series.cast(series, {:naive_datetime, :millisecond})
    |> Series.transform(&NaiveDateTime.beginning_of_day/1)
  end

  # Plots, per day, how many bars each method produced.
  # Expects series_array to be a tuple of 3 timestamp series in this order:
  # tick, volume, dollar.
  def get_bar_count_plot(series_array, height \\ 600, width \\ 700) do

    # Enum cannot enumerate over a tuple of series, so we unpack it manually.
    # Truncating - aligning timestamps to the beginning of the day so we can group by day
    tick_bars_series = series_array |> elem(0) |> truncate_timestamp
    volume_bars_series = series_array |> elem(1) |> truncate_timestamp
    dollar_bars_series = series_array |> elem(2) |> truncate_timestamp

    # Creating a DF from each series with another column identifying the series before we merge them
    tick_bars_df = DF.new([timestamp: tick_bars_series]) |> DF.mutate([series: "Tick Bars"])
    volume_bars_df = DF.new([timestamp: volume_bars_series]) |> DF.mutate([series: "Volume Bars"])
    dollar_bars_df = DF.new([timestamp: dollar_bars_series]) |> DF.mutate([series: "Dollar Bars"])

    # Merging our 3 DFs
    df = DF.concat_rows([tick_bars_df, volume_bars_df, dollar_bars_df])
      # Grouping by timestamp (day in our case) and series - each series gets its own row per day
      |> DF.group_by(["timestamp", "series"])
      # Counting the number of rows in each group (= bars per day per series)
      |> DF.summarise(count: count(series))

    # Creating a new VegaLite plot
    Vl.new(width: width, height: height)
      |> Vl.data_from_values(DF.to_rows(df))
      |> Vl.mark(:line) # Telling Vl we want a line plot
      |> Vl.encode_field(:x, "timestamp", type: :temporal) # timestamp on the x axis - temporal means datetime values
      |> Vl.encode_field(:y, "count", type: :quantitative) # count is the value we plot on the y axis
      |> Vl.encode_field(:color, "series", type: :nominal) # differentiate the series column by color
      |> Vl.encode_field(:stroke_dash, "series", type: :nominal) # and also by stroke-dash pattern
  end
end

And now we just need to run the function with our 3 bar types dataframes:

VlBarCount.get_bar_count_plot({tick_bars["date_time"], volume_bars["date_time"], dollar_bars["date_time"]})