Twitch data analysis
Mix.install([
{:explorer, "~> 0.8.0"},
{:kino, "~> 0.12.2"},
{:kino_vega_lite, "~> 0.1.10"}
])
Libraries
defmodule TwitchExport do
@moduledoc false
require Explorer.DataFrame, as: DataFrame
def list(file) do
file
|> :zip.list_dir()
|> then(fn {:ok, files} -> files end)
|> Enum.map(fn
{:zip_comment, []} -> nil
{:zip_file, path, _options, _comment, _offset, _comp_size} -> path
end)
|> Enum.reject(&is_nil/1)
end
def load(file) do
file
|> DataFrame.from_csv!(
columns: ["time", "channel", "minutes_logged", "game"],
dtypes: [{"time", {:naive_datetime, :microsecond}}]
)
end
end
defmodule TwitchData do
require Explorer.DataFrame, as: DataFrame
alias Explorer.Series
def as_string(s), do: s |> Series.cast(:string) |> Series.to_list()
def nominal_date_column(df) do
{df["year"], df["month"]}
|> then(fn {y, m} -> Enum.zip(as_string(y), as_string(m)) end)
|> Enum.map(fn {y, m} -> "#{y}-#{String.pad_leading(m, 2, "0")}" end)
|> Series.from_list()
|> then(fn s -> DataFrame.put(df, "date", s) end)
end
def remove_unwatched_channels(df, threshold \\ 60) do
df
|> DataFrame.group_by([:channel])
|> DataFrame.filter(count(minutes_logged) > ^threshold)
|> DataFrame.ungroup()
end
def preprocess(df, threshold: threshold) do
df
|> DataFrame.mutate(year: year(time), month: month(time), weekday: day_of_week(time))
|> DataFrame.mutate(hour: hour(time))
|> remove_unwatched_channels(threshold)
end
# Filters
def channel(df, channel), do: DataFrame.filter(df, channel == ^channel)
def years(df, start, stop), do: DataFrame.filter(df, ^start <= year and year <= ^stop)
# Groups
def group_channel(df), do: group(df, [:channel])
def group_month(df), do: group(df, [:month, :year]) |> nominal_date_column()
def group_channel_month(df), do: group(df, [:channel, :month, :year]) |> nominal_date_column()
def group_week(df), do: group(df, [:weekday])
defp group(df, columns) do
df
|> DataFrame.group_by(columns)
|> DataFrame.summarise_with(
&[
total: Series.count(&1["minutes_logged"]) |> Series.divide(60) |> Series.cast(:integer),
channels: Series.n_distinct(&1["channel"])
]
)
|> DataFrame.sort_by(desc: total)
end
end
df =
"minute_watched.csv"
|> TwitchExport.load()
|> TwitchData.preprocess(threshold: 60 * 6)
|> TwitchData.years(2019, 2023)
Global Statistics
total = TwitchData.group_month(df)
VegaLite.new(title: "Total")
|> VegaLite.data_from_values(total, only: ["date", "total"])
|> VegaLite.mark(:line)
|> VegaLite.encode_field(:x, "date", type: :nominal)
|> VegaLite.encode_field(:y, "total", type: :quantitative)
Channels breakdown
channels = TwitchData.group_channel(df)
VegaLite.new(width: 800, title: "Channel Breakdown")
|> VegaLite.data_from_values(channels, only: ["channel", "total"])
|> VegaLite.mark(:bar)
|> VegaLite.encode_field(:x, "channel", type: :nominal)
|> VegaLite.encode_field(:y, "total", type: :quantitative)
channel_month = TwitchData.group_channel_month(df)
VegaLite.new()
|> VegaLite.data_from_values(channel_month, only: ["date", "channel", "total"])
|> VegaLite.mark(:bar)
|> VegaLite.encode_field(:x, "date", type: :nominal)
|> VegaLite.encode_field(:y, "channel", type: :nominal)
|> VegaLite.encode_field(:color, "total", type: :quantitative)
Channel breakdown
import Kino.Shorts
name = read_text("Channel name")
if name == "" do
Kino.interrupt!(:error, "Fill in the channel name")
end
channel = df |> TwitchData.channel(name) |> TwitchData.group_month()
VegaLite.new(width: 800, title: "Channel #{name}")
|> VegaLite.data_from_values(channel, only: ["date", "total"])
|> VegaLite.mark(:bar)
|> VegaLite.encode_field(:x, "date", type: :nominal)
|> VegaLite.encode_field(:y, "total", type: :quantitative)