Preparing Financial Time Series for ML in Elixir
Mix.install([
{:explorer, "~> 0.9.2"},
{:vega_lite, "~> 0.1.10"},
{:jason, "~> 1.4.4"},
{:kino_vega_lite, "~> 0.1.13"}
])
Our Helper Function to Parse Durations:
A few declarations needed for this Livebook:
# Short aliases used by every cell below
alias Explorer.DataFrame, as: DF
alias Explorer.Series
alias VegaLite, as: Vl
require Explorer.DataFrame # Needed for the DataFrame query macros used below (e.g. DF.summarise and DF.mutate)
require Explorer.Series # NOTE(review): no Series macro appears to be used in this notebook - confirm whether this require is still needed
This is a script that converts “duration text” (for example "1 day" or "1h30m") to seconds or milliseconds. All credit goes to AlloyCI, who wrote the original script; it can be found at: https://github.com/AlloyCI/alloy_ci/blob/89a6c4e8b93b58ca5c0c52e67ada6c9ebf4f894c/lib/alloy_ci/lib/time_convert.ex
defmodule ConvertDuration do
  @moduledoc """
  Converts a number of seconds to a compound duration string, and converts
  free-form duration text (e.g. `"1 day"`, `"1h30m"`, `"1.5 hours"`) to
  seconds or milliseconds.

  Duration text is read as a sequence of `<number> <unit>` pairs. A number that
  is not immediately followed by a recognised unit contributes nothing, and any
  other text is ignored.
  """

  @second 1
  @minute 60
  @hour @minute * 60
  @day @hour * 24
  @week @day * 7
  @divisor [@week, @day, @hour, @minute, 1]

  @doc """
  Formats `sec` seconds as a compound duration such as `"1 d, 1 hr, 1 min, 1 sec"`.

  Components whose value is zero are omitted, so `0` yields an empty string.
  """
  @spec to_compound(non_neg_integer()) :: binary()
  def to_compound(sec) do
    # Peel off weeks, days, hours, minutes and seconds in turn; each quotient is
    # prepended, so the accumulator ends up ordered from seconds to weeks.
    {_, [s, m, h, d, w]} =
      Enum.reduce(@divisor, {sec, []}, fn divisor, {n, acc} ->
        {rem(n, divisor), [div(n, divisor) | acc]}
      end)

    [{w, "wk"}, {d, "d"}, {h, "hr"}, {m, "min"}, {s, "sec"}]
    |> Enum.reject(fn {amount, _label} -> amount == 0 end)
    |> Enum.map_join(", ", fn {amount, label} -> "#{amount} #{label}" end)
  end

  @doc """
  Converts duration text to a whole number of seconds.

  Decimal amounts such as `"1.5 hours"` are supported; the total is rounded to
  the nearest second. Text without any `<number> <unit>` pair yields `0`.
  """
  @spec to_seconds(binary()) :: non_neg_integer()
  def to_seconds(cron_string) do
    cron_string
    |> total_seconds()
    |> round()
  end

  @doc """
  Converts duration text to a whole number of milliseconds.

  The conversion is done on the unrounded total, so `"0.5 sec"` yields `500`.
  """
  @spec to_milliseconds(binary()) :: non_neg_integer()
  def to_milliseconds(cron_string) do
    round(total_seconds(cron_string) * 1000)
  end

  ###################
  # Private functions
  ###################

  # Total number of seconds described by the text; a float when any amount is decimal.
  defp total_seconds(cron_string) do
    cron_string
    |> cleanup()
    |> calculate_from_words()
  end

  # Lower-cases the text, isolates every number with surrounding spaces (so that
  # "1h30m" splits into "1", "h", "30", "m") and keeps only numbers and known units.
  defp cleanup(string) do
    string
    |> String.downcase()
    |> String.replace(number_matcher(), " \\0 ")
    |> String.split(" ", trim: true)
    |> filter_through_white_list()
  end

  # Sums amount * unit for every number that is directly followed by a unit.
  # Walking the list pairwise avoids an index lookup for every number.
  defp calculate_from_words(tokens), do: sum_pairs(tokens, 0)

  defp sum_pairs([{:number, amount}, {:unit, seconds} | rest], total),
    do: sum_pairs(rest, total + amount * seconds)

  defp sum_pairs([_skipped | rest], total), do: sum_pairs(rest, total)
  defp sum_pairs([], total), do: total

  defp duration_units_seconds_multiplier(unit) do
    case unit do
      "years" -> 31_557_600
      "months" -> @day * 30
      "weeks" -> @week
      "days" -> @day
      "hours" -> @hour
      "minutes" -> @minute
      "seconds" -> @second
      _ -> 0
    end
  end

  # Tags each word as {:number, value} or {:unit, seconds_per_unit}; drops the rest.
  defp filter_through_white_list(words) do
    Enum.flat_map(words, fn word ->
      if Regex.match?(number_matcher(), word) do
        [{:number, parse_number(word)}]
      else
        case Map.fetch(mappings(), word) do
          {:ok, unit} -> [{:unit, duration_units_seconds_multiplier(unit)}]
          :error -> []
        end
      end
    end)
  end

  # The number regex also matches decimals ("1.5", ".5"), which
  # String.to_integer/1 cannot parse, so decimals are parsed as floats.
  defp parse_number(word) do
    case String.split(word, ".") do
      [whole] -> String.to_integer(whole)
      ["", fraction] -> String.to_float("0." <> fraction)
      [_whole, _fraction] -> String.to_float(word)
    end
  end

  # A regex literal, so the pattern is not recompiled on every call.
  defp number_matcher do
    ~r/[0-9]*\.?[0-9]+/
  end

  # Every accepted spelling of a unit, mapped to its canonical plural name.
  defp mappings do
    %{
      "seconds" => "seconds",
      "second" => "seconds",
      "secs" => "seconds",
      "sec" => "seconds",
      "s" => "seconds",
      "minutes" => "minutes",
      "minute" => "minutes",
      "mins" => "minutes",
      "min" => "minutes",
      "m" => "minutes",
      "hours" => "hours",
      "hour" => "hours",
      "hrs" => "hours",
      "hr" => "hours",
      "h" => "hours",
      "days" => "days",
      "day" => "days",
      "dy" => "days",
      "d" => "days",
      "weeks" => "weeks",
      "week" => "weeks",
      "wks" => "weeks",
      "wk" => "weeks",
      "w" => "weeks",
      "months" => "months",
      "mo" => "months",
      "mos" => "months",
      "month" => "months",
      "years" => "years",
      "year" => "years",
      "yrs" => "years",
      "yr" => "years",
      "y" => "years"
    }
  end
