Powered by AppSignal & Oban Pro

Req LLM with ReqCassette

livebooks/req_llm.livemd

Req LLM with ReqCassette

Mix.install([
  {:req_llm, "~> 1.0.0-rc.7"},
  {:req_cassette, path: Path.join(__DIR__, "..")}
])

Setup

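# Set your Anthropic API key. Either hard-code it for a quick experiment:
# System.put_env("ANTHROPIC_API_KEY", "sk-...")
# or copy it from the LB_ANTHROPIC_API_KEY environment variable:
# System.put_env("ANTHROPIC_API_KEY", System.get_env("LB_ANTHROPIC_API_KEY"))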

ReqLLM Basic Usage (without cassettes)

# API keys are resolved from .env files or environment variables - see `ReqLLM.Keys`
model = "anthropic:claude-sonnet-4-20250514"

# Blocking, non-streaming generation
ReqLLM.generate_text!(model, "Hello world")
#=> "Hello! How can I assist you today?"

# Streaming text generation: write every token to stdout as it arrives
{:ok, response} = ReqLLM.stream_text(model, "Write a short story")

response
|> ReqLLM.StreamResponse.tokens()
|> Enum.each(&IO.write/1)

# Read the usage metadata after streaming
usage = ReqLLM.StreamResponse.usage(response)

ReqCassette: Record and Replay LLM Calls

The following sections demonstrate how to use ReqCassette to record LLM API calls and replay them without hitting the API (saving cost and time).

Setup Cassette Directory

# Start from an empty cassette directory next to this notebook so the
# record/replay demo below always begins with a fresh recording.
# NOTE: `File.rm_rf!/1` deletes any cassettes left over from a previous run.
cassette_dir = Path.join(__DIR__, "cassettes")
File.rm_rf!(cassette_dir)
File.mkdir_p!(cassette_dir)

# Listing the directory confirms it is empty before anything is recorded.
File.ls!(cassette_dir)

Using ReqCassette with ReqLLM

model = "anthropic:claude-sonnet-4-20250514"
prompt = "Write a short poem about Elixir programming language"

# First call - will record to cassette (costs money).
# `filter_request_headers` lists the credential headers to filter from the recording.
{:ok, response1} =
  ReqLLM.generate_text(
    model,
    prompt,
    max_tokens: 100,
    req_http_options: [
      plug:
        {ReqCassette.Plug, %{
          cassette_dir: cassette_dir,
          cassette_name: "test1",
          mode: :record,
          filter_request_headers: ["authorization", "x-api-key", "cookie"]
        }}
    ]
  )

text1 = ReqLLM.Response.text(response1)
IO.puts("First call:\n#{text1}\n")

# Second call - same model, prompt and cassette name, but in :replay mode,
# so it will replay from cassette (FREE! No API call!)
{:ok, response2} =
  ReqLLM.generate_text(
    model,
    prompt,
    max_tokens: 100,
    req_http_options: [
      plug:
        {ReqCassette.Plug, %{
          cassette_dir: cassette_dir,
          cassette_name: "test1",
          mode: :replay,
          filter_request_headers: ["authorization", "x-api-key", "cookie"]
        }}
    ]
  )

text2 = ReqLLM.Response.text(response2)
IO.puts("Second call (from cassette):\n#{text2}\n")

# Verify they're identical: a replayed response must match the recorded one
if text1 == text2 do
  IO.puts("✅ Both responses are identical - cassette replay worked!")
else
  IO.puts("❌ Responses differ - something went wrong")
end

View Cassette Files

# Enumerate the cassette files currently present in the cassette directory
cassettes = File.ls!(cassette_dir)
cassette_count = length(cassettes)
IO.puts("Cassettes created: #{cassette_count}")

AI Agent with Tool Calling

MyAgentWithCassettes - Cassette-Compatible Agent

This agent uses non-streaming responses so it can work with ReqCassette for record/replay.

defmodule MyAgentWithCassettes do
  @moduledoc """
  A GenServer-based AI agent that supports ReqCassette for recording and replaying LLM calls.

  Uses non-streaming responses to enable cassette support. When `:cassette_opts` is
  given, every LLM request is sent through `ReqCassette.Plug`, so calls are recorded
  to cassettes and can be replayed without making actual API calls.

  ## Usage

      # Start the agent with cassette support
      {:ok, agent} = MyAgentWithCassettes.start_link(
        cassette_opts: [
          cassette_name: "my_agent",
          cassette_dir: "cassettes",
          mode: :record
        ]
      )

      # Send a prompt (first call records, subsequent calls replay)
      MyAgentWithCassettes.prompt(agent, "What's 15 * 7?")

  ## Features

  - Non-streaming text generation (enables cassette support)
  - Tool calling with proper argument parsing
  - Conversation history maintenance
  - Automatic cassette recording and replay
  - Calculator and web search tools included
  """
  use GenServer

  alias ReqLLM.{Context, Tool, Response, ToolCall}

  defstruct [:history, :tools, :model, :req_http_options]

  @default_model "anthropic:claude-sonnet-4-20250514"

  @doc """
  Starts the agent process.

  ## Options

    * `:system_prompt` - system message that seeds the conversation history
    * `:model` - model identifier, defaults to `#{inspect(@default_model)}`
    * `:cassette_opts` - keyword list handed (as a map) to `ReqCassette.Plug`;
      when omitted, requests are made without the cassette plug
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts)
  end

  @doc """
  Sends `message` to the agent and waits up to 30 seconds for the reply.

  Returns `{:ok, text}` with the assistant's final text, or `{:error, error}` when
  an LLM request fails. The conversation history is only updated on success.
  """
  @spec prompt(GenServer.server(), String.t()) :: {:ok, term()} | {:error, term()}
  def prompt(pid, message) when is_binary(message) do
    GenServer.call(pid, {:prompt, message}, 30_000)
  end

  @impl true
  def init(opts) do
    system_prompt =
      Keyword.get(opts, :system_prompt, """
      You are a helpful AI assistant with access to tools.

      When you need to compute math, use the calculator tool with the expression parameter.

      Do not wrap arguments in code fences. Do not include extra text in arguments.

      When you need to search for information, use the web_search tool with a relevant query.

      Always use tools when appropriate and provide clear, helpful responses.
      """)

    model = Keyword.get(opts, :model, @default_model)

    # Without :cassette_opts no plug is installed and requests go out unmodified.
    req_http_options =
      case Keyword.get(opts, :cassette_opts) do
        nil -> []
        cassette_opts -> [plug: {ReqCassette.Plug, Map.new(cassette_opts)}]
      end

    {:ok,
     %__MODULE__{
       history: Context.new([Context.system(system_prompt)]),
       tools: setup_tools(),
       model: model,
       req_http_options: req_http_options
     }}
  end

  @impl true
  def handle_call({:prompt, message}, _from, state) do
    new_history = Context.append(state.history, Context.user(message))

    case generate_with_tools(state.model, new_history, state.tools, state.req_http_options) do
      {:ok, final_history, final_response} ->
        {:reply, {:ok, final_response}, %{state | history: final_history}}

      {:error, error} ->
        # The failed turn is dropped: the state keeps the history from before the prompt.
        IO.puts("Error: #{inspect(error)}")
        {:reply, {:error, error}, state}
    end
  end

  # Runs one conversation turn: asks the model (with tools available), executes any
  # tool calls it makes and, if there were any, asks the model again with the results.
  # Returns `{:ok, updated_history, final_text}` or `{:error, error}`.
  defp generate_with_tools(model, history, tools, req_http_options) do
    with {:ok, response} <-
           ReqLLM.generate_text(
             model,
             history.messages,
             tools: tools,
             max_tokens: 1024,
             req_http_options: req_http_options
           ) do
      text = Response.text(response)
      IO.puts("Assistant: #{text}")

      case extract_tool_calls(response) do
        [] ->
          # No tools called, we're done
          {:ok, Context.append(history, Context.assistant(text)), text}

        tool_calls ->
          IO.puts("\n🔧 Executing tools...")

          # The assistant message carrying the tool calls goes into the history
          # first, followed by one tool result message per call.
          history_with_tool_call =
            Context.append(history, Context.assistant(text, tool_calls: tool_calls))

          tool_result_messages = Enum.map(tool_calls, &run_tool_call(tools, &1))
          history_with_results = Context.append(history_with_tool_call, tool_result_messages)

          IO.puts("\n")
          request_final_answer(model, history_with_results, req_http_options)
      end
    end
  end

  # Makes the follow-up request once the tool results are part of the history.
  defp request_final_answer(model, history, req_http_options) do
    with {:ok, final_response} <-
           ReqLLM.generate_text(
             model,
             history.messages,
             max_tokens: 1024,
             req_http_options: req_http_options
           ) do
      final_text = Response.text(final_response)
      IO.puts("Assistant: #{final_text}")

      {:ok, Context.append(history, Context.assistant(final_text)), final_text}
    end
  end

  # Executes a single tool call and returns the tool result message for it. Failures
  # (unknown tool, callback error) are reported back as a JSON-encoded `error` map
  # so that every tool call still gets a result message.
  defp run_tool_call(tools, tool_call) do
    case Enum.find(tools, &(&1.name == tool_call.name)) do
      nil ->
        IO.puts("  ❌ Tool #{tool_call.name} not found")
        error_tool_result(tool_call, "Tool not found")

      tool ->
        case Tool.execute(tool, tool_call.arguments) do
          {:ok, result} ->
            IO.puts(
              "  #{tool_call.name}(#{inspect(tool_call.arguments)}) → #{inspect(result)}"
            )

            # Non-binary results are JSON-encoded before being put in the message.
            result_str = if is_binary(result), do: result, else: Jason.encode!(result)
            Context.tool_result(tool_call.id, tool_call.name, result_str)

          {:error, error} ->
            IO.puts("  ❌ #{tool_call.name}: #{inspect(error)}")
            error_tool_result(tool_call, "Tool execution failed: #{inspect(error)}")
        end
    end
  end

  # Builds a tool result message whose content is `{"error": message}`.
  defp error_tool_result(tool_call, message) do
    Context.tool_result(tool_call.id, tool_call.name, Jason.encode!(%{error: message}))
  end

  # Normalises the tool calls of a response into plain `%{id, name, arguments}` maps.
  # Returns `[]` when the response message carries no (or an empty list of) tool calls.
  defp extract_tool_calls(%Response{message: %{tool_calls: [_ | _] = tool_calls}}) do
    Enum.map(tool_calls, fn tool_call ->
      %{
        id: tool_call.id,
        name: ToolCall.name(tool_call),
        arguments: ToolCall.args_map(tool_call) || %{}
      }
    end)
  end

  defp extract_tool_calls(%Response{}), do: []

  # Tools offered to the model: an expression calculator and a mocked web search.
  defp setup_tools do
    [
      Tool.new!(
        name: "calculator",
        description: "Perform mathematical calculations. Pass an expression string.",
        parameter_schema: [
          expression: [
            type: :string,
            required: true,
            doc: "Mathematical expression to evaluate. Examples: '15 * 7', '10 + 5', 'sqrt(16)'"
          ]
        ],
        callback: &calculator_callback/1
      ),
      Tool.new!(
        name: "web_search",
        description: "Search the web for information",
        parameter_schema: [
          query: [type: :string, required: true, doc: "Search query"]
        ],
        callback: fn %{"query" => query} ->
          {:ok, "Mock search results for: #{query}"}
        end
      )
    ]
  end

  # Accepts the expression under either a string or an atom key.
  defp calculator_callback(%{"expression" => expr}) when is_binary(expr),
    do: evaluate_expression(expr)

  defp calculator_callback(%{expression: expr}) when is_binary(expr),
    do: evaluate_expression(expr)

  defp calculator_callback(args) do
    {:error, "Provide an expression string. Example: {\"expression\":\"15 * 7\"}. Got: #{inspect(args)}"}
  end

  # SECURITY: `Code.eval_string/1` executes arbitrary Elixir code, and `expr` is
  # produced by the model. This is acceptable for a local demo only; replace it
  # with a real arithmetic parser before exposing the agent to untrusted input.
  # NOTE(review): plain Elixir has no `sqrt/1`, so the advertised 'sqrt(16)'
  # example presumably ends up in the rescue branch - confirm.
  defp evaluate_expression(expr) do
    {result, _} = Code.eval_string(expr)
    {:ok, result}
  rescue
    e -> {:error, "Invalid expression: #{Exception.message(e)}"}
  end
end

Using MyAgentWithCassettes

# Create cassette directory for agent
agent_cassette_dir = Path.join(__DIR__, "agent_cassettes")

# Clean up old cassettes for fresh testing
File.rm_rf!(agent_cassette_dir)
File.mkdir_p!(agent_cassette_dir)

# The directory was just recreated, so this reports zero cassettes
agent_cassettes = File.ls!(agent_cassette_dir)
IO.puts("Agent cassettes created: #{length(agent_cassettes)}")

# Start the first agent with cassette support
{:ok, cassette_agent_1} =
  MyAgentWithCassettes.start_link(
    cassette_opts: [
      cassette_dir: agent_cassette_dir,
      cassette_name: "agent",
      mode: :record,
      filter_request_headers: ["authorization", "x-api-key", "cookie"]
    ]
  )

# Start a second agent with the same cassette directory and cassette name, so
# both agents share one cassette
{:ok, cassette_agent_2} =
  MyAgentWithCassettes.start_link(
    cassette_opts: [
      cassette_dir: agent_cassette_dir,
      cassette_name: "agent",
      mode: :record,
      filter_request_headers: ["authorization", "x-api-key", "cookie"]
    ]
  )

IO.puts("✅ Agent started with cassette support (cassettes cleaned)")
# First call - will record to cassette (costs money)
MyAgentWithCassettes.prompt(cassette_agent_1, "What is 15 * 7?")
# Second identical call - will replay from cassette (FREE!)
MyAgentWithCassettes.prompt(cassette_agent_2, "What is 15 * 7?")
# First call - will record to cassette (costs money)
MyAgentWithCassettes.prompt(cassette_agent_1, "Make a joke with the result of 35 - 11?")
# Second identical call - will replay from cassette (FREE!)
MyAgentWithCassettes.prompt(cassette_agent_2, "Make a joke with the result of 35 - 11?")
# Verify cassettes were created
agent_cassettes = File.ls!(agent_cassette_dir)
IO.puts("Agent cassettes created: #{length(agent_cassettes)}")

# Each tool execution and follow-up creates separate cassettes
# Print every cassette file with the number of recorded interactions it holds
Enum.each(agent_cassettes, fn cassette ->
  IO.puts("  - #{cassette}")

  cassette_data =
    agent_cassette_dir
    |> Path.join(cassette)
    |> File.read!()
    |> Jason.decode!()

  IO.puts("Number of records is #{length(cassette_data["interactions"])}")
end)

MyAgent - Streaming Version (No Cassette Support)

This is the original streaming agent. It provides real-time output but cannot use cassettes.

defmodule MyAgent do
  @moduledoc """
  A GenServer-based AI agent that uses ReqLLM for streaming text generation with tool calling.

  This agent provides a conversation interface with maintained history and supports
  function calling by reassembling tool calls from the streamed response chunks.

  ## Usage

      # Start the agent
      {:ok, agent} = MyAgent.start_link()

      # Send a prompt
      MyAgent.prompt(agent, "What's 15 * 7?")

      # Agent streams response to stdout and returns final text
      #=> {:ok, "15 * 7 = 105"}

  ## Features

  - Streaming text generation with real-time output
  - Tool calling with argument parsing from streamed JSON fragments
  - Conversation history maintenance
  - Two-step completion for tool usage scenarios
  - Calculator and web search tools included

  """
  use GenServer

  alias ReqLLM.{Context, Tool}

  defstruct [:history, :tools, :model]

  @default_model "anthropic:claude-sonnet-4-20250514"

  @doc """
  Starts the agent process.

  ## Options

    * `:system_prompt` - system message that seeds the conversation history
    * `:model` - default model identifier, defaults to `#{inspect(@default_model)}`
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts)
  end

  @doc """
  Sends `message` to the agent using the agent's default model.

  Waits up to 30 seconds. Returns `{:ok, text}` with the final assistant text, or
  `{:error, error}` when a streaming request cannot be started.
  """
  def prompt(pid, message) when is_binary(message) do
    GenServer.call(pid, {:prompt, message}, 30_000)
  end

  @doc """
  Same as `prompt/2`, but uses `model` for this prompt instead of the default model.
  """
  def prompt(pid, model, message) when is_binary(model) and is_binary(message) do
    GenServer.call(pid, {:prompt, model, message}, 30_000)
  end

  @impl true
  def init(opts) do
    system_prompt =
      Keyword.get(opts, :system_prompt, """
      You are a helpful AI assistant with access to tools.

      When you need to compute math, use the calculator tool with the expression parameter.

      Do not wrap arguments in code fences. Do not include extra text in arguments.

      When you need to search for information, use the web_search tool with a relevant query.

      Always use tools when appropriate and provide clear, helpful responses.
      """)

    model = Keyword.get(opts, :model, @default_model)
    history = Context.new([Context.system(system_prompt)])

    {:ok, %__MODULE__{history: history, tools: setup_tools(), model: model}}
  end

  @impl true
  def handle_call({:prompt, message}, from, %{model: model} = state) do
    # No explicit model given: fall back to the model configured at start-up.
    handle_call({:prompt, model, message}, from, state)
  end

  @impl true
  def handle_call({:prompt, model, message}, _from, state) do
    new_history = Context.append(state.history, Context.user(message))

    case stream_and_handle_tools(model, new_history, state.tools) do
      {:ok, final_history, final_response} ->
        IO.write("\n")
        {:reply, {:ok, final_response}, %{state | history: final_history}}

      {:error, error} ->
        # The failed turn is dropped: the state keeps the history from before the prompt.
        IO.write("Error: #{inspect(error)}\n")
        {:reply, {:error, error}, state}
    end
  end

  # Ignore every message that is not a call. The original clauses spelled out
  # `{:stream_task_completed, context}`, `{ref, result}` task replies and `:DOWN`
  # monitor messages; all of them were answered with `{:noreply, state}`, exactly
  # like this catch-all, so a single clause is equivalent.
  @impl true
  def handle_info(_msg, state) do
    {:noreply, state}
  end

  # Runs one conversation turn: streams the model's answer to the console, executes
  # any tool calls found in the stream and, if there were any, streams a follow-up
  # answer that takes the tool results into account.
  # Returns `{:ok, updated_history, final_text}` or `{:error, error}`.
  defp stream_and_handle_tools(model, history, tools) do
    with {:ok, stream_response} <- ReqLLM.stream_text(model, history.messages, tools: tools) do
      chunks = stream_to_console(stream_response)
      text = Enum.map_join(chunks, "", & &1.text)

      case extract_tool_calls_from_chunks(chunks) do
        [] ->
          {:ok, Context.append(history, Context.assistant(text)), text}

        tool_calls ->
          history_with_tool_call =
            Context.append(history, Context.assistant(text, tool_calls: tool_calls))

          # Execute tools and show results
          IO.write("\n")

          history_with_results =
            Enum.reduce(tool_calls, history_with_tool_call, &append_tool_result(tools, &1, &2))

          stream_final_answer(model, history_with_results)
      end
    end
  end

  # Streams the follow-up answer once the tool results are part of the history.
  defp stream_final_answer(model, history) do
    with {:ok, final_stream_response} <- ReqLLM.stream_text(model, history.messages) do
      IO.write("\n")

      final_text =
        final_stream_response
        |> stream_to_console()
        |> Enum.map_join("", & &1.text)

      {:ok, Context.append(history, Context.assistant(final_text)), final_text}
    end
  end

  # Consumes the response stream, echoing each chunk's text to the console as it
  # arrives, and returns all chunks so tool calls can be extracted afterwards.
  defp stream_to_console(stream_response) do
    Enum.map(stream_response.stream, fn chunk ->
      IO.write(chunk.text)
      chunk
    end)
  end

  # Executes one tool call and appends its tool result message to `ctx`.
  # Failures are appended as an `%{error: ...}` result as well. This includes an
  # unknown tool, which previously left the tool call without any result message
  # (unlike MyAgentWithCassettes, which always reports one).
  defp append_tool_result(tools, tool_call, ctx) do
    result =
      case Enum.find(tools, &(&1.name == tool_call.name)) do
        nil ->
          IO.write("❌ Tool #{tool_call.name} not found\n")
          %{error: "Tool not found"}

        tool ->
          case Tool.execute(tool, tool_call.arguments) do
            {:ok, result} ->
              IO.write(
                "🔧 #{tool_call.name}(#{inspect(tool_call.arguments)}) → #{inspect(result)}\n"
              )

              result

            {:error, error} ->
              IO.write("❌ #{tool_call.name}: #{inspect(error)}\n")
              %{error: "Tool execution failed"}
          end
      end

    Context.append(ctx, Context.tool_result_message(tool_call.name, tool_call.id, result))
  end

  # Rebuilds the tool calls from streamed chunks. `:tool_call` chunks provide id,
  # name and (possibly empty) arguments; `:meta` chunks with `:tool_call_args`
  # metadata carry JSON fragments that are concatenated per tool-call index and
  # decoded to replace the arguments.
  defp extract_tool_calls_from_chunks(chunks) do
    # Collect argument fragments from meta chunks, joined per tool-call index
    args_json_by_index =
      chunks
      |> Enum.filter(&(&1.type == :meta and Map.has_key?(&1.metadata, :tool_call_args)))
      |> Enum.group_by(& &1.metadata.tool_call_args.index)
      |> Map.new(fn {index, fragments} ->
        {index, Enum.map_join(fragments, "", & &1.metadata.tool_call_args.fragment)}
      end)

    chunks
    |> Enum.filter(&(&1.type == :tool_call))
    |> Enum.map(fn chunk ->
      call = %{
        # Chunks without an id get a locally unique, always positive fallback id.
        id: Map.get(chunk.metadata, :id) || "call_#{:erlang.unique_integer([:positive])}",
        name: chunk.name,
        arguments: chunk.arguments || %{}
      }

      with {:ok, json} <- Map.fetch(args_json_by_index, Map.get(chunk.metadata, :index, 0)),
           {:ok, args} <- Jason.decode(json) do
        %{call | arguments: args}
      else
        # No fragments for this index, or invalid JSON: keep the chunk's own arguments
        _ -> call
      end
    end)
  end

  # Tools offered to the model: a calculator (structured or expression based) and a
  # mocked web search.
  defp setup_tools do
    [
      Tool.new!(
        name: "calculator",
        description:
          "Perform mathematical calculations. Prefer structured arguments: " <>
            ~s|{"operation":"multiply","operands":[15,7]}| <>
            ". As a fallback, you may pass an expression string: " <>
            ~s|{"expression":"15 * 7 + 23"}| <>
            ". Valid operations: add, subtract, multiply, divide, power, sqrt.",
        parameter_schema: [
          operation: [
            type: :string,
            required: false,
            doc: "One of: add, subtract, multiply, divide, power, sqrt"
          ],
          operands: [
            type: {:list, :any},
            required: false,
            doc: "Numbers to operate on. For sqrt, pass a single number; for others, pass 2+."
          ],
          expression: [
            type: :string,
            required: false,
            doc: "Optional fallback. Examples: '15 * 7 + 23', '10 * 5', 'sqrt(16)'."
          ]
        ],
        callback: &calculator_callback/1
      ),
      Tool.new!(
        name: "web_search",
        description: "Search the web for information",
        parameter_schema: [
          query: [type: :string, required: true, doc: "Search query"]
        ],
        callback: fn %{"query" => query} ->
          {:ok, "Mock search results for: #{query}"}
        end
      )
    ]
  end

  # SECURITY: `Code.eval_string/1` executes arbitrary Elixir code, and `expr` is
  # produced by the model. This is acceptable for a local demo only; replace it
  # with a real arithmetic parser before exposing the agent to untrusted input.
  defp calculator_callback(%{expression: expr}) when is_binary(expr) do
    {result, _} = Code.eval_string(expr)
    {:ok, result}
  rescue
    e -> {:error, "Invalid expression: #{Exception.message(e)}"}
  end

  defp calculator_callback(%{operation: op, operands: ops}) when is_list(ops) do
    with :ok <- validate_operation(op),
         {:ok, nums} <- cast_numbers(ops) do
      compute(op, nums)
    end
  end

  # String-keyed arguments are normalised to the atom-keyed clauses above.
  defp calculator_callback(%{"expression" => expr}) when is_binary(expr) do
    calculator_callback(%{expression: expr})
  end

  defp calculator_callback(%{"operation" => op, "operands" => ops}) when is_list(ops) do
    calculator_callback(%{operation: op, operands: ops})
  end

  defp calculator_callback(args) do
    {:error,
     "Provide either {operation, operands} or {expression}. Examples: " <>
       ~s|{"operation":"multiply","operands":[15,7]}| <>
       " or " <>
       ~s|{"expression":"15 * 7 + 23"}| <> ". Got: #{inspect(args)}"}
  end

  defp validate_operation(op)
       when op in ["add", "subtract", "multiply", "divide", "power", "sqrt"],
       do: :ok

  defp validate_operation(op),
    do: {:error, "Invalid operation: #{op}. Valid: add, subtract, multiply, divide, power, sqrt"}

  # Converts every operand to a float. Returns `{:ok, floats}` or an error tuple as
  # soon as one operand is not a number (or a string holding exactly one number).
  defp cast_numbers(ops) do
    ops
    |> Enum.reduce_while([], fn op, acc ->
      case cast_number(op) do
        {:ok, num} -> {:cont, [num | acc]}
        :error -> {:halt, :error}
      end
    end)
    |> case do
      :error -> {:error, "All operands must be numbers"}
      nums -> {:ok, Enum.reverse(nums)}
    end
  end

  defp cast_number(n) when is_integer(n) do
    {:ok, n * 1.0}
  rescue
    # Integers too large to be represented as a float
    ArithmeticError -> :error
  end

  defp cast_number(n) when is_float(n), do: {:ok, n}

  # `Float.parse/1` accepts integer-looking strings such as "15", which
  # `String.to_float/1` rejects. Trailing garbage ("15abc") is still an error.
  defp cast_number(s) when is_binary(s) do
    case Float.parse(String.trim(s)) do
      {num, ""} -> {:ok, num}
      _ -> :error
    end
  rescue
    ArgumentError -> :error
  end

  defp cast_number(_other), do: :error

  defp compute("add", nums), do: {:ok, Enum.sum(nums)}
  defp compute("subtract", [a, b]), do: {:ok, a - b}
  defp compute("multiply", nums), do: {:ok, Enum.reduce(nums, 1, &(&1 * &2))}

  # Operands arrive as floats, so zero has to be detected with `==`: the literal
  # pattern `0` only matches the integer and would let `0.0` slip through.
  defp compute("divide", [_, b]) when b == 0, do: {:error, "Division by zero"}
  defp compute("divide", [a, b]), do: {:ok, a / b}
  defp compute("power", [a, b]), do: {:ok, :math.pow(a, b)}
  defp compute("sqrt", [a]) when a >= 0, do: {:ok, :math.sqrt(a)}
  defp compute("sqrt", [a]), do: {:error, "Cannot take square root of negative number: #{a}"}

  # Any other combination of operation and operand count
  defp compute(op, ops),
    do: {:error, "Operation #{op} not supported with #{length(ops)} operands"}
end
# Start the streaming agent and hold a short multi-turn conversation. The agent
# keeps the history between prompts, so the last prompt can refer to earlier answers.
{:ok, agent} = MyAgent.start_link()

MyAgent.prompt(agent, "Hello! What can you help me with?")

MyAgent.prompt(agent, "Calculate what is 123123 + 08083? and what 12341 + 5331?")

MyAgent.prompt(agent, "Now multiply the previous two results")