Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Reactor Practice

live_books/reactor_practice.livemd

Reactor Practice

# Notebook dependencies plus the Instructor client configuration.
# The API key is read from the `LB_GROQ_API_KEY` environment variable.
Mix.install(
  [
    {:instructor, "~> 0.1"},
    {:ash_swarm, github: "seanchatmangpt/ash_swarm", branch: "main"},
    {:ash, "~> 3.0"},
    {:reactor, "~> 0.13.3"},
    {:igniter, "~> 0.5"}
  ],
  config: [
    instructor: [
      api_url: "https://api.groq.com/openai",
      api_key: System.get_env("LB_GROQ_API_KEY")
    ]
  ],
  consolidate_protocols: false
)

# Switch off both Ash domain-inclusion validation flags for this notebook.
for flag <- [:validate_domain_resource_inclusion?, :validate_domain_config_inclusion?] do
  Application.put_env(:ash, flag, false)
end

# Select the Groq adapter as Instructor's adapter.
Application.put_env(:instructor, :adapter, Instructor.Adapters.Groq)

# Settings stored under the `:groq` key; the API key comes from the
# `LB_GROQ_API_KEY` environment variable.
Application.put_env(
  :instructor,
  :groq,
  api_url: "https://api.groq.com/openai",
  api_key: System.get_env("LB_GROQ_API_KEY"),
  http_options: [receive_timeout: 60_000]
)

# Console logging at debug level with a compact "[level] message" format.
Logger.configure_backend(:console, format: "[$level] $message\n", level: :debug)

Section

defmodule Greeter do
  @moduledoc false
  use Reactor.Step

  # Builds a greeting for `whom`; a `nil` value greets "World" instead.
  def run(%{whom: whom}, _context, _options) do
    target =
      case whom do
        nil -> "World"
        name -> name
      end

    {:ok, "Hello, #{target}!"}
  end
end

# Assemble a reactor programmatically: one `:whom` input, one `Greeter` step
# fed from that input, and the step's result as the return value.
reactor =
  Reactor.Builder.new()
  |> Reactor.Builder.add_input!(:whom)
  |> Reactor.Builder.add_step!(:greeter, Greeter, [whom: {:input, :whom}])
  |> Reactor.Builder.return!(:greeter)

Reactor.run!(reactor, %{whom: "Sean"})
# DSL definition of a reactor with a single input and a single step.
defmodule GreeterReactor do
  use Reactor

  # Value supplied by the caller under the `:whom` key.
  input :whom

  # Runs the `Greeter` step module, passing the `:whom` input as its `:whom` argument.
  step :greeter, Greeter do
    argument :whom, input(:whom)
  end

  # The reactor's result is the result of the `:greeter` step.
  return :greeter
end

# Run the DSL-defined reactor with `whom` set to "Sean".
Reactor.run!(GreeterReactor, %{whom: "Sean"})

# Dummy Middleware – simply passes the result along.
defmodule MyApp.LoggingMiddleware do
  @moduledoc """
  Placeholder Reactor middleware that passes the reactor result through untouched.
  """
  use Reactor.Middleware

  # `Reactor.Middleware` has no `call/3` callback, so the pass-through is
  # implemented on `complete/2`, the hook that receives the reactor's result.
  @impl true
  def complete(result, _context), do: {:ok, result}

  @doc """
  Returns `result` unchanged.

  Not a `Reactor.Middleware` callback; kept only so existing direct callers
  keep working.
  """
  def call(result, _step, _context), do: result
end

# Dummy step to register a user.
defmodule MyApp.RegisterUserStep do
  @moduledoc "Example step that pretends to register a user."
  use Reactor.Step

  # Prints the e-mail being registered and returns the received arguments
  # with a hard-coded `:id` of 1 added.
  @impl Reactor.Step
  def run(%{email: address, password: _} = arguments, _context, _opts) do
    IO.puts("Registering user: #{address}")

    registered = Map.merge(arguments, %{id: 1})
    {:ok, registered}
  end
end

# Dummy step to create a Stripe customer.
defmodule MyApp.CreateStripeCustomerStep do
  @moduledoc "Example step that pretends to create a Stripe customer."
  use Reactor.Step

  # Prints the e-mail and returns a fixed customer map (id 101) carrying it.
  @impl Reactor.Step
  def run(%{email: address}, _context, _opts) do
    IO.puts("Creating Stripe customer for: #{address}")

    customer = %{id: 101, email: address}
    {:ok, customer}
  end