end
Time Bars
defmodule TimeBars do
  @moduledoc """
  Builds fixed-duration (time) OHLCV bars from an aggregated-trades CSV file.
  """

  @input_csv_file "c:/Users/USERNAME/files/BTCUSDT-aggTrades-2024-09.csv"

  @doc """
  Returns a dataframe with one row per time bar.

  `timeframe` is either duration text understood by `ConvertDuration`
  (e.g. `"1 day"`, `"4 hours"`) or a positive integer number of milliseconds.

  The resulting columns are `date_time`, `open`, `high`, `low`, `close`,
  `volume`, `buy_volume`, `num_of_trades` and `dollar_value`.

  Raises `ArgumentError` when the timeframe does not resolve to a positive
  number of milliseconds.
  """
  def get_time_bars(timeframe \\ "1 day") do
    # Getting the timeframe in milliseconds (an integer is taken as milliseconds already).
    # NOTE(review): this assumes transact_time in the CSV is in milliseconds - confirm against the data.
    timeframe_in_ms = timeframe_to_ms(timeframe)

    df =
      # Loading the file in Lazy mode
      DF.from_csv!(@input_csv_file, lazy: true)
      # Renaming some columns for simplicity. is_buyer_maker tells us the direction:
      # if True, seller is the "taker" which makes this trade a sell.
      |> DF.rename(transact_time: "timestamp", quantity: "volume", is_buyer_maker: "is_sell")
      # Selecting only the columns we care about
      |> DF.select(["timestamp", "price", "volume", "is_sell"])
      # Adding some extra data: a column for dollar_value = price * volume
      |> DF.mutate_with(&[dollar_value: Series.multiply(&1[:price], &1[:volume])])
      # Assigning bar numbers to each row
      |> DF.mutate_with(fn lazy_df ->
        # Getting the first timestamp, which marks the start of the first bar
        first_time_stamp = lazy_df["timestamp"] |> Series.first()
        # Time elapsed since the first timestamp (current_trade_timestamp - first_timestamp)
        elapsed_ms = lazy_df["timestamp"] |> Series.subtract(first_time_stamp)

        # Dividing by the bar duration and casting back to int drops the fractional part,
        # leaving the zero-based index of the bar each trade falls into
        bar_numbers = elapsed_ms |> Series.divide(timeframe_in_ms) |> Series.cast({:s, 64})

        # Returning the bar numbers adds them as a new column to the dataframe
        [bar_number: bar_numbers]
      end)
      # Converting the dataframe from Lazy to Eager - this runs everything queued so far
      |> DF.collect()

    # First timestamp of the dataframe - used to turn bar numbers back into bar start times
    first_time_stamp = df["timestamp"] |> Series.first()

    df
    # Grouping the dataframe by the bar numbers column we created
    |> DF.group_by(:bar_number)
    # Summarizing the groups - each group becomes one row (one bar) in the new dataframe
    |> DF.summarise(
      # Start time of the bar, derived from the bar number, the timeframe and first_time_stamp.
      # All rows in a group share the same bar_number, so first/last/min/max would all work.
      date_time: ^timeframe_in_ms * Series.first(bar_number) + ^first_time_stamp,
      # First price in the group - the open price
      open: Series.first(price),
      # Highest price in the group - the high price
      high: Series.max(price),
      # Lowest price in the group - the low price
      low: Series.min(price),
      # Last price in the group - the close price
      close: Series.last(price),
      # Total traded volume in the group
      volume: Series.sum(volume),
      # Volume of only the market buy orders (sell rows contribute 0)
      buy_volume: Series.sum(Series.select(is_sell, 0, volume)),
      # Number of trades in the group
      num_of_trades: Series.count(price),
      # Total dollar value traded in the group
      dollar_value: Series.sum(dollar_value)
    )
    # Getting rid of the bar_number column as it is really our row number now
    |> DF.discard("bar_number")
  end

  # Resolves the timeframe to a positive number of milliseconds. A zero timeframe
  # would otherwise lead to a division by zero when assigning bar numbers.
  defp timeframe_to_ms(timeframe) when is_integer(timeframe) and timeframe > 0, do: timeframe

  defp timeframe_to_ms(timeframe) when is_binary(timeframe) do
    case ConvertDuration.to_milliseconds(timeframe) do
      ms when ms > 0 ->
        ms

      _ ->
        raise ArgumentError,
              "timeframe #{inspect(timeframe)} does not describe a positive duration"
    end
  end

  defp timeframe_to_ms(timeframe) do
    raise ArgumentError,
          "timeframe must be duration text or a positive integer of milliseconds, " <>
            "got: #{inspect(timeframe)}"
  end
