Reactor Practice
Mix.install(
[
{:instructor, "~> 0.1"},
{:ash_swarm, github: "seanchatmangpt/ash_swarm", branch: "main"},
{:ash, "~> 3.0"},
{:reactor, "~> 0.13.3"},
{:igniter, "~> 0.5"}
],
config: [
instructor: [
api_url: "https://api.groq.com/openai",
api_key: System.get_env("LB_GROQ_API_KEY"), # Replace with your actual API key
]
],
consolidate_protocols: false
)
Application.put_env(:ash, :validate_domain_resource_inclusion?, false)
Application.put_env(:ash, :validate_domain_config_inclusion?, false)
# Configure the Groq adapter explicitly as the default adapter
Application.put_env(:instructor, :adapter, Instructor.Adapters.Groq)
# Add the Groq adapter-specific configuration
Application.put_env(:instructor, :groq, [
api_url: "https://api.groq.com/openai",
api_key: System.get_env("LB_GROQ_API_KEY"), # Replace with your actual API key
http_options: [receive_timeout: 60_000]
])
Logger.configure_backend(:console, format: "[$level] $message\n", level: :debug)
Section
defmodule Greeter do
@moduledoc false
use Reactor.Step
def run(%{whom: nil}, _, _), do: {:ok, "Hello, World!"}
def run(%{whom: whom}, _, _), do: {:ok, "Hello, #{whom}!"}
end
reactor = Reactor.Builder.new()
|> Reactor.Builder.add_input!(:whom)
|> Reactor.Builder.add_step!(:greeter, Greeter, whom: {:input, :whom})
|> Reactor.Builder.return!(:greeter)
Reactor.run!(reactor, %{whom: "Sean"})
defmodule GreeterReactor do
use Reactor
input :whom
step :greeter, Greeter do
argument :whom, input(:whom)
end
return :greeter
end
Reactor.run!(GreeterReactor, %{whom: "Sean"})
# Dummy Middleware โ simply passes the result along.
defmodule MyApp.LoggingMiddleware do
@behaviour Reactor.Middleware
@impl true
def call(result, _step, _context), do: result
end
# Dummy step to register a user.
defmodule MyApp.RegisterUserStep do
use Reactor.Step
@impl true
def run(%{email: email, password: _password} = args, _context, _opts) do
IO.puts("Registering user: #{email}")
{:ok, Map.put(args, :id, 1)}
end
end
# Dummy step to create a Stripe customer.
defmodule MyApp.CreateStripeCustomerStep do
use Reactor.Step
@impl true
def run(%{email: email}, _context, _opts) do
IO.puts("Creating Stripe customer for: #{email}")
{:ok, %{id: 101, email: email}}
end
end
# Dummy step to find a Stripe plan.
defmodule MyApp.FindStripePlanStep do
use Reactor.Step
@impl true
def run(%{plan_name: plan_name}, _context, _opts) do
IO.puts("Finding Stripe plan: #{plan_name}")
{:ok, %{id: 201, name: plan_name}}
end
end
# Dummy step to create a Stripe subscription.
defmodule MyApp.CreateStripeSubscriptionStep do
use Reactor.Step
@impl true
def run(%{customer_id: customer_id, plan_id: plan_id}, _context, _opts) do
IO.puts("Creating Stripe subscription for customer #{customer_id} on plan #{plan_id}")
{:ok, %{id: 301, customer_id: customer_id, plan_id: plan_id}}
end
end
# A Reactor to create a subscription (composed into our main reactor).
defmodule MyApp.CreateStripeSubscriptionReactor do
use Reactor
input :customer_id
input :plan_id
step :create_subscription, MyApp.CreateStripeSubscriptionStep do
argument :customer_id, input(:customer_id)
argument :plan_id, input(:plan_id)
end
return :create_subscription
end
# Dummy step to send a welcome email.
defmodule MyApp.SendWelcomeEmailStep do
use Reactor.Step
@impl true
def run(%{email: email, subscription_id: subscription_id}, _context, _opts) do
IO.puts("Sending welcome email to #{email}")
{:ok, %{status: "email sent", subscription_id: subscription_id}}
end
end
# Dummy step to log a failure.
defmodule MyApp.LogFailureStep do
use Reactor.Step
@impl true
def run(%{error: error}, _context, _opts) do
IO.puts("Logging failure: #{error}")
{:ok, %{logged: true, error: error}}
end
end
# Dummy step to track conversion.
defmodule MyApp.TrackConversionStep do
use Reactor.Step
@impl true
def run(%{email: email, plan_name: plan_name, notification_status: notification_status}, _context, _opts) do
IO.puts("Tracking conversion for #{email}, plan: #{plan_name}, notification: #{inspect(notification_status)}")
{:ok, %{tracked: true}}
end
end
# Dummy analytics hooks.
defmodule MyApp.PreAnalytics do
def before(args, _context, _step, _opts) do
IO.puts("Before analytics: #{inspect(args)}")
args
end
end
defmodule MyApp.PostAnalytics do
# Rename function to after_hook instead of after.
def after_hook(result) do
IO.puts("After analytics: #{inspect(result)}")
result
end
end
# The main Reactor module built with the full DSL.
defmodule MyApp.RegisterUserReactor do
use Reactor
input :email
input :password
input :plan_name
middlewares do
middleware MyApp.LoggingMiddleware
end
step :register_user, MyApp.RegisterUserStep do
argument :email, input(:email)
argument :password, input(:password)
end
step :create_stripe_customer, MyApp.CreateStripeCustomerStep do
argument :email, input(:email)
end
step :find_stripe_plan, MyApp.FindStripePlanStep do
argument :plan_name, input(:plan_name)
end
compose :create_subscription, MyApp.CreateStripeSubscriptionReactor do
argument :customer_id, result(:create_stripe_customer), transform: &(&1.id)
argument :plan_id, result(:find_stripe_plan), transform: &(&1.id)
end
switch :notification_switch, on: result(:create_subscription) do
matches? fn subscription ->
subscription != nil and Map.has_key?(subscription, :id)
end do
step :send_welcome_email, MyApp.SendWelcomeEmailStep do
argument :email, input(:email)
argument :subscription_id, result(:create_subscription), transform: &(&1.id)
end
end
default do
step :log_failure, MyApp.LogFailureStep do
argument :error, value("Subscription creation failed")
end
end
end
group :track_conversion,
before_all: &MyApp.PreAnalytics.before/4,
after_all: &MyApp.PostAnalytics.after_hook/1 do
step :track_conversion, MyApp.TrackConversionStep do
argument :email, input(:email)
argument :plan_name, input(:plan_name)
argument :notification_status, result(:send_welcome_email, [:status])
end
end
template :welcome_message, template: "Welcome <%= @email %> to our platform!" do
argument :email, input(:email)
end
return :welcome_message
end
# Run the reactor with sample input.
result =
Reactor.run!(MyApp.RegisterUserReactor, %{
email: "test@example.com",
password: "secret",
plan_name: "basic"
})
IO.inspect(result, label: "Final result")
defmodule InspectMiddleware do
use Reactor.Middleware
@impl true
def init(context) do
IO.puts("InspectMiddleware init: Reactor run is starting.")
{:ok, context}
end
@impl true
def complete(result, context) do
IO.puts("InspectMiddleware complete: Reactor finished with result: #{inspect(result)} #{inspect(context)}")
{:ok, result}
end
@impl true
def error(error, context) do
IO.puts("InspectMiddleware error: Reactor encountered error: #{inspect(error)} #{inspect(context)}")
:ok
end
@impl true
def halt(context) do
IO.puts("InspectMiddleware halt: Reactor run was halted. #{inspect(context)}")
{:ok, context}
end
end
defmodule SuccessfulStepReactor do
@moduledoc false
use Reactor
middlewares do
middleware InspectMiddleware
end
step :noop do
argument :marty, value(:mcfly)
run fn _, _ ->
# Wait for 500 milliseconds within the step
Process.sleep(500)
{:ok, :noop}
end
end
return :noop
end
start_time = System.monotonic_time()
{:ok, :noop} = Reactor.run(SuccessfulStepReactor)
duration_ms =
System.convert_time_unit(System.monotonic_time() - start_time, :native, :millisecond)
IO.puts("Duration: #{duration_ms} ms")
defmodule MyApp.DoSomethingStep do
use Reactor.Step
@impl true
def run(arguments, _context, _options) do
{:ok, %{something: arguments.action}}
end
end
defmodule MyApp.BuildPromptStep do
use Reactor.Step
@impl true
def run(arguments, _context, _options) do
prompt = "User wants to do #{arguments.result_of_something.something}."
{:ok, prompt}
end
end
defmodule MyApp.BuildPromptReactor do
use Reactor
input :action
step :do_something, MyApp.DoSomethingStep do
argument :action, input(:action)
end
step :build_prompt, MyApp.BuildPromptStep do
argument :result_of_something, result(:do_something)
end
return :build_prompt
end
# Run the Reactor and print the generated prompt
{:ok, prompt} = Reactor.run(MyApp.BuildPromptReactor, %{action: "generate a summary"})
IO.puts(prompt)
defmodule AshSwarm.Steps.GenerateAnswerStep do
use Reactor.Step
@impl true
def run(arguments, _context, _options) do
sys_msg = "You are a helpful assistant."
user_msg = arguments.question
case AshSwarm.InstructorHelper.gen(%{answer: :string}, sys_msg, user_msg) do
{:ok, %{answer: answer}} -> {:ok, answer}
{:error, error} -> {:error, error}
end
end
end
defmodule AshSwarm.AskQuestionReactor do
use Reactor
input :question
step :generate_answer, AshSwarm.Steps.GenerateAnswerStep do
argument :question, input(:question)
end
return :generate_answer
end
# Running the reactor
{:ok, answer} = Reactor.run(AshSwarm.AskQuestionReactor, %{question: "What is Elixir?"})
IO.puts("Answer: #{answer}")
defmodule ExampleReactor do
use Reactor
input :numbers
map :double_numbers do
source input(:numbers)
step :double do
argument :number, element(:double_numbers)
run fn args, _ ->
{:ok, args.number * 2}
end
end
return :double
end
end
result =
Reactor.run(ExampleReactor, %{numbers: [40, 50, 60]})
|> then(fn {:ok, numbers} -> {:ok, Enum.map(numbers, &(&1 + 1))} end)
defmodule InputReactor do
use Reactor
input :name
step :greet do
argument :name, input(:name)
run fn
%{name: nil}, _ -> {:ok, "Hello, World!"}
%{name: name}, _ -> {:ok, "Hello, #{name}"}
end
end
end
Reactor.run(InputReactor, %{name: "Sean"})
defmodule ValueReactor do
use Reactor
input :number
step :times_three do
argument :lhs, input(:number)
argument :rhs, value(3)
run fn args, _ ->
{:ok, args.lhs * args.rhs}
end
end
step :testing, Reactor.Step.Debug
end
Reactor.run(ValueReactor, %{number: 2})
defmodule Reactor.Middleware.Debug do
@moduledoc """
A Reactor middleware that logs debug information.
This middleware logs the start and stop of the Reactor execution, as well as the
execution of individual steps, including their inputs, results, errors, and retries.
Example log messages:
- Reactor started
- Step `:my_step` started with arguments: %{arg1: "value"}
- Step `:my_step` completed successfully with result: %{output: "value"}
- Step `:my_step` encountered an error: {:error, "Something went wrong"}
"""
use Reactor.Middleware
require Logger
@doc false
@impl true
def init(context) do
Logger.debug("๐ Reactor started execution.")
{:ok, context}
end
@doc false
@impl true
def complete(result, _context) do
Logger.debug("โ
Reactor execution completed successfully.")
{:ok, result}
end
@doc false
@impl true
def error(error, _context) do
Logger.error("โ Reactor execution encountered an error: #{inspect(error)}")
:ok
end
@doc false
@impl true
def halt(_context) do
Logger.warning("โ ๏ธ Reactor execution was halted.")
:ok
end
@doc false
@impl true
def event({:run_start, arguments}, step, _context) do
Logger.info("โถ๏ธ Step `#{step.name}` started with arguments: #{inspect(arguments)}")
end
def event({:run_complete, result}, step, _context) do
Logger.info("โ
Step `#{step.name}` completed successfully with result: #{inspect(result)}")
end
def event({:run_error, errors}, step, _context) do
Logger.error("โ Step `#{step.name}` encountered an error: #{inspect(errors)}")
end
def event({:run_retry, value}, step, _context) do
Logger.warning("๐ Step `#{step.name}` is retrying with value: #{inspect(value)}")
end
def event({:compensate_start, reason}, step, _context) do
Logger.warning("โป๏ธ Step `#{step.name}` is compensating due to: #{inspect(reason)}")
end
def event({:compensate_complete, _result}, step, _context) do
Logger.info("๐ Step `#{step.name}` compensation completed.")
end
def event({:undo_start, _}, step, _context) do
Logger.warning("โช Step `#{step.name}` undo process started.")
end
def event({:undo_complete, _}, step, _context) do
Logger.info("โฉ Step `#{step.name}` undo process completed.")
end
end
defmodule GreetReactor do
use Reactor
input :name
middlewares do
middleware Reactor.Middleware.Debug
end
step :greet do
argument :name, input(:name)
run fn
%{name: nil}, _ -> {:ok, "Hello, World!"}
%{name: name}, _ -> {:ok, "Hello, #{name}"}
end
end
end
Reactor.run(InputReactor, %{name: "Sean"})