end

# Dummy step to find a Stripe plan.
defmodule MyApp.FindStripePlanStep do
  @moduledoc "Example step that pretends to look up a Stripe plan by name."
  use Reactor.Step

  # Prints the plan name and returns a fixed plan map (id 201) carrying it.
  @impl Reactor.Step
  def run(%{plan_name: name}, _context, _opts) do
    IO.puts("Finding Stripe plan: #{name}")

    plan = %{id: 201, name: name}
    {:ok, plan}
  end
end

# Dummy step to create a Stripe subscription.
defmodule MyApp.CreateStripeSubscriptionStep do
  @moduledoc "Example step that pretends to create a Stripe subscription."
  use Reactor.Step

  # Prints the customer/plan pair and returns a fixed subscription map (id 301)
  # that echoes both ids back.
  @impl Reactor.Step
  def run(%{customer_id: customer, plan_id: plan}, _context, _opts) do
    IO.puts("Creating Stripe subscription for customer #{customer} on plan #{plan}")

    subscription = %{id: 301, customer_id: customer, plan_id: plan}
    {:ok, subscription}
  end
end

# A Reactor to create a subscription (composed into our main reactor).
# Single-step reactor wrapping `MyApp.CreateStripeSubscriptionStep`.
defmodule MyApp.CreateStripeSubscriptionReactor do
  use Reactor

  # Both ids are supplied by whoever runs or composes this reactor.
  input :customer_id
  input :plan_id

  # Forwards the two inputs to the step under the same argument names.
  step :create_subscription, MyApp.CreateStripeSubscriptionStep do
    argument :customer_id, input(:customer_id)
    argument :plan_id, input(:plan_id)
  end

  # The reactor's result is the result of the `:create_subscription` step.
  return :create_subscription
end

# Dummy step to send a welcome email.
defmodule MyApp.SendWelcomeEmailStep do
  @moduledoc "Example step that pretends to send a welcome e-mail."
  use Reactor.Step

  # Prints the recipient and returns a status map that echoes the subscription id.
  @impl Reactor.Step
  def run(%{email: recipient, subscription_id: subscription}, _context, _opts) do
    IO.puts("Sending welcome email to #{recipient}")

    outcome = %{status: "email sent", subscription_id: subscription}
    {:ok, outcome}
  end
end

# Dummy step to log a failure.
defmodule MyApp.LogFailureStep do
  @moduledoc "Example step that prints a failure message."
  use Reactor.Step

  # Prints the given error and returns a map flagging it as logged.
  @impl Reactor.Step
  def run(%{error: reason}, _context, _opts) do
    IO.puts("Logging failure: #{reason}")

    entry = %{logged: true, error: reason}
    {:ok, entry}
  end
end

# Dummy step to track conversion.
defmodule MyApp.TrackConversionStep do
  @moduledoc "Example step that prints a conversion-tracking line."
  use Reactor.Step

  # Prints the e-mail, plan name and (inspected) notification status, then
  # reports the conversion as tracked.
  @impl Reactor.Step
  def run(
        %{email: address, plan_name: plan, notification_status: status},
        _context,
        _opts
      ) do
    IO.puts(
      "Tracking conversion for #{address}, plan: #{plan}, notification: #{inspect(status)}"
    )

    {:ok, %{tracked: true}}
  end
end

# Dummy analytics hooks.
defmodule MyApp.PreAnalytics do
  @moduledoc "Example analytics hook that runs before tracking."

  @doc """
  Prints the inspected `args` and returns them unchanged.

  The remaining three parameters are ignored.
  """
  def before(args, _context, _step, _opts) do
    tap(args, fn seen -> IO.puts("Before analytics: #{inspect(seen)}") end)
  end
end

defmodule MyApp.PostAnalytics do
  @moduledoc "Example analytics hook that runs after tracking."

  @doc """
  Prints the inspected `result` and returns it unchanged.
  """
  # Named `after_hook` because `after` is a reserved word in Elixir.
  def after_hook(result) do
    tap(result, fn seen -> IO.puts("After analytics: #{inspect(seen)}") end)
  end
end