end
# Build the time bars using the default "1 day" timeframe
time_bars = TimeBars.get_time_bars()
Visualize Bars with VegaLite
defmodule VlHelper do
  @moduledoc """
  VegaLite helpers for plotting bar (candlestick) dataframes.
  """

  @doc """
  Builds a candlestick chart from a dataframe of bars.

  The dataframe must contain the columns `"date_time"`, `"open"`, `"high"`,
  `"low"` and `"close"`. `title` is used as the title of the time axis;
  `height` and `width` set the size of the chart.
  """
  def get_bars_plot(df, title \\ "", height \\ 600, width \\ 700) do
    # Initiating the VegaLite instance with width and height
    Vl.new(width: width, height: height)
    # Converting the DF to rows
    |> Vl.data_from_values(DF.to_rows(df))
    # Encoding our x/time axis. :temporal tells VL this data contains time units.
    # Tick label format and angle are axis properties, so they must be nested under
    # :axis - at the channel level Vega-Lite does not recognise them.
    |> Vl.encode_field(:x, "date_time",
      title: title,
      type: :temporal,
      axis: [format: "%m/%d", label_angle: -45]
    )
    # Encoding the y/price axis. :quantitative tells VL this data is quantitative in nature.
    # scale: [zero: false] tells VL we don't want to start this axis from 0
    # (our prices are in the ~ 60,000)
    |> Vl.encode(:y, field: "close", title: "Price", type: :quantitative, scale: [zero: false])
    # Encoding the colors for our bars: green if the bar closes above its open, otherwise red
    |> Vl.encode(:color,
      condition: [test: "datum.open < datum.close", value: "green"],
      value: "red"
    )
    # Adding the bars layers
    |> Vl.layers([
      # The bodies (:bar type) span from open to close
      Vl.new()
      |> Vl.mark(:bar)
      |> Vl.encode_field(:y, "open")
      |> Vl.encode_field(:y2, "close"),
      # The wicks (:rule type) span from low to high
      Vl.new()
      |> Vl.mark(:rule)
      |> Vl.encode_field(:y, "low")
      |> Vl.encode_field(:y2, "high")
    ])
  end
