Pipes
Mix.install([
{:kino, "~> 0.14.0"},
{:kino_explorer, "~> 0.1.20"}
])
Pipes make your life joyful 😃
Without pipes, we can end up defining a lot of intermediate variables just to avoid nested function calls.
Since this is pretty common in Elixir, the language gives us something better.
defmodule Example do
  def process_numbers(numbers, threshold) do
    squared = Enum.map(numbers, fn x -> x * x end)
    filtered = Enum.filter(squared, fn x -> x <= threshold end)
    Enum.sum(filtered)
  end
end
Example.process_numbers([1, 2, 3, 4, 5], 10)
|> is the pipe operator.
It takes the result of the expression on its left side and passes it as the first argument to the function call on its right side.
Almost like a real pipe that lets you transport data between functions 🙌
defmodule BetterExample do
  def process_numbers(numbers, threshold) do
    numbers
    |> Enum.map(&(&1 * &1))
    |> Enum.filter(&(&1 <= threshold))
    |> Enum.sum()
  end
end
BetterExample.process_numbers([1, 2, 3, 4, 5], 10)
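To make the first-argument rule concrete, here is a small sketch that compares a piped call with its plain form. The variable names (plain, piped, nested, chained) are made up for illustration; the list and threshold are the ones used above.

```elixir
# A piped call and its plain form are the same call:
# the value on the left becomes the first argument on the right.
plain = String.duplicate("ab", 3)
piped = "ab" |> String.duplicate(3)

# The nested version of the pipeline reads inside-out...
nested = Enum.sum(Enum.filter(Enum.map([1, 2, 3, 4, 5], &(&1 * &1)), &(&1 <= 10)))

# ...while the piped version reads top to bottom.
chained =
  [1, 2, 3, 4, 5]
  |> Enum.map(&(&1 * &1))
  |> Enum.filter(&(&1 <= 10))
  |> Enum.sum()

IO.inspect({plain, piped})
# → {"ababab", "ababab"}
IO.inspect({nested, chained})
# → {14, 14}
```

Both forms compute the same thing; the pipe only changes how the code reads.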
Using pipes with Enum and Stream
Enums
Enum is a module you will run into every day. It contains a lot of functions for manipulating collections of data.
odd? = fn x -> rem(x, 2) != 0 end
1..100_000 |> Enum.map(&(&1 * 3)) |> Enum.filter(odd?) |> Enum.sum()
Let's see what's actually going on here.
To inspect every step in a pipeline, we can simply use IO.inspect(). It prints the value it receives and returns it unchanged, so it can sit between any two steps.
1..100_000
|> IO.inspect(label: "original")
|> Enum.map(&(&1 * 3))
|> IO.inspect(label: "multiplication")
|> Enum.filter(odd?)
|> IO.inspect(label: "is odd?")
|> Enum.sum()
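With 100,000 elements the inspected output is truncated, so the same pipeline on a tiny range is easier to read. This is only a sketch: the range 1..5 and the variable name total are chosen for illustration.

```elixir
odd? = fn x -> rem(x, 2) != 0 end

# IO.inspect/2 prints its argument with the given label and then
# returns that argument untouched, so the pipeline result is unaffected.
total =
  1..5
  |> IO.inspect(label: "original")
  |> Enum.map(&(&1 * 3))
  |> IO.inspect(label: "multiplication")
  |> Enum.filter(odd?)
  |> IO.inspect(label: "is odd?")
  |> Enum.sum()

# Printed while the pipeline runs:
# original: 1..5
# multiplication: [3, 6, 9, 12, 15]
# is odd?: [3, 9, 15]

IO.inspect(total)
# → 27
```

Notice that every Enum step hands a complete list to the next one.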
Streams
Stream is similar to Enum, but it evaluates lazily.
In the example above, every Enum step built a complete intermediate list, even though we only need the final result at the last step.
Stream lets us defer the work until then.
1..100_000 |> Stream.map(&(&1 * 3)) |> Stream.filter(odd?) |> Enum.sum()
Stream is like a lazy Enum.
This is very useful when you are reading and processing a large file.
Let's inspect what's actually going on.
1..100_000
|> IO.inspect(label: "original")
|> Stream.map(&(&1 * 3))
|> IO.inspect(label: "multiplication")
|> Stream.filter(odd?)
|> IO.inspect(label: "is odd?")
|> Enum.sum()
Each call to a Stream function does not touch the data. It returns a lazy stream that remembers the source and wraps the previous step in another function.
When we invoke Enum.sum at the last step, all the functions we built up earlier get executed, one element at a time.
So we don't produce intermediate lists; it's just a bunch of functions running together.
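The laziness described above can be observed directly. The following is a minimal sketch, assuming a three-element range and illustrative variable names (lazy, is_lazy, result, first_three); the IO.puts call is only there to show when the mapping function actually runs.

```elixir
lazy =
  1..3
  |> Stream.map(fn x ->
    IO.puts("mapping #{x}")
    x * 3
  end)
  |> Stream.filter(fn x -> rem(x, 2) != 0 end)

# Nothing has been printed yet: `lazy` is a %Stream{} struct that only
# describes the work to be done.
is_lazy = is_struct(lazy, Stream)

# Consuming the stream with an Enum function runs the recorded functions.
result = Enum.to_list(lazy)
IO.inspect(result)
# → [3, 9]

# Elements are pulled on demand, so Enum.take/2 stops as soon as it has
# three results instead of walking the whole range.
first_three =
  1..100_000
  |> Stream.map(&(&1 * 3))
  |> Stream.filter(&(rem(&1, 2) != 0))
  |> Enum.take(3)

IO.inspect(first_three)
# → [3, 9, 15]
```

The "mapping ..." lines only appear once Enum.to_list/1 is called, which is the point where the stream is actually evaluated.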
Streams in action with Pipe
Now we have a new project that has to process thousands of CSV files and calculate overtime (OT) hours for each employee.
Our worksheet looks like this (more than 20K records):
employee_id,employee_name,hours_worked,date
7,George Clark,14,2023-02-13
2,Jane Smith,9,2023-03-03
6,Emily Davis,14,2023-11-19
11,Kevin Scott,9,2023-08-15
file_path =
Kino.FS.file_path("Employee_WorkHours_Dataset.csv")
Using Kino, we can visualize it in Livebook:
csv_dataset = file_path |> Explorer.DataFrame.from_csv!()
defmodule OvertimeCalculator do
  @daily_ot_threshold 8
  @base_pay 10
  @ot_rate 0.5

  def calculate_ot(input_path) do
    input_path
    |> File.stream!()
    # Skip header row
    |> Stream.drop(1)
    # Parse each line
    |> Stream.map(&parse_line/1)
    # Group shifts by employee ID (this is where the stream is actually consumed)
    |> Enum.group_by(& &1.employee_id)
    # Calculate daily overtime per employee
    |> Enum.map(&calculate_daily_ot/1)
  end

  defp parse_line(line) do
    [employee_id, employee_name, hours_worked_str, date_str] = String.split(line, ",")
    hours_worked = String.to_integer(String.trim(hours_worked_str))
    date = Date.from_iso8601!(String.trim(date_str))

    %{
      employee_id: employee_id,
      employee_name: employee_name,
      hours_worked: hours_worked,
      date: date
    }
  end

  defp calculate_daily_ot({employee_id, shifts}) do
    # Calculate daily overtime for each shift
    daily_ot_data =
      shifts
      |> Enum.map(fn shift ->
        ot_hours = max(shift.hours_worked - @daily_ot_threshold, 0)
        ot_pay = ot_hours * @base_pay * @ot_rate

        %{
          date: shift.date,
          hours_worked: shift.hours_worked,
          ot_hours: ot_hours,
          ot_pay: ot_pay
        }
      end)

    # Sum up total hours, overtime hours, and overtime pay for the employee
    # We can do it like this
    # total_hours = Enum.sum(for day <- daily_ot_data, do: day.hours_worked)
    # total_ot_hours = Enum.sum(for day <- daily_ot_data, do: day.ot_hours)
    # total_ot_pay = Enum.sum(for day <- daily_ot_data, do: day.ot_pay)
    # Or we can do it in a single pass with reduce
    {total_hours, total_ot_hours, total_ot_pay} =
      Enum.reduce(daily_ot_data, {0, 0, 0}, fn day, {hours_acc, ot_hours_acc, ot_pay_acc} ->
        {
          hours_acc + day.hours_worked,
          ot_hours_acc + day.ot_hours,
          ot_pay_acc + day.ot_pay
        }
      end)

    # Get the employee name from the first shift
    employee_name = shifts |> List.first() |> Map.get(:employee_name)

    %{
      employee_id: employee_id,
      employee_name: employee_name,
      total_hours: total_hours,
      total_ot_hours: total_ot_hours,
      total_ot_pay: total_ot_pay
    }
  end
end
overtime = file_path |> OvertimeCalculator.calculate_ot()
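To check the overtime arithmetic without the CSV file, the same Stream-then-Enum shape can be run on a handful of in-memory lines. This is a rough sketch, not the module above: OvertimeSketch and from_lines/1 are hypothetical names, it reuses the same three constants, and it only feeds in the four sample rows shown earlier.

```elixir
defmodule OvertimeSketch do
  # Hypothetical, trimmed-down variant of the calculator; same constants.
  @daily_ot_threshold 8
  @base_pay 10
  @ot_rate 0.5

  # Accepts any enumerable of CSV lines (header first), e.g. a list or File.stream!/1.
  def from_lines(lines) do
    lines
    # Lazy part: drop the header, clean up and split each line
    |> Stream.drop(1)
    |> Stream.map(&String.trim/1)
    |> Stream.reject(&(&1 == ""))
    |> Stream.map(&String.split(&1, ","))
    # Eager part: grouping needs all rows, so the stream is consumed here
    |> Enum.group_by(fn [employee_id | _rest] -> employee_id end)
    |> Map.new(fn {employee_id, rows} ->
      ot_hours =
        rows
        |> Enum.map(fn [_id, _name, hours, _date] ->
          max(String.to_integer(hours) - @daily_ot_threshold, 0)
        end)
        |> Enum.sum()

      {employee_id, %{ot_hours: ot_hours, ot_pay: ot_hours * @base_pay * @ot_rate}}
    end)
  end
end

sample_lines = [
  "employee_id,employee_name,hours_worked,date\n",
  "7,George Clark,14,2023-02-13\n",
  "2,Jane Smith,9,2023-03-03\n",
  "6,Emily Davis,14,2023-11-19\n",
  "11,Kevin Scott,9,2023-08-15\n"
]

sketch_result = OvertimeSketch.from_lines(sample_lines)

# 14 hours worked means 6 OT hours, and 6 * 10 * 0.5 = 30.0 OT pay.
IO.inspect(sketch_result["7"])
# → %{ot_hours: 6, ot_pay: 30.0}
```

Because from_lines/1 takes any enumerable, the same function would also accept File.stream!(path), which is what keeps the file version memory-friendly until the grouping step.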
Let's visualize it with Kino.
> Kino is a nice package that lets you interact with data in Livebook.
overtime |> Kino.DataTable.new()