Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

MCP Experiments

mcp_experiments.livemd

MCP Experiments

Mix.install([
  # {:hermes_mcp, "~> 0.2.3"},
  :jason,
  :uuid
])

Start the file system server

# start the nodejs filesystem server
# ❯ npx -y @modelcontextprotocol/server-filesystem /Users/dimova01/git/origin_simulator
defmodule MessageCollector do
  def collect do
    receive do
      msg -> 
        IO.inspect(msg, label: "Received")
        collect()
    after
      1000 -> IO.puts("No more messages")
    end
  end
end

Elixir Port

The messages supported by ports and their counterpart function APIs are listed below:

{pid, {:command, binary}} - sends the given data to the port. See command/3.

{pid, :close} - closes the port. Unless the port is already closed, the port will reply with {port, :closed} message once it has flushed its buffers and effectively closed. See close/1.

{pid, {:connect, new_pid}} - sets the new_pid as the new owner of the port. Once a port is opened, the port is linked and connected to the caller process and communication to the port only happens through the connected process. This message makes new_pid the new connected processes. Unless the port is dead, the port will reply to the old owner with {port, :connected}. See connect/2.

On its turn, the port will send the connected process the following messages:

{port, {:data, data}} - data sent by the port
{port, :closed} - reply to the {pid, :close} message
{port, :connected} - reply to the {pid, {:connect, new_pid}} message
{:EXIT, port, reason} - exit signals in case the port crashes. If reason is not :normal, this message will only be received if the owner process is trapping exits
Port.list()
cmd = "echo hello"
port = Port.open({:spawn, cmd}, [:binary])
MessageCollector.collect()
port = Port.open({:spawn, "cat"}, [:binary])
send(port, {self(), {:command, "hello"}})
send(port, {self(), {:command, "world"}})
MessageCollector.collect()
send(self(), :close)
MessageCollector.collect()
Port.close(port)
MessageCollector.collect()

Spawn

port = Port.open({:spawn, "echo hello"}, [:binary])
MessageCollector.collect()

Spawn executable

path = System.find_executable("echo")
port = Port.open({:spawn_executable, path}, [:binary, args: ["Hello World!"]])
MessageCollector.collect()

Start port

Enum.with_index(Port.list(), fn port, index ->
  if index > 5, do: Port.close(port), else: IO.inspect(port)
end)
cmd = "/Users/dimova01/.asdf/shims/npx"
args = ["-y", "@modelcontextprotocol/server-filesystem", "/Users/dimova01/git/origin_simulator"]
# Port.close(port)
port = Port.open({:spawn_executable, cmd}, [
  :binary,
  :exit_status,
  {:args, args},
  {:line, 4096},
  :stderr_to_stdout
])

Initialization

jsonrpc_version = "2.0"
protocol_version = "2024-11-05"
request_id = UUID.uuid4()

initialize_request = %{
  jsonrpc: jsonrpc_version,
  id: request_id,
  method: "initialize",
  params: %{
    protocolVersion: protocol_version,
    capabilities: %{
      roots: %{
        listChanged: true
      },
      sampling: %{}
    },
    clientInfo: %{
      name: "Elixir MCP Client",
      version: "0.1.0"
    }
  }
}

encoded_request = Jason.encode!(initialize_request)
IO.inspect(port, label: "Port")
Port.command(port, "#{encoded_request}\n")
|> IO.inspect(label: "command")
# MessageCollector.collect()
# resp = {:data,
#   {:eol,
#    "{\"result\":{\"protocolVersion\":\"2024-11-05\",\"capabilities\":{\"tools\":{}},\"serverInfo\":{\"name\":\"secure-filesystem-server\",\"version\":\"0.2.0\"}},\"jsonrpc\":\"2.0\",\"id\":\"8aa8d77d-9d36-4591-8e43-c3718225301b\"}"}}