end
And visualize the result
# Render the candlestick chart for the time bars built above
VlHelper.get_bars_plot(time_bars)
Tick Bars, Volume Bars and Dollar Bars
We can find the threshold using the following function:
defmodule Utils do
  @moduledoc """
  Small helpers shared by the bar-building modules.
  """

  @doc """
  Returns the mean of `column` in `df`, divided by `divisor` (50 by default).
  """
  def get_threshold(df, column, divisor \\ 50) do
    Series.mean(df[column]) / divisor
  end
end
And our function to generate the bars:
defmodule Bars do
  @moduledoc """
  Builds tick, volume and dollar bars from an aggregated-trades CSV file, using a
  threshold derived from the previous month's daily data.
  """

  @input_csv_file "c:/Users/USERNAME/files/BTCUSDT-aggTrades-2024-09.csv"
  @previous_month_daily_csv_file "c:/Users/USERNAME/files/BTCUSDT-1d-2024-08.csv"

  @doc """
  Returns a dataframe with one row per bar for the given `bar_type`.

  `bar_type` is `"tick"`, `"volume"` or `"dollar"` (`"dollar_value"` is accepted
  as well). Any other value raises a `RuntimeError`.

  A bar is closed as soon as the running total of the tracked column reaches the
  threshold; the trade that reaches it still belongs to the bar being closed.
  """
  def get_standard_bars_reduce(bar_type) do
    # The previous month's daily data is only used to derive the threshold
    previous_month_df = DF.from_csv!(@previous_month_daily_csv_file)

    # "dollar" is a short alias for the "dollar_value" column used in the trades dataframe
    bar_type =
      case bar_type do
        "dollar" -> "dollar_value"
        other -> other
      end

    # Column of the daily data whose mean drives the threshold for this bar type
    threshold_column =
      case bar_type do
        "tick" -> "count"
        "volume" -> "volume"
        "dollar_value" -> "quote_volume"
        _ -> raise("Bar type can only be: tick, volume or dollar")
      end

    threshold = Utils.get_threshold(previous_month_df, threshold_column)

    trades =
      @input_csv_file
      |> DF.from_csv!(lazy: true)
      |> DF.rename(transact_time: "timestamp", quantity: "volume", is_buyer_maker: "is_sell")
      |> DF.select(["timestamp", "price", "volume", "is_sell"])
      |> DF.mutate_with(fn lazy_df ->
        [
          dollar_value: Series.multiply(lazy_df[:price], lazy_df[:volume]),
          # A constant column of ones, so that ticks can be accumulated like any other column
          tick: 1.0
        ]
      end)
      |> DF.collect()

    # Walk the tracked column once, emitting the bar number of every row while carrying
    # {current bar number, running total within that bar} as the state.
    {bar_numbers, _final_state} =
      trades[bar_type]
      |> Series.to_list()
      |> Enum.map_reduce({1, 0}, fn value, {current_bar, running_total} ->
        updated_total = running_total + value

        next_state =
          if updated_total >= threshold do
            # Threshold reached: the next row starts a new bar with an empty total
            {current_bar + 1, 0}
          else
            {current_bar, updated_total}
          end

        {current_bar, next_state}
      end)

    trades
    # Attach the bar numbers as a new column
    |> DF.put(:bar_number, Series.from_list(bar_numbers))
    # Each group of rows sharing a bar number becomes one row (one bar)
    |> DF.group_by(:bar_number)
    |> DF.summarise(
      # Time of the first trade in the bar
      date_time: Series.first(timestamp),
      # First price in the group - the open price
      open: Series.first(price),
      # Highest price in the group - the high price
      high: Series.max(price),
      # Lowest price in the group - the low price
      low: Series.min(price),
      # Last price in the group - the close price
      close: Series.last(price),
      # Volume in units of the currency
      volume: Series.sum(volume),
      # Volume of only the market buy orders (sell rows contribute 0)
      buy_volume: Series.sum(Series.select(is_sell, 0, volume)),
      # Volume in dollar value
      dollar_value: Series.sum(dollar_value),
      # Number of trades in the bar
      num_of_trades: Series.count(price)
    )
    # The bar number is just the row number at this point, so drop it
    |> DF.discard("bar_number")
  end