# The main Reactor module built with the full DSL.
defmodule MyApp.RegisterUserReactor do
  @moduledoc """
  Example reactor exercising several DSL entities at once: plain steps,
  `compose`, `switch`, `group` and `template`.

  Inputs are `:email`, `:password` and `:plan_name`; the returned value is the
  rendered `:welcome_message` template.
  """
  use Reactor

  input :email
  input :password
  input :plan_name

  middlewares do
    middleware MyApp.LoggingMiddleware
  end

  step :register_user, MyApp.RegisterUserStep do
    argument :email, input(:email)
    argument :password, input(:password)
  end

  step :create_stripe_customer, MyApp.CreateStripeCustomerStep do
    argument :email, input(:email)
  end

  step :find_stripe_plan, MyApp.FindStripePlanStep do
    argument :plan_name, input(:plan_name)
  end

  # Runs the nested reactor, handing it only the ids of the customer and plan.
  compose :create_subscription, MyApp.CreateStripeSubscriptionReactor do
    argument :customer_id, result(:create_stripe_customer), transform: & &1.id
    argument :plan_id, result(:find_stripe_plan), transform: & &1.id
  end

  switch :notification_switch, on: result(:create_subscription) do
    # Matches any map (or struct) carrying an `:id` key; `nil` and other
    # non-map values fall through to `default` instead of raising.
    matches? fn subscription -> match?(%{id: _}, subscription) end do
      step :send_welcome_email, MyApp.SendWelcomeEmailStep do
        argument :email, input(:email)
        argument :subscription_id, result(:create_subscription), transform: & &1.id
      end
    end

    default do
      step :log_failure, MyApp.LogFailureStep do
        argument :error, value("Subscription creation failed")
      end
    end
  end

  # `before_all` must be a 3-arity function returning
  # `{:ok, arguments, context, steps}` and `after_all` must return
  # `{:ok, result}`. The analytics hooks have different shapes (4-arity, bare
  # return values), so they are adapted here rather than captured directly.
  group :track_conversion,
    before_all: fn arguments, context, steps ->
      {:ok, MyApp.PreAnalytics.before(arguments, context, steps, []), context, steps}
    end,
    after_all: fn results -> {:ok, MyApp.PostAnalytics.after_hook(results)} end do
    step :track_conversion, MyApp.TrackConversionStep do
      argument :email, input(:email)
      argument :plan_name, input(:plan_name)
      # NOTE(review): `:send_welcome_email` is declared inside a branch of
      # `:notification_switch`; confirm it can be referenced from outside the
      # switch, or depend on the switch's own result instead.
      argument :notification_status, result(:send_welcome_email, [:status])
    end
  end

  template :welcome_message, template: "Welcome <%= @email %> to our platform!" do
    argument :email, input(:email)
  end

  return :welcome_message
end

# Run the reactor with sample input.
# `IO.inspect/2` returns the value it prints, so `result` still holds the
# reactor's return value.
result =
  MyApp.RegisterUserReactor
  |> Reactor.run!(%{email: "test@example.com", password: "secret", plan_name: "basic"})
  |> IO.inspect(label: "Final result")
defmodule InspectMiddleware do
  @moduledoc """
  Middleware that prints a line for each reactor lifecycle hook it implements
  (`init`, `complete`, `error`, `halt`) without altering what flows through it.
  """
  use Reactor.Middleware

  @impl Reactor.Middleware
  def init(context) do
    "InspectMiddleware init: Reactor run is starting."
    |> IO.puts()

    {:ok, context}
  end

  @impl Reactor.Middleware
  def complete(result, context) do
    message =
      "InspectMiddleware complete: Reactor finished with result: #{inspect(result)} #{inspect(context)}"

    IO.puts(message)
    {:ok, result}
  end

  @impl Reactor.Middleware
  def error(error, context) do
    message =
      "InspectMiddleware error: Reactor encountered error: #{inspect(error)} #{inspect(context)}"

    IO.puts(message)
    :ok
  end

  @impl Reactor.Middleware
  def halt(context) do
    message = "InspectMiddleware halt: Reactor run was halted. #{inspect(context)}"

    IO.puts(message)
    {:ok, context}
  end
end
defmodule SuccessfulStepReactor do
  @moduledoc false
  use Reactor

  middlewares do
    middleware InspectMiddleware
  end

  # One inline step that ignores its argument, pauses, and succeeds.
  step :noop do
    argument :marty, value(:mcfly)

    run fn _arguments, _context ->
      # Hold the step for half a second before reporting success.
      Process.sleep(500)
      {:ok, :noop}
    end
  end

  return :noop
end

# Measure how long one run of `SuccessfulStepReactor` takes, in milliseconds.
start_time = System.monotonic_time()
{:ok, :noop} = Reactor.run(SuccessfulStepReactor)

