Tutorial
Your first pipeline
Mix.install([{:opus, "~> 0.8"}, {:kino, "~> 0.5"}])
Below you’ll see a simple pipeline with two steps. It takes a number, adds 1 then multiplies by 2.
When a module is made a pipeline with use Opus.Pipeline
it can be called with a call/1
function.
So our module below can be called with:
ArithmeticPipeline.call(number)
defmodule ArithmeticPipeline do
use Opus.Pipeline
step(:add_one, with: &(&1 + 1))
step(:multiply_by_two)
def multiply_by_two(n), do: n * 2
end
input = Kino.Input.number("number", default: 0)
number = Kino.Input.read(input)
ArithmeticPipeline.call(number)
Error Handling
So far we’ve only defined step
stages.
This stage processes the input value and with a success value the next stage is called with that value.
With an error value the pipeline is halted and an {:error, any}
is returned.
defmodule ArithmeticPipelineWithErrors do
use Opus.Pipeline
step(:add_one, with: &(&1 + 1))
step(:multiply_by_two)
step(:add_three, with: &(&1 + 3))
def multiply_by_two(n) when n < 42, do: n * 2
def multiply_by_two(_), do: {:error, "I only handle numbers < 42"}
end
Try this out with a number > 42 and you should see an Opus.PipelineError
.
As you can see when a step function returns an error tuple
like {:error, "I only handle numbers < 42"}
the pipeline
is halted an the next stages are not executed.
input2 = Kino.Input.number("input2", default: 43)
number = Kino.Input.read(input2)
ArithmeticPipelineWithErrors.call(number)
Validating Input
defmodule Validator do
use Opus.Pipeline
check(:valid_user, with: &match?(%{user: %{id: id}} when is_integer(id), &1))
check(:even_user, with: &(rem(&1.user.id, 2) == 0), error_message: "User should have an even id")
end
input3 = Kino.Input.number("input3", default: 3)
number = Kino.Input.read(input3)
Validator.call(%{user: %{id: number}})
The error message to return when a check
fails is configurable. It can be an atom, string or a function.
defmodule ValidatorWithError do
use Opus.Pipeline
check(:valid_user, with: &match?(%{user: %{id: id}} when is_integer(id), &1))
check(:even_user,
with: &(rem(&1.user.id, 2) == 0),
error_message: fn %{user: %{id: id}} ->
"Oh the user should have an even id, #{id} is not even"
end
)
end
ValidatorWithError.call(%{user: %{id: 5}})
Side-effects with the tee stage
You can use the tee
macro for side-effects. The return value in such stages is ignored.
defmodule ArithmeticSideEffectsPipeline do
use Opus.Pipeline
step(:add_one, with: &(&1 + 1))
tee(:print_number,
with: fn n ->
IO.puts("The given number is.. #{n}")
# The following error will be ignored
raise "error"
end
)
step(:multiply_by_two)
def multiply_by_two(n), do: n * 2
end
Notice how raising an error does not halt the pipeline with tee
.
input4 = Kino.Input.number("input4", default: 5)
number = Kino.Input.read(input4)
ArithmeticSideEffectsPipeline.call(number)
Linking Pipelines
Pipelines can call other pipelines and there’s the link
macro to make that easier.
link
is essentially a step
where the linked pipeline is called with the step function argument
and the step returns the return value of the linked pipeline.
input5 = Kino.Input.text("input4", default: "5")
input6 = Kino.Input.text("input4", default: "6")
defmodule ReadFirstInput do
use Opus.Pipeline
step(:read, with: &put_in(&1[:a], Kino.Input.read(&1[:input_a])))
end
defmodule ReadSecondInput do
use Opus.Pipeline
step(:read, with: &put_in(&1[:b], Kino.Input.read(&1[:input_b])))
end
defmodule Calculator do
use Opus.Pipeline
link(ReadFirstInput)
link(ReadSecondInput)
step(:parse)
step(:add, with: &(&1.a + &1.b), if: &match?(%{operation: :add}, &1))
step(:multiply, with: &(&1.a * &1.b), if: &match?(%{operation: :multiply}, &1))
def parse(%{a: a, b: b} = calculation) do
{a, _} = Integer.parse(a)
{b, _} = Integer.parse(b)
%{calculation | a: a, b: b}
end
end
Notice how we leverage the if
option to calculate based on the given operation.
Both if
and unless
can be used to make a stage optional.
Calculator.call(%{operation: :add, input_a: input5, input_b: input6})
Calculator.call(%{operation: :multiply, input_a: input5, input_b: input6})