end
Now we can create our new bars dataframes:
# One dataframe of bars per bar type; each call re-reads the CSV files and derives its own threshold
tick_bars = Bars.get_standard_bars_reduce("tick")
volume_bars = Bars.get_standard_bars_reduce("volume")
dollar_bars = Bars.get_standard_bars_reduce("dollar")
We can now visualize the first 100 bars of each of them:
# Plot only the first 100 bars of each type; the second argument becomes the title of the time axis
VlHelper.get_bars_plot(tick_bars |> DF.head(100), "Tick Bars")
VlHelper.get_bars_plot(volume_bars |> DF.head(100), "Volume Bars")
VlHelper.get_bars_plot(dollar_bars |> DF.head(100), "Dollar Bars")
Plotting The Number of Bars Over Time
We can visualize the number of bars each method produced over the span of this month. First we create the new Vl Helper Function:
defmodule VlBarCount do
  @moduledoc """
  Plots how many bars of each type were produced per day.
  """

  # Labels for the series, in the order they are expected inside the input tuple
  @series_labels ["Tick Bars", "Volume Bars", "Dollar Bars"]

  # Casts the series to millisecond naive datetimes and aligns each value to the
  # beginning of its day, so that rows can be grouped by day.
  defp truncate_timestamp(series) do
    series
    |> Series.cast({:naive_datetime, :millisecond})
    |> Series.transform(&NaiveDateTime.beginning_of_day/1)
  end

  @doc """
  Builds a line chart with the number of bars per day for each bar type.

  `series_array` is a tuple holding three timestamp series in this order:
  tick, volume, dollar. `height` and `width` set the size of the chart.
  """
  def get_bar_count_plot(series_array, height \\ 600, width \\ 700) do
    daily_counts =
      @series_labels
      |> Enum.with_index()
      |> Enum.map(fn {label, position} ->
        # Day-aligned timestamps of this bar type
        day_series = series_array |> elem(position) |> truncate_timestamp()

        # One frame per bar type, tagged with its label so the frames can be merged
        DF.new(timestamp: day_series) |> DF.mutate(series: ^label)
      end)
      # Merging the three frames
      |> DF.concat_rows()
      # One group per day and series
      |> DF.group_by(["timestamp", "series"])
      # Number of rows (bars) in each group
      |> DF.summarise(count: count(series))

    Vl.new(width: width, height: height)
    |> Vl.data_from_values(DF.to_rows(daily_counts))
    # A line plot
    |> Vl.mark(:line)
    # The day on the x axis, as a temporal value
    |> Vl.encode_field(:x, "timestamp", type: :temporal)
    # The number of bars on the y axis
    |> Vl.encode_field(:y, "count", type: :quantitative)
    # Series are told apart by color...
    |> Vl.encode_field(:color, "series", type: :nominal)
    # ...and by dash pattern
    |> Vl.encode_field(:stroke_dash, "series", type: :nominal)
  end
end
And now we just need to run the function with the `date_time` series of our three bar dataframes:
# The date_time series are passed as a tuple in the order the function expects: tick, volume, dollar
VlBarCount.get_bar_count_plot({tick_bars["date_time"], volume_bars["date_time"], dollar_bars["date_time"]})