Session 20: JSON-RPC and A2A Protocol
Mix.install([])
Introduction
In Session 19, you learned how controllers handle HTTP requests and produce JSON responses. Now itβs time to implement the actual A2A protocol - the language that agents use to communicate with each other.
A2A uses JSON-RPC 2.0 as its transport format. This session covers both the protocol specification and how to connect it to your OTP agents.
Sources for This Session
This session synthesizes concepts from:
Learning Goals
By the end of this session, youβll be able to:
- Understand JSON-RPC 2.0 format
- Implement A2A methods (SendMessage, GetTask)
- Connect HTTP requests to AgentServer
- Handle the task state machine
- Build a complete A2A controller
Section 1: JSON-RPC 2.0
π€ Opening Reflection
# Think about how you'd design an RPC protocol:
rpc_design = """
You want to call functions on a remote server over HTTP.
Option 1 - REST style:
POST /agents/Worker-1/tasks
Body: {"action": "search", "params": {"query": "test"}}
Option 2 - JSON-RPC style:
POST /a2a
Body: {"method": "SendMessage", "params": {"agent": "Worker-1", ...}}
What are the tradeoffs?
"""
# Your answer: ???
# Answer:
tradeoffs = %{
rest: %{
pros: [
"URLs describe resources (intuitive for CRUD)",
"HTTP methods have semantic meaning",
"Easy caching (GET requests)"
],
cons: [
"Many endpoints to discover",
"Complex URL structures",
"Harder to add new operations"
]
},
json_rpc: %{
pros: [
"Single endpoint (easy discovery)",
"Consistent request/response format",
"Easy to add new methods",
"Good for automation/agents"
],
cons: [
"Always POST (no caching)",
"Less intuitive for humans",
"Doesn't leverage HTTP semantics"
]
}
}
# A2A chose JSON-RPC because agent-to-agent communication
# prioritizes automation over human readability.
JSON-RPC Request Format
# A JSON-RPC 2.0 request MUST have:
request = %{
"jsonrpc" => "2.0", # MUST be exactly "2.0"
"method" => "SendMessage", # String - the method name
"params" => %{...}, # Object or Array - parameters
"id" => 1 # Number or String - request identifier
}
# The id is used to match requests with responses.
# If id is omitted, it's a "notification" (no response expected).
# Examples:
valid_requests = [
# Request with object params
%{
"jsonrpc" => "2.0",
"method" => "SendMessage",
"params" => %{"agent" => "Worker-1", "action" => "search"},
"id" => 1
},
# Request with array params
%{
"jsonrpc" => "2.0",
"method" => "add",
"params" => [1, 2, 3],
"id" => 2
},
# Notification (no id)
%{
"jsonrpc" => "2.0",
"method" => "log",
"params" => %{"message" => "Hello"}
}
]
π€ JSON-RPC Response Format
# Success response:
success_response = %{
"jsonrpc" => "2.0",
"result" => %{...}, # Any JSON value
"id" => 1 # Must match request id
}
# Error response:
error_response = %{
"jsonrpc" => "2.0",
"error" => %{
"code" => -32601, # Integer error code
"message" => "Method not found", # Short description
"data" => %{...} # Optional additional info
},
"id" => 1
}
# Question: Why does the response include the id?
id_purpose = """
Your answer: ???
"""
# Answer:
# Responses can arrive out of order (especially over async transports).
# The id lets the client match each response to its original request.
# Without id, you couldn't tell which result belongs to which call.
Standard Error Codes
# JSON-RPC 2.0 defines these error codes:
standard_errors = %{
-32700 => "Parse error - Invalid JSON",
-32600 => "Invalid Request - Not valid JSON-RPC",
-32601 => "Method not found",
-32602 => "Invalid params",
-32603 => "Internal error"
}
# Server errors (-32000 to -32099) are implementation-defined.
# A2A uses:
a2a_errors = %{
-32001 => "Agent not found",
-32002 => "Task not found"
}
Section 2: A2A Methods
Core A2A Methods
# A2A defines these primary methods:
a2a_methods = %{
"SendMessage" => """
Send a task to an agent.
Params: {agent, action, params}
Returns: {status: "created", task_id, ...}
""",
"GetTask" => """
Get the status of a task.
Params: {task_id}
Returns: {status, result?, error?}
""",
"CancelTask" => """
Cancel a running task.
Params: {task_id}
Returns: {status: "cancelled"}
""",
"ListTasks" => """
List tasks for an agent.
Params: {agent}
Returns: {tasks: [...]}
"""
}
# For our implementation, we'll focus on:
# - SendMessage (send task to agent)
# - GetAgentState (get agent's full state)
# - ListAgents (discover available agents)
# - StartAgent (create new agent)
# - ProcessNext (process task - for testing)
π€ Mapping to AgentServer
# Think about how each A2A method maps to AgentServer functions:
method_mapping = %{
"SendMessage" => """
A2A receives: {agent: "Worker-1", action: "search", params: {query: "test"}}
AgentServer call: AgentServer.send_task(pid, :search, %{query: "test"})
Question: How do we get the pid from the agent name?
""",
"GetAgentState" => """
A2A receives: {agent: "Worker-1"}
AgentServer call: AgentServer.get_state(pid)
Question: What parts of state should we expose via HTTP?
""",
"ProcessNext" => """
A2A receives: {agent: "Worker-1"}
AgentServer call: AgentServer.process_next(pid)
Question: Should we expose this via HTTP or just for testing?
"""
}
# Answers:
# 1. Use AgentSupervisor.whereis(name) to find pid by name
# 2. Expose: name, status, inbox_count, processed_count (not full memory)
# 3. Useful for testing; in production, agents might auto-process
Section 3: Task State Machine
Task States
# A2A tasks follow this state machine:
# ββββββββββββββββββββββββββββββββββββββββ
# β β
# βΌ β
# βββββββββββ β
# β created β ββββββββββ β
# ββββββ¬βββββ β β
# β β β
# βΌ β β
# βββββββββββ β β
# β working β ββββββββββΌββββββββββββββββ β
# ββββββ¬βββββ β β β
# β β β β
# βΌ βΌ βΌ β
# βββββββββββ βββββββββββ ββββββββββββββ
# βcompletedβ β failed β β cancelled ββ
# βββββββββββ βββββββββββ ββββββββββββββ
# β β
# ββββββββββ
task_states = %{
created: "Task received, queued for processing",
working: "Task is being processed",
completed: "Task finished successfully",
failed: "Task encountered an error",
cancelled: "Task was cancelled by request"
}
π€ Mapping to AgentServer State
# Our AgentServer doesn't explicitly track task states.
# How do we infer them?
state_inference = """
AgentServer.send_task(pid, action, params)
β Task added to inbox
β A2A status: "created"
AgentServer.process_next(pid)
β While processing: status could be "working"
β After processing: status is "completed" or "failed"
Question: Should AgentServer track task states explicitly,
or should the HTTP layer infer them?
Your answer: ???
"""
# Answer:
# For now, we keep AgentServer simple - tasks go to inbox,
# get processed, results stored in memory.
#
# The HTTP layer infers:
# - Just sent β created
# - In inbox β created (waiting)
# - After process_next β completed/failed based on result
#
# A more sophisticated implementation might track each task
# with its own state and ID for proper GetTask support.
Section 4: Implementing the A2A Controller
The JSON-RPC Parser
defmodule AgentApi.A2A.JsonRpc do
@moduledoc """
JSON-RPC 2.0 parsing and encoding.
"""
@doc """
Parse a JSON-RPC request from params.
"""
def parse(%{"jsonrpc" => "2.0", "method" => method, "params" => params, "id" => id}) do
{:ok, %{method: method, params: params, id: id}}
end
def parse(%{"jsonrpc" => "2.0", "method" => method, "id" => id}) do
{:ok, %{method: method, params: %{}, id: id}}
end
def parse(_) do
{:error, invalid_request(nil)}
end
@doc """
Build a success response.
"""
def success(result, id) do
%{jsonrpc: "2.0", result: result, id: id}
end
@doc """
Build an error response.
"""
def error(code, message, id) do
%{jsonrpc: "2.0", error: %{code: code, message: message}, id: id}
end
# Standard errors
def invalid_request(id), do: error(-32600, "Invalid Request", id)
def method_not_found(method, id), do: error(-32601, "Method not found: #{method}", id)
def invalid_params(details, id), do: error(-32602, "Invalid params: #{details}", id)
# A2A errors
def agent_not_found(name, id), do: error(-32001, "Agent not found: #{name}", id)
end
The A2A Controller
defmodule AgentApiWeb.A2AController do
use AgentApiWeb, :controller
alias AgentApi.A2A.{JsonRpc, TaskManager}
@doc """
Handle all JSON-RPC requests.
"""
def handle(conn, params) do
case JsonRpc.parse(params) do
{:ok, request} ->
response = dispatch(request)
json(conn, response)
{:error, error_response} ->
json(conn, error_response)
end
end
# Dispatch based on method
defp dispatch(%{method: method, params: params, id: id}) do
case method do
"SendMessage" -> handle_send_message(params, id)
"GetAgentState" -> handle_get_agent_state(params, id)
"ListAgents" -> handle_list_agents(id)
"StartAgent" -> handle_start_agent(params, id)
"ProcessNext" -> handle_process_next(params, id)
_ -> JsonRpc.method_not_found(method, id)
end
end
# Method handlers...
end
π€ Dispatch Pattern
# Compare this dispatch pattern to GenServer:
genserver_dispatch = """
def handle_call(:get_state, _from, state), do: {:reply, state, state}
def handle_call({:recall, key}, _from, state), do: ...
def handle_call(:process_next, _from, state), do: ...
"""
controller_dispatch = """
case method do
"SendMessage" -> handle_send_message(params, id)
"GetAgentState" -> handle_get_agent_state(params, id)
"ListAgents" -> handle_list_agents(id)
_ -> JsonRpc.method_not_found(method, id)
end
"""
# Question: Why case/do instead of function clause pattern matching?
dispatch_style = """
Your answer: ???
"""
# Answer:
# The method comes as a string from JSON, not an atom.
# We could convert to atom: String.to_existing_atom(method)
# But that risks atom exhaustion if client sends arbitrary strings.
#
# case/do with string matching is safer and explicit.
# The _ clause handles unknown methods cleanly.
Section 5: Connecting to AgentServer
The TaskManager Module
defmodule AgentApi.A2A.TaskManager do
@moduledoc """
Bridges A2A requests to AgentFramework.
"""
alias AgentFramework.{AgentServer, AgentSupervisor}
@doc """
Send a message (task) to an agent.
"""
def send_message(agent_name, action, params) do
case find_agent(agent_name) do
{:ok, pid} ->
:ok = AgentServer.send_task(pid, normalize_action(action), params)
{:ok, %{status: :created, agent: agent_name}}
:error ->
{:error, :agent_not_found}
end
end
@doc """
Get an agent's current state.
"""
def get_agent_state(agent_name) do
case find_agent(agent_name) do
{:ok, pid} ->
state = AgentServer.get_state(pid)
{:ok, state}
:error ->
{:error, :agent_not_found}
end
end
@doc """
List all available agents.
"""
def list_agents do
AgentSupervisor.list_agents()
|> Enum.map(fn pid ->
state = AgentServer.get_state(pid)
{state.name, pid}
end)
end
# Find agent by name
defp find_agent(name) do
case AgentSupervisor.whereis(name) do
{:ok, pid} -> {:ok, pid}
:error -> search_agents(name)
end
end
defp search_agents(name) do
result = Enum.find(AgentSupervisor.list_agents(), fn pid ->
try do
AgentServer.get_state(pid).name == name
catch
:exit, _ -> false
end
end)
case result do
nil -> :error
pid -> {:ok, pid}
end
end
defp normalize_action(action) when is_atom(action), do: action
defp normalize_action(action) when is_binary(action), do: String.to_atom(action)
end
π€ Why a Separate TaskManager?
# We could call AgentServer directly from the controller:
# Direct call in controller:
def handle_send_message(conn, %{"agent" => agent} = params, id) do
case AgentSupervisor.whereis(agent) do
{:ok, pid} ->
AgentServer.send_task(pid, params["action"], params["params"])
json(conn, JsonRpc.success(%{status: "created"}, id))
:error ->
json(conn, JsonRpc.agent_not_found(agent, id))
end
end
# But we use TaskManager instead. Why?
separation_benefits = """
1. Controller stays thin (just HTTP handling)
2. Business logic is in one place (TaskManager)
3. TaskManager is testable without HTTP
4. Changes to agent lookup don't touch controller
5. Could swap AgentFramework for another implementation
Your thoughts: ???
"""
Section 6: Method Implementations
SendMessage Implementation
defp handle_send_message(%{"agent" => agent, "action" => action} = params, id) do
task_params = Map.get(params, "params", %{})
case TaskManager.send_message(agent, action, task_params) do
{:ok, result} ->
JsonRpc.success(result, id)
{:error, :agent_not_found} ->
JsonRpc.agent_not_found(agent, id)
end
end
defp handle_send_message(_params, id) do
JsonRpc.invalid_params("Missing: agent, action", id)
end
GetAgentState Implementation
defp handle_get_agent_state(%{"agent" => agent}, id) do
case TaskManager.get_agent_state(agent) do
{:ok, state} ->
# Don't expose full state - summarize for HTTP
summary = %{
name: state.name,
status: state.status,
inbox_count: length(state.inbox),
processed_count: state.processed_count,
memory_keys: Map.keys(state.memory)
}
JsonRpc.success(summary, id)
{:error, :agent_not_found} ->
JsonRpc.agent_not_found(agent, id)
end
end
defp handle_get_agent_state(_params, id) do
JsonRpc.invalid_params("Missing: agent", id)
end
ListAgents Implementation
defp handle_list_agents(id) do
agents = TaskManager.list_agents()
|> Enum.map(fn {name, _pid} -> name end)
JsonRpc.success(%{agents: agents, count: length(agents)}, id)
end
Section 7: Testing the A2A Endpoint
Manual Testing with curl
# 1. Start the server
cd agent_api
iex -S mix phx.server
# 2. Test Agent Card
curl http://localhost:4000/.well-known/agent.json | jq
# 3. List agents (should be empty initially)
curl -X POST http://localhost:4000/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"ListAgents","params":{},"id":1}' | jq
# 4. Start an agent
curl -X POST http://localhost:4000/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"StartAgent","params":{"name":"Worker-1"},"id":2}' | jq
# 5. Send a message
curl -X POST http://localhost:4000/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"SendMessage","params":{"agent":"Worker-1","action":"search","params":{"query":"OTP"}},"id":3}' | jq
# 6. Get agent state
curl -X POST http://localhost:4000/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"GetAgentState","params":{"agent":"Worker-1"},"id":4}' | jq
# 7. Process the task
curl -X POST http://localhost:4000/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"ProcessNext","params":{"agent":"Worker-1"},"id":5}' | jq
Automated Tests
defmodule AgentApiWeb.A2AControllerTest do
use AgentApiWeb.ConnCase
describe "POST /a2a - SendMessage" do
test "sends task to agent", %{conn: conn} do
# First start an agent
{:ok, _} = TaskManager.start_agent("Test-Worker")
conn =
conn
|> put_req_header("content-type", "application/json")
|> post("/a2a", %{
"jsonrpc" => "2.0",
"method" => "SendMessage",
"params" => %{
"agent" => "Test-Worker",
"action" => "search",
"params" => %{"query" => "test"}
},
"id" => 1
})
response = json_response(conn, 200)
assert response["result"]["status"] == "created"
end
test "returns error for missing agent", %{conn: conn} do
conn =
conn
|> put_req_header("content-type", "application/json")
|> post("/a2a", %{
"jsonrpc" => "2.0",
"method" => "SendMessage",
"params" => %{
"agent" => "NonExistent",
"action" => "search"
},
"id" => 1
})
response = json_response(conn, 200)
assert response["error"]["code"] == -32001
end
end
end
Section 8: Interactive Exercises
Exercise 1: Complete Request Trace
# Trace this complete request through the system:
request = """
POST /a2a HTTP/1.1
Content-Type: application/json
{
"jsonrpc": "2.0",
"method": "SendMessage",
"params": {
"agent": "Worker-1",
"action": "search",
"params": {"query": "Elixir"}
},
"id": 42
}
"""
# Fill in each step:
trace = """
1. Request arrives at: ???
2. Router matches: ???
3. Controller.handle receives: ???
4. JsonRpc.parse returns: ???
5. dispatch calls: ???
6. TaskManager.send_message calls: ???
7. AgentServer receives: ???
8. Response returned: ???
"""
# Answer:
trace = """
1. Request arrives at: AgentApiWeb.Endpoint
2. Router matches: POST /a2a β A2AController.handle
3. Controller.handle receives: conn, params (parsed JSON)
4. JsonRpc.parse returns: {:ok, %{method: "SendMessage", params: ..., id: 42}}
5. dispatch calls: handle_send_message(params, 42)
6. TaskManager.send_message calls: AgentServer.send_task(pid, :search, %{...})
7. AgentServer receives: GenServer.cast {:send_task, :search, %{query: "Elixir"}}
8. Response returned: {"jsonrpc":"2.0","result":{"status":"created",...},"id":42}
"""
Exercise 2: Add a New Method
# Add a "GetInbox" method that returns the agent's inbox contents
# 1. Add to dispatch:
# 2. Implement handler:
# 3. Add to TaskManager:
# Your implementation:
defp handle_get_inbox(params, id) do
# ???
end
# Answer:
defp handle_get_inbox(%{"agent" => agent}, id) do
case TaskManager.get_agent_state(agent) do
{:ok, state} ->
inbox_summary = Enum.map(state.inbox, fn msg ->
%{
id: msg.id,
action: msg.payload.action,
params: msg.payload.params
}
end)
JsonRpc.success(%{inbox: inbox_summary, count: length(inbox_summary)}, id)
{:error, :agent_not_found} ->
JsonRpc.agent_not_found(agent, id)
end
end
defp handle_get_inbox(_params, id) do
JsonRpc.invalid_params("Missing: agent", id)
end
Key Takeaways
-
JSON-RPC provides consistent structure - All requests/responses follow the same format
-
Method dispatch is like GenServer - Pattern match on method name, delegate to handlers
-
TaskManager bridges layers - Keeps controller thin, makes logic testable
-
Error codes are standardized - Use JSON-RPC codes for protocol errors, custom for app errors
-
Task states track progress - created β working β completed/failed/cancelled
-
Donβt expose everything - Summarize internal state for HTTP responses
Whatβs Next?
In the next session, weβll bring everything together in the Checkpoint Project:
- Complete Phoenix A2A integration
- Full testing with curl and automated tests
- Verification checklist
- Preview of Phase 5 (distribution)
Youβll have a working HTTP API for your agent framework!