receive_response = fn ->
  # Recursively collect data chunks
  collect = fn collect, buffer ->
    receive do
      {_port, {:data, data}} ->
        # Extract content
        content = case data do
          {:noeol, text} -> text
          {:eol, text} -> text
          text when is_binary(text) -> text
        end
        
        # Append to our buffer
        new_buffer = buffer <> content
        
        # Look for JSON objects starting with { and ending with }
        case Regex.run(~r/(\{.*\})/s, new_buffer) do
          [json_candidate | _] ->
            case Jason.decode(json_candidate) do
              {:ok, decoded} -> 
                IO.puts("Successfully parsed JSON response")
                {:ok, decoded}
              {:error, _} -> 
                # Keep collecting if we can't parse it yet
                collect.(collect, new_buffer)
            end
          nil ->
            # No JSON object found yet
            collect.(collect, new_buffer)
        end
    after
      10000 -> {:error, :timeout, buffer}
    end
  end
  
  collect.(collect, "")
end
initialize_response = receive_response.()
IO.inspect(initialize_response, label: "Initialize Response")
# Send the initialized notification
initialized_notification = %{
  jsonrpc: jsonrpc_version,
  method: "notifications/initialized"
}

encoded_notification = Jason.encode!(initialized_notification)
IO.puts("Sending notification: #{encoded_notification}")
Port.command(port, "#{encoded_notification}\n")
receive_response.()
IO.puts("MCP initialization complete - ready for operations")

Operation

# Send the tools/list request
tools_request = %{
  jsonrpc: jsonrpc_version,
  id: UUID.uuid4(),
  method: "tools/list"
}

encoded_tools_request = Jason.encode!(tools_request)
IO.puts("Sending tools request: #{encoded_tools_request}")
Port.command(port, "#{encoded_tools_request}\n")

# Receive and process the complete response
tools_response = receive_response.()
IO.inspect(tools_response, label: "Available Tools")
# Request to get allowed directories
allowed_dirs_request = %{
  jsonrpc: jsonrpc_version,
  id: UUID.uuid4(),
  method: "tools/call",
  params: %{
    name: "list_allowed_directories",
    arguments: %{}  # No arguments needed for this tool
  }
}

encoded_dirs_request = Jason.encode!(allowed_dirs_request)
IO.puts("Sending allowed directories request: #{encoded_dirs_request}")
Port.command(port, "#{encoded_dirs_request}\n")

# Get the response
allowed_dirs_response = receive_response.()
IO.inspect(allowed_dirs_response, label: "Allowed Directories")
# Function to call tools (include this if you don't already have it defined)
call_tool = fn tool_name, arguments ->
  request = %{
    jsonrpc: jsonrpc_version,
    id: UUID.uuid4(),
    method: "tools/call",
    params: %{
      name: tool_name,
      arguments: arguments
    }
  }
  
  encoded_request = Jason.encode!(request)
  Port.command(port, "#{encoded_request}\n")
  
  receive_response.()
end

# Read README.md file
{:ok, readme_result} = call_tool.("read_file", %{
  path: "/Users/dimova01/git/origin_simulator/README.md"
})

# Display the result
IO.inspect(readme_result, label: "README.md Content")
IO.puts(List.first(readme_result["result"]["content"], %{})["text"])
{:ok, res} = call_tool.("write_file", %{
  path: "/Users/dimova01/git/origin_simulator/README.md",
  content: "# Deprecated\nPlease use the version in Rust."
})
IO.inspect(res, label: "write_file")
{:ok, res} = call_tool.("read_file", %{
  path: "/Users/dimova01/git/origin_simulator/README.md"
})
IO.inspect(res, label: "read_file")

MCPClient

