Session 20: JSON-RPC and A2A Protocol

notebooks/20_json_rpc_a2a.livemd

Mix.install([])

Introduction

In Session 19, you learned how controllers handle HTTP requests and produce JSON responses. Now it’s time to implement the A2A protocol itself: the language that agents use to communicate with each other.

A2A uses JSON-RPC 2.0 as its message format, carried over HTTP. This session covers both the protocol specification and how to connect it to your OTP agents.

Sources for This Session

This session synthesizes concepts from:

Learning Goals

By the end of this session, you’ll be able to:

  • Understand JSON-RPC 2.0 format
  • Implement A2A methods (SendMessage, GetTask)
  • Connect HTTP requests to AgentServer
  • Handle the task state machine
  • Build a complete A2A controller

Section 1: JSON-RPC 2.0

🤔 Opening Reflection

# Think about how you'd design an RPC protocol:

rpc_design = """
You want to call functions on a remote server over HTTP.

Option 1 - REST style:
  POST /agents/Worker-1/tasks
  Body: {"action": "search", "params": {"query": "test"}}

Option 2 - JSON-RPC style:
  POST /a2a
  Body: {"method": "SendMessage", "params": {"agent": "Worker-1", ...}}

What are the tradeoffs?
"""

# Your answer: ???

# Answer:
tradeoffs = %{
  rest: %{
    pros: [
      "URLs describe resources (intuitive for CRUD)",
      "HTTP methods have semantic meaning",
      "Easy caching (GET requests)"
    ],
    cons: [
      "Many endpoints to discover",
      "Complex URL structures",
      "Harder to add new operations"
    ]
  },
  json_rpc: %{
    pros: [
      "Single endpoint (easy discovery)",
      "Consistent request/response format",
      "Easy to add new methods",
      "Good for automation/agents"
    ],
    cons: [
      "Always POST (no caching)",
      "Less intuitive for humans",
      "Doesn't leverage HTTP semantics"
    ]
  }
}

# A2A chose JSON-RPC because agent-to-agent communication
# prioritizes automation over human readability.

JSON-RPC Request Format

# A JSON-RPC 2.0 request has these members:
request = %{
  "jsonrpc" => "2.0",          # Required - MUST be exactly "2.0"
  "method" => "SendMessage",   # Required - String, the method name
  "params" => %{},             # Optional - Object or Array of parameters (empty here)
  "id" => 1                    # Number or String - request identifier
}

# The id is used to match requests with responses.
# If id is omitted, it's a "notification" (no response expected).

# Examples:
valid_requests = [
  # Request with object params
  %{
    "jsonrpc" => "2.0",
    "method" => "SendMessage",
    "params" => %{"agent" => "Worker-1", "action" => "search"},
    "id" => 1
  },

  # Request with array params
  %{
    "jsonrpc" => "2.0",
    "method" => "add",
    "params" => [1, 2, 3],
    "id" => 2
  },

  # Notification (no id)
  %{
    "jsonrpc" => "2.0",
    "method" => "log",
    "params" => %{"message" => "Hello"}
  }
]
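
The rules above (required `jsonrpc` and `method`, optional `params`, and a missing `id` meaning "notification") can be checked mechanically. The following is a minimal sketch, not part of the course code: `JsonRpcShape` and `classify/1` are hypothetical names introduced only for illustration.

```elixir
defmodule JsonRpcShape do
  @moduledoc "Hypothetical helper: classifies a decoded JSON-RPC 2.0 message."

  # A request carries an id (string or number; null is permitted by the spec).
  def classify(%{"jsonrpc" => "2.0", "method" => method, "id" => id} = msg)
      when is_binary(method) and (is_binary(id) or is_number(id) or is_nil(id)) do
    if valid_params?(msg), do: :request, else: :invalid
  end

  # No "id" key at all: a notification, so no response is expected.
  def classify(%{"jsonrpc" => "2.0", "method" => method} = msg)
      when is_binary(method) and not is_map_key(msg, "id") do
    if valid_params?(msg), do: :notification, else: :invalid
  end

  def classify(_other), do: :invalid

  # "params" may be omitted; when present it must be an object or an array.
  defp valid_params?(%{"params" => params}), do: is_map(params) or is_list(params)
  defp valid_params?(_msg), do: true
end

JsonRpcShape.classify(%{"jsonrpc" => "2.0", "method" => "log", "params" => %{"message" => "Hello"}})
# => :notification
```

Running it over the three entries in `valid_requests` should give `:request`, `:request`, and `:notification` in that order.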

🤔 JSON-RPC Response Format

# Success response:
success_response = %{
  "jsonrpc" => "2.0",
  "result" => %{},       # Any JSON value (an empty object here)
  "id" => 1              # Must match request id
}

# Error response:
error_response = %{
  "jsonrpc" => "2.0",
  "error" => %{
    "code" => -32601,           # Integer error code
    "message" => "Method not found",  # Short description
    "data" => %{}               # Optional additional info (empty here)
  },
  "id" => 1
}

# Question: Why does the response include the id?

id_purpose = """
Your answer: ???
"""

# Answer:
# Responses can arrive out of order (especially over async transports).
# The id lets the client match each response to its original request.
# Without id, you couldn't tell which result belongs to which call.
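
To make the matching concrete, here is a small client-side sketch. It assumes the client keeps a map of in-flight ids; `PendingCalls`, `track/2`, and `resolve/2` are hypothetical names, not part of the course code.

```elixir
defmodule PendingCalls do
  @moduledoc "Hypothetical client-side bookkeeping for in-flight JSON-RPC calls."

  # Remember which method each outgoing id belongs to.
  def track(pending, %{"id" => id, "method" => method}), do: Map.put(pending, id, method)

  # Pair a response with its request and drop it from the pending set.
  def resolve(pending, %{"id" => id} = response) do
    case Map.pop(pending, id) do
      {nil, _unchanged} -> {:unknown_id, pending}
      {method, rest} -> {{:matched, method, outcome(response)}, rest}
    end
  end

  defp outcome(%{"error" => error}), do: {:error, error}
  defp outcome(%{"result" => result}), do: {:ok, result}
end

pending =
  %{}
  |> PendingCalls.track(%{"jsonrpc" => "2.0", "method" => "SendMessage", "id" => 1})
  |> PendingCalls.track(%{"jsonrpc" => "2.0", "method" => "ListAgents", "id" => 2})

# The response for id 2 arrives first; the id still tells us it answers ListAgents.
{match, pending} =
  PendingCalls.resolve(pending, %{"jsonrpc" => "2.0", "result" => %{"agents" => []}, "id" => 2})
```

After this runs, `match` identifies the `ListAgents` call and `pending` still holds id 1, waiting for its own response.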

Standard Error Codes

# JSON-RPC 2.0 defines these error codes:

standard_errors = %{
  -32700 => "Parse error - Invalid JSON",
  -32600 => "Invalid Request - Not valid JSON-RPC",
  -32601 => "Method not found",
  -32602 => "Invalid params",
  -32603 => "Internal error"
}

# Server errors (-32000 to -32099) are implementation-defined.
# This course's implementation uses:
a2a_errors = %{
  -32001 => "Agent not found",
  -32002 => "Task not found"
}
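
The code ranges can be expressed as guards. This is an illustrative sketch only: `ErrorCodes.category/1` is a hypothetical helper, and the `:reserved` clause reflects the JSON-RPC 2.0 rule that the rest of -32768 to -32000 is held back for future protocol use.

```elixir
defmodule ErrorCodes do
  @moduledoc "Hypothetical helper: sorts a JSON-RPC error code into its range."

  @standard [-32700, -32600, -32601, -32602, -32603]

  # The five codes the spec defines explicitly.
  def category(code) when code in @standard, do: :standard
  # Implementation-defined server errors, e.g. -32001 "Agent not found".
  def category(code) when code in -32099..-32000, do: :server_defined
  # Everything else in the protocol's reserved block.
  def category(code) when code in -32768..-32000, do: :reserved
  # Any other integer is free for application-defined errors.
  def category(code) when is_integer(code), do: :application
end

ErrorCodes.category(-32601)
# => :standard
ErrorCodes.category(-32001)
# => :server_defined
```

Clause order matters here: the standard codes also fall inside the reserved block, so they must be matched first.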

Section 2: A2A Methods

Core A2A Methods

# A2A defines these primary methods:

a2a_methods = %{
  "SendMessage" => """
    Send a task to an agent.
    Params: {agent, action, params}
    Returns: {status: "created", task_id, ...}
  """,

  "GetTask" => """
    Get the status of a task.
    Params: {task_id}
    Returns: {status, result?, error?}
  """,

  "CancelTask" => """
    Cancel a running task.
    Params: {task_id}
    Returns: {status: "cancelled"}
  """,

  "ListTasks" => """
    List tasks for an agent.
    Params: {agent}
    Returns: {tasks: [...]}
  """
}

# For our implementation, we'll focus on:
# - SendMessage (send task to agent)
# - GetAgentState (get agent's full state)
# - ListAgents (discover available agents)
# - StartAgent (create new agent)
# - ProcessNext (process task - for testing)

🤔 Mapping to AgentServer

# Think about how each A2A method maps to AgentServer functions:

method_mapping = %{
  "SendMessage" => """
    A2A receives: {agent: "Worker-1", action: "search", params: {query: "test"}}
    AgentServer call: AgentServer.send_task(pid, :search, %{"query" => "test"})

    Question: How do we get the pid from the agent name?
  """,

  "GetAgentState" => """
    A2A receives: {agent: "Worker-1"}
    AgentServer call: AgentServer.get_state(pid)

    Question: What parts of state should we expose via HTTP?
  """,

  "ProcessNext" => """
    A2A receives: {agent: "Worker-1"}
    AgentServer call: AgentServer.process_next(pid)

    Question: Should we expose this via HTTP or just for testing?
  """
}

# Answers:
# 1. Use AgentSupervisor.whereis(name) to find pid by name
# 2. Expose: name, status, inbox_count, processed_count (not full memory)
# 3. Useful for testing; in production, agents might auto-process
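
For the first answer, one common way to resolve a name to a pid is Elixir's built-in `Registry`. The sketch below is an assumption about how such a lookup could work, not a description of how the course's `AgentSupervisor.whereis/1` is implemented; `DemoAgentRegistry` and `DemoLookup` are hypothetical names.

```elixir
# Hypothetical registry name; the course's AgentSupervisor may work differently.
{:ok, _registry} = Registry.start_link(keys: :unique, name: DemoAgentRegistry)

defmodule DemoLookup do
  # Mirrors the {:ok, pid} | :error return shape used by AgentSupervisor.whereis/1.
  def whereis(name) do
    case Registry.lookup(DemoAgentRegistry, name) do
      [{pid, _value}] -> {:ok, pid}
      [] -> :error
    end
  end
end

# Register the current process under an agent name, then look it up by that name.
{:ok, _owner} = Registry.register(DemoAgentRegistry, "Worker-1", nil)

DemoLookup.whereis("Worker-1")
DemoLookup.whereis("Missing")
```

The first lookup returns `{:ok, pid}` for the registering process and the second returns `:error`, which is the shape the controller needs to choose between a result and an "Agent not found" error.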

Section 3: Task State Machine

Task States

# A2A tasks follow this state machine:

#          ┌──────────────────────────────────────┐
#          │                                      │
#          ▼                                      │
#     ┌─────────┐                                 │
#     │ created │ ─────────┐                      │
#     └────┬────┘          │                      │
#          │               │                      │
#          ▼               │                      │
#     ┌─────────┐          │                      │
#     │ working │ ─────────┼───────────────┐      │
#     └────┬────┘          │               │      │
#          │               │               │      │
#          ▼               ▼               ▼      │
#     ┌─────────┐    ┌─────────┐    ┌───────────┐ │
#     │completed│    │ failed  │    │ cancelled │ │
#     └─────────┘    └─────────┘    └───────────┘ │
#                                        │        │
#                                        └────────┘

task_states = %{
  created: "Task received, queued for processing",
  working: "Task is being processed",
  completed: "Task finished successfully",
  failed: "Task encountered an error",
  cancelled: "Task was cancelled by request"
}

🤔 Mapping to AgentServer State

# Our AgentServer doesn't explicitly track task states.
# How do we infer them?

state_inference = """
AgentServer.send_task(pid, action, params)
  → Task added to inbox
  → A2A status: "created"

AgentServer.process_next(pid)
  → While processing: status could be "working"
  → After processing: status is "completed" or "failed"

Question: Should AgentServer track task states explicitly,
          or should the HTTP layer infer them?

Your answer: ???
"""

# Answer:
# For now, we keep AgentServer simple - tasks go to inbox,
# get processed, results stored in memory.
#
# The HTTP layer infers:
# - Just sent → created
# - In inbox → created (waiting)
# - After process_next → completed/failed based on result
#
# A more sophisticated implementation might track each task
# with its own state and ID for proper GetTask support.
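
If tasks were tracked explicitly, the state machine could be enforced with a small transition table. The sketch below is one plausible reading of the states listed earlier, not the course's implementation: `TaskState` is a hypothetical module, and the exact set of allowed transitions is an assumption.

```elixir
defmodule TaskState do
  @moduledoc "Hypothetical transition table; the allowed moves are an assumption."

  @transitions %{
    created: [:working, :failed, :cancelled],
    working: [:completed, :failed, :cancelled],
    # Terminal states: nothing may follow them in this sketch.
    completed: [],
    failed: [],
    cancelled: []
  }

  # Returns {:ok, new_state} for a legal move, or a tagged error otherwise.
  def transition(from, to) do
    case Map.fetch(@transitions, from) do
      {:ok, allowed} ->
        if to in allowed, do: {:ok, to}, else: {:error, {:invalid_transition, from, to}}

      :error ->
        {:error, {:unknown_state, from}}
    end
  end
end

TaskState.transition(:created, :working)
# => {:ok, :working}
TaskState.transition(:completed, :working)
# => {:error, {:invalid_transition, :completed, :working}}
```

Keeping the table as data makes it easy to adjust if your reading of the diagram differs, for example to allow a cancelled task to be re-queued.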

Section 4: Implementing the A2A Controller

The JSON-RPC Parser

defmodule AgentApi.A2A.JsonRpc do
  @moduledoc """
  JSON-RPC 2.0 parsing and encoding.
  """

  @doc """
  Parse a JSON-RPC request from params.
  """
  def parse(%{"jsonrpc" => "2.0", "method" => method, "params" => params, "id" => id}) do
    {:ok, %{method: method, params: params, id: id}}
  end

  def parse(%{"jsonrpc" => "2.0", "method" => method, "id" => id}) do
    {:ok, %{method: method, params: %{}, id: id}}
  end

  def parse(_) do
    {:error, invalid_request(nil)}
  end

  @doc """
  Build a success response.
  """
  def success(result, id) do
    %{jsonrpc: "2.0", result: result, id: id}
  end

  @doc """
  Build an error response.
  """
  def error(code, message, id) do
    %{jsonrpc: "2.0", error: %{code: code, message: message}, id: id}
  end

  # Standard errors
  def invalid_request(id), do: error(-32600, "Invalid Request", id)
  def method_not_found(method, id), do: error(-32601, "Method not found: #{method}", id)
  def invalid_params(details, id), do: error(-32602, "Invalid params: #{details}", id)

  # A2A errors
  def agent_not_found(name, id), do: error(-32001, "Agent not found: #{name}", id)
end
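
One detail worth noting: the catch-all `parse/1` clause always answers with a `nil` id. JSON-RPC 2.0 requires a null id only when the id could not be determined, so a stricter parser might echo the id whenever it is readable. A minimal sketch of that variant follows; `ParseSketch` is a hypothetical module, not a replacement prescribed by the course.

```elixir
defmodule ParseSketch do
  @moduledoc "Hypothetical variant of JsonRpc.parse/1 that echoes the id when it can."

  # Well-formed request: "params" defaults to an empty map when omitted.
  def parse(%{"jsonrpc" => "2.0", "method" => method, "id" => id} = req)
      when is_binary(method) do
    {:ok, %{method: method, params: Map.get(req, "params", %{}), id: id}}
  end

  # Malformed, but the id is readable, so the caller can still correlate the error.
  def parse(%{"id" => id}), do: {:error, invalid_request(id)}

  # No usable id: fall back to null, as the spec requires.
  def parse(_other), do: {:error, invalid_request(nil)}

  defp invalid_request(id) do
    %{jsonrpc: "2.0", error: %{code: -32600, message: "Invalid Request"}, id: id}
  end
end

ParseSketch.parse(%{"method" => "SendMessage", "id" => 7})
# => {:error, %{jsonrpc: "2.0", error: %{code: -32600, message: "Invalid Request"}, id: 7}}
```

Like the original module, this sketch still treats a notification (a valid message with no `id`) as an invalid request.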

The A2A Controller

defmodule AgentApiWeb.A2AController do
  use AgentApiWeb, :controller

  alias AgentApi.A2A.{JsonRpc, TaskManager}

  @doc """
  Handle all JSON-RPC requests.
  """
  def handle(conn, params) do
    case JsonRpc.parse(params) do
      {:ok, request} ->
        response = dispatch(request)
        json(conn, response)

      {:error, error_response} ->
        json(conn, error_response)
    end
  end

  # Dispatch based on method
  defp dispatch(%{method: method, params: params, id: id}) do
    case method do
      "SendMessage" -> handle_send_message(params, id)
      "GetAgentState" -> handle_get_agent_state(params, id)
      "ListAgents" -> handle_list_agents(id)
      "StartAgent" -> handle_start_agent(params, id)
      "ProcessNext" -> handle_process_next(params, id)
      _ -> JsonRpc.method_not_found(method, id)
    end
  end

  # Method handlers...
end

🤔 Dispatch Pattern

# Compare this dispatch pattern to GenServer:

genserver_dispatch = """
def handle_call(:get_state, _from, state), do: {:reply, state, state}
def handle_call({:recall, key}, _from, state), do: ...
def handle_call(:process_next, _from, state), do: ...
"""

controller_dispatch = """
case method do
  "SendMessage" -> handle_send_message(params, id)
  "GetAgentState" -> handle_get_agent_state(params, id)
  "ListAgents" -> handle_list_agents(id)
  _ -> JsonRpc.method_not_found(method, id)
end
"""

# Question: Why case/do instead of function clause pattern matching?

dispatch_style = """
Your answer: ???
"""

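# Answer:
# Function clauses can match on strings too, so either style works.
# A single case keeps the whole method table visible in one place.
#
# What we should NOT do is convert the method to an atom first.
# String.to_atom(method) risks atom exhaustion if a client sends
# arbitrary strings, and String.to_existing_atom(method) raises
# ArgumentError for names that don't exist yet.
#
# Matching on the string is safer and explicit.
# The _ clause handles unknown methods cleanly.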
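
For comparison, the same dispatch can be written with one function clause per method name, since binaries pattern-match directly in function heads. This is a sketch with placeholder return values: `DispatchSketch` and its tuples are hypothetical and stand in for the real handlers.

```elixir
defmodule DispatchSketch do
  @moduledoc "Hypothetical dispatcher: one function clause per method name."

  # Strings match directly in function heads; no atom conversion is involved.
  def dispatch("ListAgents", _params, id), do: {:handled, :list_agents, id}

  def dispatch("GetAgentState", %{"agent" => agent}, id),
    do: {:handled, {:get_agent_state, agent}, id}

  # Known method, but the required parameter is missing.
  def dispatch("GetAgentState", _params, id), do: {:invalid_params, id}

  # The catch-all clause plays the role of the `_ ->` branch in the case version.
  def dispatch(method, _params, id) when is_binary(method), do: {:method_not_found, method, id}
end

DispatchSketch.dispatch("ListAgents", %{}, 1)
# => {:handled, :list_agents, 1}
DispatchSketch.dispatch("Nope", %{}, 2)
# => {:method_not_found, "Nope", 2}
```

The trade-off is mostly readability: clauses let each method validate its own params in the head, while the `case` version keeps the method list in a single block.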

Section 5: Connecting to AgentServer

The TaskManager Module

defmodule AgentApi.A2A.TaskManager do
  @moduledoc """
  Bridges A2A requests to AgentFramework.
  """

  alias AgentFramework.{AgentServer, AgentSupervisor}

  @doc """
  Send a message (task) to an agent.
  """
  def send_message(agent_name, action, params) do
    case find_agent(agent_name) do
      {:ok, pid} ->
        :ok = AgentServer.send_task(pid, normalize_action(action), params)
        {:ok, %{status: :created, agent: agent_name}}

      :error ->
        {:error, :agent_not_found}
    end
  end

  @doc """
  Get an agent's current state.
  """
  def get_agent_state(agent_name) do
    case find_agent(agent_name) do
      {:ok, pid} ->
        state = AgentServer.get_state(pid)
        {:ok, state}

      :error ->
        {:error, :agent_not_found}
    end
  end

  @doc """
  List all available agents.
  """
  def list_agents do
    AgentSupervisor.list_agents()
    |> Enum.map(fn pid ->
      state = AgentServer.get_state(pid)
      {state.name, pid}
    end)
  end

  # Find agent by name
  defp find_agent(name) do
    case AgentSupervisor.whereis(name) do
      {:ok, pid} -> {:ok, pid}
      :error -> search_agents(name)
    end
  end

  defp search_agents(name) do
    result = Enum.find(AgentSupervisor.list_agents(), fn pid ->
      try do
        AgentServer.get_state(pid).name == name
      catch
        :exit, _ -> false
      end
    end)

    case result do
      nil -> :error
      pid -> {:ok, pid}
    end
  end

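  defp normalize_action(action) when is_atom(action), do: action
  # Caution: String.to_atom/1 on client-supplied input can exhaust the atom table.
  # Outside a tutorial, map known action strings to atoms explicitly instead.
  defp normalize_action(action) when is_binary(action), do: String.to_atom(action)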
end
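
Because `normalize_action/1` turns client-supplied strings into atoms, a safer variant is to map known action names explicitly. The sketch below assumes a fixed set of actions; `ActionNames` and the entries in `@known` are hypothetical, and the real list depends on what your agents support.

```elixir
defmodule ActionNames do
  @moduledoc "Hypothetical allow-list for converting action strings to atoms."

  # Assumed action names, for illustration only.
  @known %{"search" => :search, "summarize" => :summarize}

  def normalize(action) when is_atom(action), do: {:ok, action}

  # Only strings listed in @known become atoms; nothing new is ever created.
  def normalize(action) when is_binary(action) do
    case Map.fetch(@known, action) do
      {:ok, atom} -> {:ok, atom}
      :error -> {:error, :unknown_action}
    end
  end
end

ActionNames.normalize("search")
# => {:ok, :search}
ActionNames.normalize("drop_tables")
# => {:error, :unknown_action}
```

With this shape, `send_message/3` would need one more branch to turn `{:error, :unknown_action}` into an "Invalid params" response.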

🤔 Why a Separate TaskManager?

# We could call AgentServer directly from the controller:

# Direct call in controller:
def handle_send_message(conn, %{"agent" => agent} = params, id) do
  case AgentSupervisor.whereis(agent) do
    {:ok, pid} ->
      AgentServer.send_task(pid, params["action"], params["params"])
      json(conn, JsonRpc.success(%{status: "created"}, id))
    :error ->
      json(conn, JsonRpc.agent_not_found(agent, id))
  end
end

# But we use TaskManager instead. Why?

separation_benefits = """
1. Controller stays thin (just HTTP handling)
2. Business logic is in one place (TaskManager)
3. TaskManager is testable without HTTP
4. Changes to agent lookup don't touch controller
5. Could swap AgentFramework for another implementation

Your thoughts: ???
"""

Section 6: Method Implementations

SendMessage Implementation

defp handle_send_message(%{"agent" => agent, "action" => action} = params, id) do
  task_params = Map.get(params, "params", %{})

  case TaskManager.send_message(agent, action, task_params) do
    {:ok, result} ->
      JsonRpc.success(result, id)

    {:error, :agent_not_found} ->
      JsonRpc.agent_not_found(agent, id)
  end
end

defp handle_send_message(_params, id) do
  JsonRpc.invalid_params("Missing: agent, action", id)
end

GetAgentState Implementation

defp handle_get_agent_state(%{"agent" => agent}, id) do
  case TaskManager.get_agent_state(agent) do
    {:ok, state} ->
      # Don't expose full state - summarize for HTTP
      summary = %{
        name: state.name,
        status: state.status,
        inbox_count: length(state.inbox),
        processed_count: state.processed_count,
        memory_keys: Map.keys(state.memory)
      }
      JsonRpc.success(summary, id)

    {:error, :agent_not_found} ->
      JsonRpc.agent_not_found(agent, id)
  end
end

defp handle_get_agent_state(_params, id) do
  JsonRpc.invalid_params("Missing: agent", id)
end

ListAgents Implementation

defp handle_list_agents(id) do
  agents = TaskManager.list_agents()
           |> Enum.map(fn {name, _pid} -> name end)

  JsonRpc.success(%{agents: agents, count: length(agents)}, id)
end

Section 7: Testing the A2A Endpoint

Manual Testing with curl

# 1. Start the server
cd agent_api
iex -S mix phx.server

# 2. Test Agent Card
curl http://localhost:4000/.well-known/agent.json | jq

# 3. List agents (should be empty initially)
curl -X POST http://localhost:4000/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"ListAgents","params":{},"id":1}' | jq

# 4. Start an agent
curl -X POST http://localhost:4000/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"StartAgent","params":{"name":"Worker-1"},"id":2}' | jq

# 5. Send a message
curl -X POST http://localhost:4000/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"SendMessage","params":{"agent":"Worker-1","action":"search","params":{"query":"OTP"}},"id":3}' | jq

# 6. Get agent state
curl -X POST http://localhost:4000/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"GetAgentState","params":{"agent":"Worker-1"},"id":4}' | jq

# 7. Process the task
curl -X POST http://localhost:4000/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"ProcessNext","params":{"agent":"Worker-1"},"id":5}' | jq

Automated Tests

defmodule AgentApiWeb.A2AControllerTest do
  use AgentApiWeb.ConnCase

  alias AgentApi.A2A.TaskManager

  describe "POST /a2a - SendMessage" do
    test "sends task to agent", %{conn: conn} do
      # First start an agent
      {:ok, _} = TaskManager.start_agent("Test-Worker")

      conn =
        conn
        |> put_req_header("content-type", "application/json")
        |> post("/a2a", %{
          "jsonrpc" => "2.0",
          "method" => "SendMessage",
          "params" => %{
            "agent" => "Test-Worker",
            "action" => "search",
            "params" => %{"query" => "test"}
          },
          "id" => 1
        })

      response = json_response(conn, 200)
      assert response["result"]["status"] == "created"
    end

    test "returns error for missing agent", %{conn: conn} do
      conn =
        conn
        |> put_req_header("content-type", "application/json")
        |> post("/a2a", %{
          "jsonrpc" => "2.0",
          "method" => "SendMessage",
          "params" => %{
            "agent" => "NonExistent",
            "action" => "search"
          },
          "id" => 1
        })

      response = json_response(conn, 200)
      assert response["error"]["code"] == -32001
    end
  end
end

Section 8: Interactive Exercises

Exercise 1: Complete Request Trace

# Trace this complete request through the system:

request = """
POST /a2a HTTP/1.1
Content-Type: application/json

{
  "jsonrpc": "2.0",
  "method": "SendMessage",
  "params": {
    "agent": "Worker-1",
    "action": "search",
    "params": {"query": "Elixir"}
  },
  "id": 42
}
"""

# Fill in each step:
trace = """
1. Request arrives at: ???
2. Router matches: ???
3. Controller.handle receives: ???
4. JsonRpc.parse returns: ???
5. dispatch calls: ???
6. TaskManager.send_message calls: ???
7. AgentServer receives: ???
8. Response returned: ???
"""

# Answer:
trace = """
1. Request arrives at: AgentApiWeb.Endpoint
2. Router matches: POST /a2a → A2AController.handle
3. Controller.handle receives: conn, params (parsed JSON)
4. JsonRpc.parse returns: {:ok, %{method: "SendMessage", params: ..., id: 42}}
5. dispatch calls: handle_send_message(params, 42)
6. TaskManager.send_message calls: AgentServer.send_task(pid, :search, %{...})
7. AgentServer receives: GenServer.cast {:send_task, :search, %{"query" => "Elixir"}}
8. Response returned: {"jsonrpc":"2.0","result":{"status":"created",...},"id":42}
"""

Exercise 2: Add a New Method

# Add a "GetInbox" method that returns the agent's inbox contents

# 1. Add to dispatch:
# 2. Implement handler:
# 3. Add to TaskManager:

# Your implementation:
defp handle_get_inbox(params, id) do
  # ???
end

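# Answer:
# 1. Add to dispatch: "GetInbox" -> handle_get_inbox(params, id)
# 2. Implement the handler (below).
# 3. No new TaskManager function is needed; the handler reuses get_agent_state/1.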
defp handle_get_inbox(%{"agent" => agent}, id) do
  case TaskManager.get_agent_state(agent) do
    {:ok, state} ->
      inbox_summary = Enum.map(state.inbox, fn msg ->
        %{
          id: msg.id,
          action: msg.payload.action,
          params: msg.payload.params
        }
      end)
      JsonRpc.success(%{inbox: inbox_summary, count: length(inbox_summary)}, id)

    {:error, :agent_not_found} ->
      JsonRpc.agent_not_found(agent, id)
  end
end

defp handle_get_inbox(_params, id) do
  JsonRpc.invalid_params("Missing: agent", id)
end

Key Takeaways

  1. JSON-RPC provides consistent structure - All requests/responses follow the same format

  2. Method dispatch is like GenServer - Pattern match on method name, delegate to handlers

  3. TaskManager bridges layers - Keeps controller thin, makes logic testable

  4. Error codes are standardized - Use JSON-RPC codes for protocol errors, custom for app errors

  5. Task states track progress - created → working → completed/failed/cancelled

  6. Don’t expose everything - Summarize internal state for HTTP responses


What’s Next?

In the next session, we’ll bring everything together in the Checkpoint Project:

  • Complete Phoenix A2A integration
  • Full testing with curl and automated tests
  • Verification checklist
  • Preview of Phase 5 (distribution)

You’ll have a working HTTP API for your agent framework!

