ReqLLM with ReqCassette
Mix.install([
{:req_llm, "~> 1.0.0-rc.7"},
{:req_cassette, path: Path.join(__DIR__, "..")}
])
Setup
# Set your Anthropic API key
# System.put_env("ANTHROPIC_API_KEY", "sk-...")
# System.put_env("ANTHROPIC_API_KEY", System.get_env("LB_ANTHROPIC_API_KEY"))
ReqLLM Basic Usage (without cassettes)
# Keys are picked up from .env files or environment variables - see `ReqLLM.Keys`
model = "anthropic:claude-sonnet-4-20250514"
ReqLLM.generate_text!(model, "Hello world")
#=> "Hello! How can I assist you today?"
# Streaming text generation
{:ok, response} = ReqLLM.stream_text(model, "Write a short story")
response
|> ReqLLM.StreamResponse.tokens()
|> Stream.each(&IO.write/1)
|> Stream.run()
# Access usage metadata after streaming
usage = ReqLLM.StreamResponse.usage(response)
ReqCassette: Record and Replay LLM Calls
The following sections demonstrate how to use ReqCassette to record LLM API calls and replay them without hitting the API (saving cost and time).
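ReqCassette is attached as an ordinary Req plug, so the same options can also be passed directly to any Req request via the :plug option. A minimal sketch (the URL is a placeholder; the options mirror the ones used with ReqLLM below):

# Hypothetical endpoint, for illustration only
Req.get!("https://api.example.com/data",
  plug:
    {ReqCassette.Plug, %{
      cassette_dir: "cassettes",
      cassette_name: "example",
      mode: :record,
      filter_request_headers: ["authorization", "x-api-key", "cookie"]
    }}
)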
Setup Cassette Directory
cassette_dir = Path.join(__DIR__, "cassettes")
File.rm_rf!(cassette_dir)
File.mkdir_p!(cassette_dir)
File.ls!(cassette_dir)
Using ReqCassette with ReqLLM
model = "anthropic:claude-sonnet-4-20250514"
prompt = "Write a short poem about Elixir programming language"
# First call - will record to cassette (costs money)
{:ok, response1} =
ReqLLM.generate_text(
model,
prompt,
max_tokens: 100,
req_http_options: [
plug:
{ReqCassette.Plug, %{
cassette_dir: cassette_dir,
cassette_name: "test1",
mode: :record,
filter_request_headers: ["authorization", "x-api-key", "cookie"]
}}
]
)
text1 = ReqLLM.Response.text(response1)
IO.puts("First call:\n#{text1}\n")
# Second call - will replay from cassette (FREE! No API call!)
{:ok, response2} =
ReqLLM.generate_text(
model,
prompt,
max_tokens: 100,
req_http_options: [
plug:
{ReqCassette.Plug, %{
cassette_dir: cassette_dir,
cassette_name: "test1",
mode: :replay,
filter_request_headers: ["authorization", "x-api-key", "cookie"]
}}
]
)
text2 = ReqLLM.Response.text(response2)
IO.puts("Second call (from cassette):\n#{text2}\n")
# Verify they're identical
if text1 == text2 do
IO.puts("โ
Both responses are identical - cassette replay worked!")
else
IO.puts("โ Responses differ - something went wrong")
end
View Cassette Files
# List all cassette files
cassettes = File.ls!(cassette_dir)
IO.puts("Cassettes created: #{length(cassettes)}")
AI Agent with Tool Calling
MyAgentWithCassettes - Cassette-Compatible Agent
This agent uses non-streaming responses so it can work with ReqCassette for record/replay.
defmodule MyAgentWithCassettes do
@moduledoc """
A GenServer-based AI agent that supports ReqCassette for recording and replaying LLM calls.
Uses non-streaming responses to enable cassette support. All LLM calls are recorded
to cassettes and can be replayed without making actual API calls.
## Usage
# Start the agent with cassette support
{:ok, agent} = MyAgentWithCassettes.start_link(
cassette_opts: [
cassette_name: "my_agent",
cassette_dir: "cassettes",
mode: :record
]
)
# Send a prompt (first call records, subsequent calls replay)
MyAgentWithCassettes.prompt(agent, "What's 15 * 7?")
## Features
- Non-streaming text generation (enables cassette support)
- Tool calling with proper argument parsing
- Conversation history maintenance
- Automatic cassette recording and replay
- Calculator and web search tools included
"""
use GenServer
alias ReqLLM.{Context, Tool, Response, ToolCall}
defstruct [:history, :tools, :model, :req_http_options]
@default_model "anthropic:claude-sonnet-4-20250514"
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts)
end
def prompt(pid, message) when is_binary(message) do
GenServer.call(pid, {:prompt, message}, 30_000)
end
@impl true
def init(opts) do
system_prompt =
Keyword.get(opts, :system_prompt, """
You are a helpful AI assistant with access to tools.
When you need to compute math, use the calculator tool with the expression parameter.
Do not wrap arguments in code fences. Do not include extra text in arguments.
When you need to search for information, use the web_search tool with a relevant query.
Always use tools when appropriate and provide clear, helpful responses.
""")
model = Keyword.get(opts, :model, @default_model)
tools = setup_tools()
# Setup cassette configuration
req_http_options =
case Keyword.get(opts, :cassette_opts) do
nil ->
[]
cassette_opts ->
[plug: {ReqCassette.Plug, Map.new(cassette_opts)}]
end
history = Context.new([Context.system(system_prompt)])
{:ok,
%__MODULE__{
history: history,
tools: tools,
model: model,
req_http_options: req_http_options
}}
end
@impl true
def handle_call({:prompt, message}, _from, state) do
new_history = Context.append(state.history, Context.user(message))
case generate_with_tools(state.model, new_history, state.tools, state.req_http_options) do
{:ok, final_history, final_response} ->
{:reply, {:ok, final_response}, %{state | history: final_history}}
{:error, error} ->
IO.puts("Error: #{inspect(error)}")
{:reply, {:error, error}, state}
end
end
defp generate_with_tools(model, history, tools, req_http_options) do
# Make initial request with tools
case ReqLLM.generate_text(
model,
history.messages,
tools: tools,
max_tokens: 1024,
req_http_options: req_http_options
) do
{:ok, response} ->
text = Response.text(response)
IO.puts("Assistant: #{text}")
# Check if any tools were called
tool_calls = extract_tool_calls(response)
if tool_calls == [] do
# No tools called, we're done
final_history = Context.append(history, Context.assistant(text))
{:ok, final_history, text}
else
# Tools were called, execute them and make follow-up request
IO.puts("\n๐ง Executing tools...")
assistant_message = Context.assistant(text, tool_calls: tool_calls)
history_with_tool_call = Context.append(history, assistant_message)
# Execute tools and collect results as tool result messages
tool_result_messages =
Enum.map(tool_calls, fn tool_call ->
tool = Enum.find(tools, fn t -> t.name == tool_call.name end)
if tool do
case ReqLLM.Tool.execute(tool, tool_call.arguments) do
{:ok, result} ->
IO.puts(
" #{tool_call.name}(#{inspect(tool_call.arguments)}) โ #{inspect(result)}"
)
result_str = if is_binary(result), do: result, else: Jason.encode!(result)
Context.tool_result(tool_call.id, tool_call.name, result_str)
{:error, error} ->
IO.puts(" โ #{tool_call.name}: #{inspect(error)}")
error_result = %{error: "Tool execution failed: #{inspect(error)}"}
Context.tool_result(tool_call.id, tool_call.name, Jason.encode!(error_result))
end
else
IO.puts(" โ Tool #{tool_call.name} not found")
error_result = %{error: "Tool not found"}
Context.tool_result(tool_call.id, tool_call.name, Jason.encode!(error_result))
end
end)
# Append tool result messages
history_with_results = Context.append(history_with_tool_call, tool_result_messages)
# Make follow-up request with tool results
IO.puts("\n")
case ReqLLM.generate_text(
model,
history_with_results.messages,
max_tokens: 1024,
req_http_options: req_http_options
) do
{:ok, final_response} ->
final_text = Response.text(final_response)
IO.puts("Assistant: #{final_text}")
final_history = Context.append(history_with_results, Context.assistant(final_text))
{:ok, final_history, final_text}
{:error, error} ->
{:error, error}
end
end
{:error, error} ->
{:error, error}
end
end
defp extract_tool_calls(%Response{} = response) do
# Extract tool calls from the response message
case response.message do
%{tool_calls: tool_calls} when is_list(tool_calls) and length(tool_calls) > 0 ->
Enum.map(tool_calls, fn tool_call ->
%{
id: tool_call.id,
name: ToolCall.name(tool_call),
arguments: ToolCall.args_map(tool_call) || %{}
}
end)
_ ->
[]
end
end
defp setup_tools do
[
Tool.new!(
name: "calculator",
description: "Perform mathematical calculations. Pass an expression string.",
parameter_schema: [
expression: [
type: :string,
required: true,
doc: "Mathematical expression to evaluate. Examples: '15 * 7', '10 + 5', 'sqrt(16)'"
]
],
callback: &calculator_callback/1
),
Tool.new!(
name: "web_search",
description: "Search the web for information",
parameter_schema: [
query: [type: :string, required: true, doc: "Search query"]
],
callback: fn %{"query" => query} ->
{:ok, "Mock search results for: #{query}"}
end
)
]
end
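  # Note: Code.eval_string/1 is convenient for a demo, but it evaluates
  # arbitrary Elixir code; never use it on untrusted input.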
defp calculator_callback(%{"expression" => expr}) when is_binary(expr) do
{result, _} = Code.eval_string(expr)
{:ok, result}
rescue
e -> {:error, "Invalid expression: #{Exception.message(e)}"}
end
defp calculator_callback(%{expression: expr}) when is_binary(expr) do
{result, _} = Code.eval_string(expr)
{:ok, result}
rescue
e -> {:error, "Invalid expression: #{Exception.message(e)}"}
end
defp calculator_callback(args) do
{:error, "Provide an expression string. Example: {\"expression\":\"15 * 7\"}. Got: #{inspect(args)}"}
end
end
Using MyAgentWithCassettes
# Create cassette directory for agent
agent_cassette_dir = Path.join(__DIR__, "agent_cassettes")
# Clean up old cassettes for fresh testing
File.rm_rf!(agent_cassette_dir)
File.mkdir_p!(agent_cassette_dir)
agent_cassettes = File.ls!(agent_cassette_dir)
IO.puts("Agent cassettes created: #{length(agent_cassettes)}")
# Start agent with cassette support
{:ok, cassette_agent_1} =
MyAgentWithCassettes.start_link(
cassette_opts: [
cassette_dir: agent_cassette_dir,
cassette_name: "agent",
mode: :record,
filter_request_headers: ["authorization", "x-api-key", "cookie"]
]
)
# Start agent with cassette support
{:ok, cassette_agent_2} =
MyAgentWithCassettes.start_link(
cassette_opts: [
cassette_dir: agent_cassette_dir,
cassette_name: "agent",
mode: :record,
filter_request_headers: ["authorization", "x-api-key", "cookie"]
]
)
IO.puts("โ
Agent started with cassette support (cassettes cleaned)")
# First call - will record to cassette (costs money)
MyAgentWithCassettes.prompt(cassette_agent_1, "What is 15 * 7?")
# Second identical call - will replay from cassette (FREE!)
MyAgentWithCassettes.prompt(cassette_agent_2, "What is 15 * 7?")
# First call - will record to cassette (costs money)
MyAgentWithCassettes.prompt(cassette_agent_1, "Make a joke with the result of 35 - 11?")
# Second identical call - will replay from cassette (FREE!)
MyAgentWithCassettes.prompt(cassette_agent_2, "Make a joke with the result of 35 - 11?")
# Verify cassettes were created
agent_cassettes = File.ls!(agent_cassette_dir)
IO.puts("Agent cassettes created: #{length(agent_cassettes)}")
# Inspect each cassette and the interactions recorded in it
Enum.each(agent_cassettes, fn cassette ->
IO.puts(" - #{cassette}")
cassette_data = File.read!(Path.join(agent_cassette_dir, cassette)) |> Jason.decode!()
IO.puts("Number of records is #{length(cassette_data["interactions"])}")
end)
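# A sketch: with the interactions above recorded, a fresh agent started in
# :replay mode should answer the same prompts without touching the API
# (assuming :replay matches the requests recorded above).
{:ok, replay_agent} =
  MyAgentWithCassettes.start_link(
    cassette_opts: [
      cassette_dir: agent_cassette_dir,
      cassette_name: "agent",
      mode: :replay,
      filter_request_headers: ["authorization", "x-api-key", "cookie"]
    ]
  )

MyAgentWithCassettes.prompt(replay_agent, "What is 15 * 7?")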
MyAgent - Streaming Version (No Cassette Support)
This is the original streaming agent. It provides real-time output but cannot use cassettes.
defmodule MyAgent do
@moduledoc """
A GenServer-based AI agent that uses ReqLLM for streaming text generation with tool calling.
This agent provides a conversation interface with maintained history and supports
function calling capabilities with Claude 3.5's streaming format.
## Usage
# Start the agent
{:ok, agent} = MyAgent.start_link()
# Send a prompt
MyAgent.prompt(agent, "What's 15 * 7?")
# Agent streams response to stdout and returns final text
#=> {:ok, "15 * 7 = 105"}
## Features
- Streaming text generation with real-time output
- Tool calling with proper argument parsing from Claude 3.5
- Conversation history maintenance
- Two-step completion for tool usage scenarios
- Calculator and web search tools included
"""
use GenServer
alias ReqLLM.{Context, Tool}
defstruct [:history, :tools, :model]
@default_model "anthropic:claude-sonnet-4-20250514"
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts)
end
def prompt(pid, message) when is_binary(message) do
GenServer.call(pid, {:prompt, message}, 30_000)
end
def prompt(pid, model, message) when is_binary(model) and is_binary(message) do
GenServer.call(pid, {:prompt, model, message}, 30_000)
end
@impl true
def init(opts) do
system_prompt =
Keyword.get(opts, :system_prompt, """
You are a helpful AI assistant with access to tools.
When you need to compute math, use the calculator tool with the expression parameter.
Do not wrap arguments in code fences. Do not include extra text in arguments.
When you need to search for information, use the web_search tool with a relevant query.
Always use tools when appropriate and provide clear, helpful responses.
""")
model = Keyword.get(opts, :model, @default_model)
tools = setup_tools()
history = Context.new([Context.system(system_prompt)])
{:ok, %__MODULE__{history: history, tools: tools, model: model}}
end
@impl true
def handle_call({:prompt, message}, from, %{model: model} = state) do
handle_call({:prompt, model, message}, from, state)
end
@impl true
def handle_call({:prompt, model, message}, _from, state) do
new_history = Context.append(state.history, Context.user(message))
case stream_and_handle_tools(model, new_history, state.tools) do
{:ok, final_history, final_response} ->
IO.write("\n")
{:reply, {:ok, final_response}, %{state | history: final_history}}
{:error, error} ->
IO.write("Error: #{inspect(error)}\n")
{:reply, {:error, error}, state}
end
end
defp stream_and_handle_tools(model, history, tools) do
case ReqLLM.stream_text(model, history.messages, tools: tools) do
{:ok, stream_response} ->
# Stream chunks to console in real-time and collect for processing
chunks =
stream_response.stream
|> Enum.map(fn chunk ->
# Stream to console immediately
IO.write(chunk.text)
chunk
end)
case extract_tool_calls_from_chunks(chunks) do
[] ->
text = chunks |> Enum.map_join("", & &1.text)
final_history = Context.append(history, Context.assistant(text))
{:ok, final_history, text}
tool_calls ->
initial_text = chunks |> Enum.map_join("", & &1.text)
assistant_message = Context.assistant(initial_text, tool_calls: tool_calls)
history_with_tool_call = Context.append(history, assistant_message)
# Execute tools and show results
IO.write("\n")
history_with_results =
Enum.reduce(tool_calls, history_with_tool_call, fn tool_call, ctx ->
# Find the tool
tool = Enum.find(tools, fn t -> t.name == tool_call.name end)
if tool do
case ReqLLM.Tool.execute(tool, tool_call.arguments) do
{:ok, result} ->
IO.write(
"๐ง #{tool_call.name}(#{inspect(tool_call.arguments)}) โ #{inspect(result)}\n"
)
tool_result_msg =
Context.tool_result_message(tool_call.name, tool_call.id, result)
Context.append(ctx, tool_result_msg)
{:error, error} ->
IO.write("โ #{tool_call.name}: #{inspect(error)}\n")
error_result = %{error: "Tool execution failed"}
tool_result_msg =
Context.tool_result_message(tool_call.name, tool_call.id, error_result)
Context.append(ctx, tool_result_msg)
end
else
IO.write("โ Tool #{tool_call.name} not found\n")
ctx
end
end)
case ReqLLM.stream_text(model, history_with_results.messages) do
{:ok, final_stream_response} ->
IO.write("\n")
# Stream final response to console in real-time
final_chunks =
final_stream_response.stream
|> Enum.map(fn chunk ->
# Stream to console immediately
IO.write(chunk.text)
chunk
end)
final_text = final_chunks |> Enum.map_join("", & &1.text)
final_history =
Context.append(history_with_results, Context.assistant(final_text))
{:ok, final_history, final_text}
{:error, error} ->
{:error, error}
end
end
{:error, error} ->
{:error, error}
end
end
defp extract_tool_calls_from_chunks(chunks) do
# Base tool calls with index
tool_calls =
chunks
|> Enum.filter(&(&1.type == :tool_call))
|> Enum.map(fn chunk ->
%{
id: Map.get(chunk.metadata, :id) || "call_#{:erlang.unique_integer()}",
name: chunk.name,
arguments: chunk.arguments || %{},
index: Map.get(chunk.metadata, :index, 0)
}
end)
# Collect argument fragments from meta chunks
arg_fragments =
chunks
|> Enum.filter(&(&1.type == :meta))
|> Enum.filter(&Map.has_key?(&1.metadata, :tool_call_args))
|> Enum.group_by(& &1.metadata.tool_call_args.index)
|> Map.new(fn {index, fragments} ->
json = fragments |> Enum.map_join("", & &1.metadata.tool_call_args.fragment)
{index, json}
end)
# Merge accumulated JSON back into tool calls
tool_calls
|> Enum.map(fn call ->
case Map.get(arg_fragments, call.index) do
nil ->
Map.delete(call, :index)
json ->
case Jason.decode(json) do
{:ok, args} -> call |> Map.put(:arguments, args) |> Map.delete(:index)
# keep empty args if invalid JSON
{:error, _} -> Map.delete(call, :index)
end
end
end)
end
defp setup_tools do
[
Tool.new!(
name: "calculator",
description:
"Perform mathematical calculations. Prefer structured arguments: " <>
~s|{"operation":"multiply","operands":[15,7]}| <>
". As a fallback, you may pass an expression string: " <>
~s|{"expression":"15 * 7 + 23"}| <>
". Valid operations: add, subtract, multiply, divide, power, sqrt.",
parameter_schema: [
operation: [
type: :string,
required: false,
doc: "One of: add, subtract, multiply, divide, power, sqrt"
],
operands: [
type: {:list, :any},
required: false,
doc: "Numbers to operate on. For sqrt, pass a single number; for others, pass 2+."
],
expression: [
type: :string,
required: false,
doc: "Optional fallback. Examples: '15 * 7 + 23', '10 * 5', 'sqrt(16)'."
]
],
callback: &calculator_callback/1
),
Tool.new!(
name: "web_search",
description: "Search the web for information",
parameter_schema: [
query: [type: :string, required: true, doc: "Search query"]
],
callback: fn %{"query" => query} ->
{:ok, "Mock search results for: #{query}"}
end
)
]
end
defp calculator_callback(%{expression: expr}) when is_binary(expr) do
{result, _} = Code.eval_string(expr)
{:ok, result}
rescue
e -> {:error, "Invalid expression: #{Exception.message(e)}"}
end
defp calculator_callback(%{operation: op, operands: ops}) when is_list(ops) do
with :ok <- validate_operation(op),
{:ok, nums} <- cast_numbers(ops) do
compute(op, nums)
end
end
defp calculator_callback(%{"expression" => expr}) when is_binary(expr) do
calculator_callback(%{expression: expr})
end
defp calculator_callback(%{"operation" => op, "operands" => ops}) when is_list(ops) do
calculator_callback(%{operation: op, operands: ops})
end
defp calculator_callback(args) do
{:error,
"Provide either {operation, operands} or {expression}. Examples: " <>
~s|{"operation":"multiply","operands":[15,7]}| <>
" or " <>
~s|{"expression":"15 * 7 + 23"}| <> ". Got: #{inspect(args)}"}
end
defp validate_operation(op)
when op in ["add", "subtract", "multiply", "divide", "power", "sqrt"],
do: :ok
defp validate_operation(op),
do: {:error, "Invalid operation: #{op}. Valid: add, subtract, multiply, divide, power, sqrt"}
  defp cast_numbers(ops) do
    nums =
      Enum.map(ops, fn
        n when is_integer(n) -> n * 1.0
        n when is_float(n) -> n
        # String.to_float/1 rejects integer strings like "15", so parse
        # numeric strings with Float.parse/1 instead
        s when is_binary(s) ->
          case Float.parse(s) do
            {f, ""} -> f
            _ -> raise ArgumentError, "not a number: #{inspect(s)}"
          end
      end)

    {:ok, nums}
  rescue
    _ -> {:error, "All operands must be numbers"}
  end
defp compute("add", nums), do: {:ok, Enum.sum(nums)}
defp compute("subtract", [a, b]), do: {:ok, a - b}
defp compute("multiply", nums), do: {:ok, Enum.reduce(nums, 1, &(&1 * &2))}
defp compute("divide", [a, b]) when b != 0, do: {:ok, a / b}
defp compute("divide", [_, 0]), do: {:error, "Division by zero"}
defp compute("power", [a, b]), do: {:ok, :math.pow(a, b)}
defp compute("sqrt", [a]) when a >= 0, do: {:ok, :math.sqrt(a)}
defp compute("sqrt", [a]), do: {:error, "Cannot take square root of negative number: #{a}"}
defp compute(op, ops),
do: {:error, "Operation #{op} not supported with #{length(ops)} operands"}
# Handle streaming completion messages
@impl true
def handle_info({:stream_task_completed, _context}, state) do
{:noreply, state}
end
@impl true
def handle_info({ref, :ok}, state) when is_reference(ref) do
{:noreply, state}
end
@impl true
def handle_info({ref, _result}, state) when is_reference(ref) do
{:noreply, state}
end
@impl true
def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
{:noreply, state}
end
@impl true
def handle_info(_msg, state) do
{:noreply, state}
end
end
{:ok, agent} = MyAgent.start_link()
MyAgent.prompt(agent, "Hello! What can you help me with?")
MyAgent.prompt(agent, "Calculate 123123 + 8083 and 12341 + 5331")
MyAgent.prompt(agent, "Now multiply the previous two results")