defmodule MCPClient do
  @moduledoc """
  A simple MCP client implementation for communicating with an MCP server via stdio.
  """
  
  @jsonrpc_version "2.0"
  @protocol_version "2024-11-05"
  
  @doc """
  Start a new connection to the MCP server and return a client state.
  """
  def start(command, args) do
    # Open a port to the server process, communicating via stdio
    port = Port.open({:spawn_executable, System.find_executable(command)}, [
      :binary,
      :exit_status,
      {:args, args},
      {:line, 4096},  # Line-based IO with generous buffer
      :stderr_to_stdout
    ])
    
    # Return initial state
    %{
      port: port,
      request_id: 1,
      initialized: false
    }
  end
  
  @doc """
  Send a raw JSON-RPC message to the server.
  """
  def send_message(state, message) do
    json = if is_binary(message), do: message, else: Jason.encode!(message)
    IO.puts("Sending: #{json}")
    Port.command(state.port, "#{json}\n")
    state
  end
  
  @doc """
  Wait for and receive a response from the server.
  """
  def receive_message(state, timeout \\ 5000) do
    receive do
      {port, {:data, data}} when port == state.port ->
        IO.puts("Received: #{data}")
        case Jason.decode(String.trim(data)) do
          {:ok, decoded} -> {decoded, state}
          {:error, error} -> {{:error, error, data}, state}
        end
      
      {port, {:exit_status, status}} when port == state.port ->
        IO.puts("Server exited with status: #{status}")
        {{:exit, status}, state}
    after
      timeout ->
        IO.puts("Timeout waiting for response")
        {{:timeout}, state}
    end
  end
  
  @doc """
  Initialize the connection with the server.
  """
  def initialize(state) do
    # Create initialize request
    request = %{
      jsonrpc: @jsonrpc_version,
      id: state.request_id,
      method: "initialize",
      params: %{
        protocolVersion: @protocol_version,
        clientInfo: %{
          name: "Elixir MCP Client",
          version: "0.1.0"
        },
        capabilities: %{
          # Minimal capabilities for now
          resources: %{}
        }
      }
    }
    
    # Send the request and update request ID
    state = send_message(state, request)
    state = %{state | request_id: state.request_id + 1}
    
    # Wait for response
    {response, state} = receive_message(state)
    
    case response do
      %{"result" => result} ->
        # Send initialized notification
        notification = %{
          jsonrpc: @jsonrpc_version,
          method: "notifications/initialized"
        }
        
        state = send_message(state, notification)
        {result, %{state | initialized: true}}
      
      other ->
        {other, state}
    end
  end
  
  @doc """
  List tools available on the server.
  """
  def list_tools(state) do
    request = %{
      jsonrpc: @jsonrpc_version,
      id: state.request_id,
      method: "tools/list"
    }
    
    state = send_message(state, request)
    state = %{state | request_id: state.request_id + 1}
    
    receive_message(state)
  end
  
  @doc """
  List resources available from the server.
  """
  def list_resources(state) do
    request = %{
      jsonrpc: @jsonrpc_version,
      id: state.request_id,
      method: "resources/list"
    }
    
    state = send_message(state, request)
    state = %{state | request_id: state.request_id + 1}
    
    receive_message(state)
  end
  
  @doc """
  Call a tool on the server.
  """
  def call_tool(state, tool_name, arguments) do
    request = %{
      jsonrpc: @jsonrpc_version,
      id: state.request_id,
      method: "tools/call",
      params: %{
        name: tool_name,
        arguments: arguments
      }
    }
    
    state = send_message(state, request)
    state = %{state | request_id: state.request_id + 1}
    
    receive_message(state)
  end
  
  @doc """
  Read a file using the read_file tool.
  """
  def read_file(state, path) do
    call_tool(state, "read_file", %{path: path})
  end
  
  @doc """
  List directory contents using the list_directory tool.
  """
  def list_directory(state, path) do
    call_tool(state, "list_directory", %{path: path})
  end
  
  @doc """
  Clean up resources and stop the client.
  """
  def stop(state) do
    Port.close(state.port)
    :ok
  end
end