duration_ms =
  (System.monotonic_time() - start_time)
  |> System.convert_time_unit(:native, :millisecond)

IO.puts("Duration: #{duration_ms} ms")


defmodule MyApp.DoSomethingStep do
  @moduledoc "Example step that wraps the `:action` argument in a map."
  use Reactor.Step

  # Returns `%{something: action}` where `action` is the `:action` argument.
  @impl Reactor.Step
  def run(arguments, _context, _options) do
    wrapped = %{something: arguments.action}
    {:ok, wrapped}
  end
end

defmodule MyApp.BuildPromptStep do
  @moduledoc "Example step that turns a previous step's result into a prompt string."
  use Reactor.Step

  # Reads `:something` out of the `:result_of_something` argument and embeds
  # it in a one-sentence prompt.
  @impl Reactor.Step
  def run(arguments, _context, _options) do
    wanted = arguments.result_of_something.something
    {:ok, "User wants to do #{wanted}."}
  end
end

# Two-step reactor: the result of `:do_something` feeds `:build_prompt`.
defmodule MyApp.BuildPromptReactor do
  use Reactor

  # Value supplied by the caller under the `:action` key.
  input :action

  step :do_something, MyApp.DoSomethingStep do
    argument :action, input(:action)
  end

  # Depends on `:do_something` through its result.
  step :build_prompt, MyApp.BuildPromptStep do
    argument :result_of_something, result(:do_something)
  end

  # The reactor's result is the result of the `:build_prompt` step.
  return :build_prompt
end

# Run the Reactor and print the generated prompt
# The match asserts success: anything other than `{:ok, _}` raises a MatchError.
{:ok, prompt} = Reactor.run(MyApp.BuildPromptReactor, %{action: "generate a summary"})
IO.puts(prompt)
defmodule AshSwarm.Steps.GenerateAnswerStep do
  @moduledoc """
  Step that passes the `:question` argument to `AshSwarm.InstructorHelper.gen/3`
  and unwraps the `:answer` field of a successful reply.
  """
  use Reactor.Step

  @impl Reactor.Step
  def run(arguments, _context, _options) do
    system_prompt = "You are a helpful assistant."
    question = arguments.question

    reply = AshSwarm.InstructorHelper.gen(%{answer: :string}, system_prompt, question)

    # Errors are passed through untouched; any other reply shape raises.
    case reply do
      {:ok, %{answer: text}} -> {:ok, text}
      {:error, reason} -> {:error, reason}
    end
  end
end

# Single-step reactor that forwards a question to `AshSwarm.Steps.GenerateAnswerStep`.
defmodule AshSwarm.AskQuestionReactor do
  use Reactor

  # Value supplied by the caller under the `:question` key.
  input :question

  step :generate_answer, AshSwarm.Steps.GenerateAnswerStep do
    argument :question, input(:question)
  end

  # The reactor's result is the result of the `:generate_answer` step.
  return :generate_answer
end

# Running the reactor
# The match asserts success: an `{:error, _}` result raises a MatchError here.
{:ok, answer} = Reactor.run(AshSwarm.AskQuestionReactor, %{question: "What is Elixir?"})
IO.puts("Answer: #{answer}")
defmodule ExampleReactor do
  @moduledoc false
  use Reactor

  input :numbers

  # Iterates over the `:numbers` input, running the nested step per element.
  map :double_numbers do
    source input(:numbers)

    step :double do
      # `element/1` refers to the current element of the enclosing map.
      argument :number, element(:double_numbers)

      run fn arguments, _context ->
        {:ok, 2 * arguments.number}
      end
    end

    return :double
  end
end

# Double each number via `ExampleReactor`, then add one to every doubled value.
# The `fn` only matches `{:ok, _}`, so a failed run raises here.
result =
  ExampleReactor
  |> Reactor.run(%{numbers: [40, 50, 60]})
  |> then(fn {:ok, numbers} -> {:ok, Enum.map(numbers, &(&1 + 1))} end)
  defmodule InputReactor do
    @moduledoc false
    use Reactor

    input :name

    # Inline step: greets the given name, or the world when the name is `nil`.
    step :greet do
      argument :name, input(:name)

      run fn %{name: name}, _context ->
        case name do
          nil -> {:ok, "Hello, World!"}
          given -> {:ok, "Hello, #{given}"}
        end
      end
    end
  end
  
  # Run `InputReactor` with `name` set to "Sean".
  Reactor.run(InputReactor, %{name: "Sean"})
