
Pipes

07-pipes.livemd

Mix.install([
  {:kino, "~> 0.14.0"},
  {:kino_explorer, "~> 0.1.20"}
])

Pipes make your life joyful 😃

Without pipes, we can end up defining a lot of intermediate variables just to avoid nested function calls

Since this is pretty common in Elixir, the language gives us something better

defmodule Example do
  def process_numbers(numbers, threshold) do
    squared = Enum.map(numbers, fn x -> x * x end)
    filtered = Enum.filter(squared, fn x -> x <= threshold end)
    Enum.sum(filtered)
  end
end

Example.process_numbers([1, 2, 3, 4, 5], 10)

|> is the pipe operator

It takes the result of the expression on its left side and passes it as the first argument to the function call on its right side.

It works much like a real pipe that carries data from one function to the next 🙌

defmodule BetterExample do
  def process_numbers(numbers, threshold) do
    numbers
    |> Enum.map(&(&1 * &1))
    |> Enum.filter(&(&1 <= threshold))
    |> Enum.sum()
  end
end

BetterExample.process_numbers([1, 2, 3, 4, 5], 10)
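To make the "first argument" rule concrete, here is a minimal sketch that writes the same computation as nested calls and as a pipeline. The variable names nested and piped are only illustrative, and the threshold of 10 is taken from the call above.

```elixir
numbers = [1, 2, 3, 4, 5]

# Nested calls have to be read inside-out
nested = Enum.sum(Enum.filter(Enum.map(numbers, &(&1 * &1)), &(&1 <= 10)))

# The pipeline reads top to bottom and does exactly the same thing
piped =
  numbers
  |> Enum.map(&(&1 * &1))
  |> Enum.filter(&(&1 <= 10))
  |> Enum.sum()

IO.inspect({nested, piped})
# => {14, 14}

# Extra arguments stay in place: the piped value always goes first,
# so this is the same as String.split("a,b", ",")
parts = "a,b" |> String.split(",")
IO.inspect(parts)
# => ["a", "b"]
```

Only the squares 1, 4 and 9 pass the filter, which is why both versions return 14.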

Using pipes with Enum and Stream

Enums

Enum is a module you will run into every day. It contains a lot of functions for manipulating collections such as lists, ranges and maps

odd? = fn x -> rem(x, 2) != 0 end
1..100_000 |> Enum.map(&(&1 * 3)) |> Enum.filter(odd?) |> Enum.sum()

Let's see what's actually going on here

To inspect every step in a pipeline, we can simply use IO.inspect(). It prints the value it receives and returns it unchanged, so it can sit between any two steps

1..100_000
|> IO.inspect(label: "original")
|> Enum.map(&(&1 * 3))
|> IO.inspect(label: "multiplication")
|> Enum.filter(odd?)
|> IO.inspect(label: "is odd?")
|> Enum.sum()
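With 100,000 elements the inspected output gets truncated, so here is the same pipeline as a small sketch on the range 1..5 (an input chosen only for readability), where every intermediate list fits on one line.

```elixir
odd? = fn x -> rem(x, 2) != 0 end

result =
  1..5
  |> Enum.map(&(&1 * 3))
  # Prints "multiplication: [3, 6, 9, 12, 15]" and passes the list on
  |> IO.inspect(label: "multiplication")
  |> Enum.filter(odd?)
  # Prints "is odd?: [3, 9, 15]" and passes the list on
  |> IO.inspect(label: "is odd?")
  |> Enum.sum()

IO.inspect(result, label: "sum")
# Prints "sum: 27"
```

Each Enum step hands a complete list to the next one, which is exactly what the labels show.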

Streams

Stream offers many of the same functions as Enum, but evaluates them lazily.

In the example above, every step built a complete intermediate list, even though we only need the final result at the last step.

Stream lets us put off the work until that last step

1..100_000 |> Stream.map(&(&1 * 3)) |> Stream.filter(odd?) |> Enum.sum()

Stream is like a lazy Enum

This is very useful when you are reading a large file and processing its data, because the whole file never has to be loaded into memory at once

Let's inspect what's actually going on

1..100_000
|> IO.inspect(label: "original")
|> Stream.map(&(&1 * 3))
|> IO.inspect(label: "multiplication")
|> Stream.filter(odd?)
|> IO.inspect(label: "is odd?")
|> Enum.sum()

So a call into the Stream module does not touch the data yet. It returns a Stream struct that wraps the input together with the function to apply later

When we invoke Enum.sum at the last step, all the functions we collected earlier get executed, one element at a time

So we don't produce intermediate lists, it's just a bunch of functions running together
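A small sketch can make the laziness visible. The IO.puts calls below are added only for illustration, and the range 1..3 is chosen to keep the output short.

```elixir
stream =
  1..3
  |> Stream.map(fn x ->
    IO.puts("map #{x}")
    x * 3
  end)
  |> Stream.filter(fn x ->
    IO.puts("filter #{x}")
    rem(x, 2) != 0
  end)

# Nothing has been printed by map or filter yet: the stream is only a recipe
IO.puts("stream built")

# An Enum function finally pulls the elements through, one at a time,
# so the output alternates: map 1, filter 3, map 2, filter 6, map 3, filter 9
result = Enum.to_list(stream)
IO.inspect(result)
# => [3, 9]

# Because elements are pulled on demand, only as many as are needed
# ever get computed
first_two = 1..100_000 |> Stream.map(&(&1 * 3)) |> Enum.take(2)
IO.inspect(first_two)
# => [3, 6]
```

With Enum.map and Enum.filter instead, all the "map" lines would be printed before the first "filter" line, because each step would finish its whole list first.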

Streams in action with Pipe

Now suppose we get a new project that has to process thousands of CSV files and calculate overtime (OT) hours for each employee

Our worksheet looks like this (more than 20K records)

employee_id,employee_name,hours_worked,date
7,George Clark,14,2023-02-13
2,Jane Smith,9,2023-03-03
6,Emily Davis,14,2023-11-19
11,Kevin Scott,9,2023-08-15

file_path =
  Kino.FS.file_path("Employee_WorkHours_Dataset.csv")

Using Kino we can visualize it in the Livebook

csv_dataset = file_path |> Explorer.DataFrame.from_csv!()

defmodule OvertimeCalculator do
  @daily_ot_threshold 8
  @base_pay 10
  @ot_rate 0.5

  def calculate_ot(input_path) do
    input_path
    |> File.stream!()
    # Skip header row
    |> Stream.drop(1)
    # Parse each line
    |> Stream.map(&parse_line/1)
    # Group shifts by employee ID (Enum is eager, so the file is actually read here)
    |> Enum.group_by(& &1.employee_id)
    # Calculate daily overtime per employee
    |> Enum.map(&calculate_daily_ot/1)
  end

  defp parse_line(line) do
    [employee_id, employee_name, hours_worked_str, date_str] = String.split(line, ",")
    hours_worked = String.to_integer(String.trim(hours_worked_str))
    date = Date.from_iso8601!(String.trim(date_str))

    %{
      employee_id: employee_id,
      employee_name: employee_name,
      hours_worked: hours_worked,
      date: date
    }
  end

  defp calculate_daily_ot({employee_id, shifts}) do
    # Calculate daily overtime for each shift
    daily_ot_data =
      shifts
      |> Enum.map(fn shift ->
        ot_hours = max(shift.hours_worked - @daily_ot_threshold, 0)
        ot_pay = ot_hours * @base_pay * @ot_rate

        %{
          date: shift.date,
          hours_worked: shift.hours_worked,
          ot_hours: ot_hours,
          ot_pay: ot_pay
        }
      end)

    # Sum up total hours, overtime hours, and overtime pay for the employee.
    #
    # We could make three separate passes over the data:
    #
    #   total_hours = Enum.sum(for day <- daily_ot_data, do: day.hours_worked)
    #   total_ot_hours = Enum.sum(for day <- daily_ot_data, do: day.ot_hours)
    #   total_ot_pay = Enum.sum(for day <- daily_ot_data, do: day.ot_pay)
    #
    # Or we can collect all three totals in a single pass with reduce:

    {total_hours, total_ot_hours, total_ot_pay} =
      Enum.reduce(daily_ot_data, {0, 0, 0}, fn day, {hours_acc, ot_hours_acc, ot_pay_acc} ->
        {
          hours_acc + day.hours_worked,
          ot_hours_acc + day.ot_hours,
          ot_pay_acc + day.ot_pay
        }
      end)

    # Every shift in the group belongs to the same employee, so take the name from the first one
    employee_name = shifts |> List.first() |> Map.get(:employee_name)

    %{
      employee_id: employee_id,
      employee_name: employee_name,
      total_hours: total_hours,
      total_ot_hours: total_ot_hours,
      total_ot_pay: total_ot_pay
    }
  end
end

overtime = file_path |> OvertimeCalculator.calculate_ot()
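To check the arithmetic without the full dataset, here is a condensed, standalone sketch of the same pipeline. It is not the module above: it writes a tiny hypothetical CSV (two rows from the worksheet plus one made-up row, so that one employee has two shifts) to a temporary file named ot_sample.csv, and hard-codes the threshold of 8 hours, base pay of 10 and OT rate of 0.5.

```elixir
# Hypothetical sample: the third data row is invented for this sketch
csv = """
employee_id,employee_name,hours_worked,date
7,George Clark,14,2023-02-13
2,Jane Smith,9,2023-03-03
7,George Clark,10,2023-02-14
"""

path = Path.join(System.tmp_dir!(), "ot_sample.csv")
File.write!(path, csv)

totals =
  path
  # Lazily yields the file one line at a time
  |> File.stream!()
  # Skip header row
  |> Stream.drop(1)
  |> Stream.map(fn line ->
    [id, name, hours, _date] = line |> String.trim() |> String.split(",")
    %{id: id, name: name, hours: String.to_integer(hours)}
  end)
  # Eager step: the file is read here
  |> Enum.group_by(& &1.id)
  |> Map.new(fn {id, shifts} ->
    ot_hours = shifts |> Enum.map(&max(&1.hours - 8, 0)) |> Enum.sum()
    {id, %{ot_hours: ot_hours, ot_pay: ot_hours * 10 * 0.5}}
  end)

File.rm!(path)

# Employee "7" worked 14 and 10 hours: 6 + 2 = 8 OT hours, 8 * 10 * 0.5 = 40.0
IO.inspect(totals["7"], label: "employee 7")
# Employee "2" worked 9 hours: 1 OT hour, 1 * 10 * 0.5 = 5.0
IO.inspect(totals["2"], label: "employee 2")
```

The employee IDs stay strings here because, as in parse_line above, they are never converted to integers.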

Let's visualize it with Kino

> Kino is a nice package which lets you interact with data in Livebook

overtime |> Kino.DataTable.new()