defmodule ValueReactor do
  @moduledoc false
  use Reactor

  input :number

  # Multiplies the `:number` input by the constant 3 supplied via `value/1`.
  step :times_three do
    argument :lhs, input(:number)
    argument :rhs, value(3)

    run fn operands, _context ->
      product = operands.lhs * operands.rhs
      {:ok, product}
    end
  end

  # Extra step backed by `Reactor.Step.Debug`; it declares no arguments.
  step :testing, Reactor.Step.Debug
end

# Run `ValueReactor` with `number` set to 2.
Reactor.run(ValueReactor, %{number: 2})
defmodule Reactor.Middleware.Debug do
  @moduledoc """
  A Reactor middleware that logs debug information.

  This middleware logs the start and stop of the Reactor execution, as well as the
  execution of individual steps, including their inputs, results, errors, and retries.

  Step names are rendered with `inspect/1`, so atom names appear with their
  leading colon and names that do not implement `String.Chars` can still be logged.

  Example log messages:
    - Reactor started
    - Step `:my_step` started with arguments: %{arg1: "value"}
    - Step `:my_step` completed successfully with result: %{output: "value"}
    - Step `:my_step` encountered an error: {:error, "Something went wrong"}
  """

  # NOTE(review): this module lives in the `Reactor.` namespace, which belongs
  # to the Reactor library; confirm it cannot clash with a module shipped there.

  use Reactor.Middleware
  require Logger

  @doc false
  @impl true
  def init(context) do
    Logger.debug("🚀 Reactor started execution.")
    {:ok, context}
  end

  @doc false
  @impl true
  def complete(result, _context) do
    Logger.debug("✅ Reactor execution completed successfully.")
    {:ok, result}
  end

  @doc false
  @impl true
  def error(error, _context) do
    Logger.error("❌ Reactor execution encountered an error: #{inspect(error)}")
    :ok
  end

  @doc false
  @impl true
  def halt(context) do
    Logger.warning("⚠️ Reactor execution was halted.")
    # Hand the context back, matching the `{:ok, context}` shape `init/1` uses.
    {:ok, context}
  end

  @doc false
  @impl true
  def event({:run_start, arguments}, step, _context) do
    Logger.info("▶️ Step `#{label(step)}` started with arguments: #{inspect(arguments)}")
  end

  def event({:run_complete, result}, step, _context) do
    Logger.info("✅ Step `#{label(step)}` completed successfully with result: #{inspect(result)}")
  end

  def event({:run_error, errors}, step, _context) do
    Logger.error("❌ Step `#{label(step)}` encountered an error: #{inspect(errors)}")
  end

  def event({:run_retry, value}, step, _context) do
    Logger.warning("🔄 Step `#{label(step)}` is retrying with value: #{inspect(value)}")
  end

  def event({:compensate_start, reason}, step, _context) do
    Logger.warning("♻️ Step `#{label(step)}` is compensating due to: #{inspect(reason)}")
  end

  def event({:compensate_complete, _result}, step, _context) do
    Logger.info("🔄 Step `#{label(step)}` compensation completed.")
  end

  def event({:undo_start, _}, step, _context) do
    Logger.warning("⏪ Step `#{label(step)}` undo process started.")
  end

  def event({:undo_complete, _}, step, _context) do
    Logger.info("⏩ Step `#{label(step)}` undo process completed.")
  end

  # Any event not listed above is logged generically instead of raising a
  # FunctionClauseError from inside the middleware.
  def event(event, step, _context) do
    Logger.debug("Step `#{label(step)}` emitted event: #{inspect(event)}")
  end

  # Renders a step name for log output without relying on `String.Chars`.
  defp label(step), do: inspect(step.name)
end
  defmodule GreetReactor do
    @moduledoc false
    use Reactor

    input :name

    # Attach the debug middleware so the run and its steps are logged.
    middlewares do
      middleware Reactor.Middleware.Debug
    end

    # Inline step: greets the given name, or the world when the name is `nil`.
    step :greet do
      argument :name, input(:name)

      run fn %{name: name}, _context ->
        case name do
          nil -> {:ok, "Hello, World!"}
          given -> {:ok, "Hello, #{given}"}
        end
      end
    end
  end

# Run the reactor defined just above so its debug middleware is exercised;
# the original call targeted `InputReactor`, which has no middleware attached.
Reactor.run(GreetReactor, %{name: "Sean"})