MCP Experiments
Mix.install([
# {:hermes_mcp, "~> 0.2.3"},
:jason,
:uuid
])
Start the file system server
# start the nodejs filesystem server
# ❯ npx -y @modelcontextprotocol/server-filesystem /Users/dimova01/git/origin_simulator
defmodule MessageCollector do
  @moduledoc """
  Helper for notebook experiments: drains the calling process's mailbox and
  prints every message it finds.
  """

  @doc """
  Prints each message waiting in the caller's mailbox (labelled "Received").

  Keeps receiving until no message arrives for `timeout` milliseconds, then
  prints "No more messages" and returns `:ok`. `timeout` defaults to 1000 ms;
  pass a smaller value to avoid waiting a full second, or `0` to only drain
  messages that are already queued.
  """
  def collect(timeout \\ 1000) do
    receive do
      msg ->
        IO.inspect(msg, label: "Received")
        # The idle timer restarts after every message.
        collect(timeout)
    after
      timeout -> IO.puts("No more messages")
    end
  end
end
Elixir Port
The messages supported by ports and their counterpart function APIs are listed below:
{pid, {:command, binary}} - sends the given data to the port. See command/3.
{pid, :close} - closes the port. Unless the port is already closed, the port will reply with {port, :closed} message once it has flushed its buffers and effectively closed. See close/1.
{pid, {:connect, new_pid}} - sets the new_pid as the new owner of the port. Once a port is opened, the port is linked and connected to the caller process and communication to the port only happens through the connected process. This message makes new_pid the new connected process. Unless the port is dead, the port will reply to the old owner with {port, :connected}. See connect/2.
On its turn, the port will send the connected process the following messages:
{port, {:data, data}} - data sent by the port
{port, :closed} - reply to the {pid, :close} message
{port, :connected} - reply to the {pid, {:connect, new_pid}} message
{:EXIT, port, reason} - exit signals in case the port crashes. If reason is not :normal, this message will only be received if the owner process is trapping exits
# List the ports currently open in this VM.
Port.list()
# Spawn a command through {:spawn, cmd}; with :binary the port's output is
# delivered to this process as {port, {:data, binary}} messages.
cmd = "echo hello"
port = Port.open({:spawn, cmd}, [:binary])
MessageCollector.collect()
# Open a port running `cat` and write to it using the message form of
# Port.command/3 ({pid, {:command, binary}}); the collector prints whatever
# the port sends back.
port = Port.open({:spawn, "cat"}, [:binary])
send(port, {self(), {:command, "hello"}})
send(port, {self(), {:command, "world"}})
MessageCollector.collect()
# NOTE(review): this sends the bare atom :close to the current process, not to
# the port, so the collector below just prints :close and the port stays open.
# The message form of close would be send(port, {self(), :close}) — confirm
# which was intended; Port.close/1 below would raise on an already-closed port.
send(self(), :close)
MessageCollector.collect()
# Close the port via the function API.
Port.close(port)
MessageCollector.collect()
Spawn
# {:spawn, command} takes the whole command line as a single string.
port = Port.open({:spawn, "echo hello"}, [:binary])
# Print whatever the port delivered to this process.
MessageCollector.collect()
Spawn executable
# {:spawn_executable, path} needs the path of the executable, so look it up first.
# NOTE(review): System.find_executable/1 returns nil when nothing is found —
# presumably "echo" is always on PATH here; verify on the target machine.
path = System.find_executable("echo")
# Arguments are passed separately through the :args option.
port = Port.open({:spawn_executable, path}, [:binary, args: ["Hello World!"]])
MessageCollector.collect()
Start port
# Walk every open port in the VM: the first six (index 0..5) are only printed,
# every port after that is closed.
Port.list()
|> Enum.with_index()
|> Enum.map(fn
  {open_port, position} when position > 5 -> Port.close(open_port)
  {open_port, _position} -> IO.inspect(open_port)
end)
# Executable and arguments for the MCP filesystem server (run through npx).
cmd = "/Users/dimova01/.asdf/shims/npx"

args = [
  "-y",
  "@modelcontextprotocol/server-filesystem",
  "/Users/dimova01/git/origin_simulator"
]

# Port.close(port)
# Line-oriented binary port: output arrives as {:eol, line} / {:noeol, chunk}
# (lines up to 4096 bytes), stderr is merged into stdout, and the exit status
# is reported as a message.
port =
  Port.open(
    {:spawn_executable, cmd},
    [:binary, :exit_status, :stderr_to_stdout, args: args, line: 4096]
  )
Initialization
# Protocol constants and a fresh id for the initialize request.
jsonrpc_version = "2.0"
protocol_version = "2024-11-05"
request_id = UUID.uuid4()

# JSON-RPC "initialize" request announcing this client and its capabilities.
initialize_request = %{
  jsonrpc: jsonrpc_version,
  id: request_id,
  method: "initialize",
  params: %{
    protocolVersion: protocol_version,
    capabilities: %{roots: %{listChanged: true}, sampling: %{}},
    clientInfo: %{name: "Elixir MCP Client", version: "0.1.0"}
  }
}

encoded_request = Jason.encode!(initialize_request)
IO.inspect(port, label: "Port")

# Messages are newline-delimited: one JSON document per line.
port
|> Port.command(encoded_request <> "\n")
|> IO.inspect(label: "command")
# MessageCollector.collect()
# resp = {:data,
# {:eol,
# "{\"result\":{\"protocolVersion\":\"2024-11-05\",\"capabilities\":{\"tools\":{}},\"serverInfo\":{\"name\":\"secure-filesystem-server\",\"version\":\"0.2.0\"}},\"jsonrpc\":\"2.0\",\"id\":\"8aa8d77d-9d36-4591-8e43-c3718225301b\"}"}}
# Waits for the next JSON object sent by a port and returns it decoded.
#
# Returns:
#   {:ok, decoded}                              - first line (or raw buffer) containing a JSON object
#   {:error, {:exit_status, status}, buffer}    - the port's program exited
#   {:error, :timeout, buffer}                  - nothing arrived for 10 seconds
#
# `buffer` is the text of the line that was still incomplete when the error
# occurred.
receive_response = fn ->
  # Pulls the {...} span out of `text` and decodes it. Returns {:ok, map}, or
  # :error when there is no such span or it is not valid JSON.
  extract_json = fn text ->
    with [json_candidate | _] <- Regex.run(~r/(\{.*\})/s, text),
         {:ok, decoded} <- Jason.decode(json_candidate) do
      {:ok, decoded}
    else
      _ -> :error
    end
  end

  # Recursively collect data chunks
  collect = fn collect, buffer ->
    receive do
      # Line mode, partial line: keep buffering until the end of the line.
      {_port, {:data, {:noeol, chunk}}} ->
        collect.(collect, buffer <> chunk)

      # Line mode, complete line: parse it on its own. A line without valid
      # JSON (e.g. server log output merged in via :stderr_to_stdout) is
      # dropped instead of being glued onto the next line, where a stray "{"
      # would make every later parse attempt fail.
      {_port, {:data, {:eol, chunk}}} ->
        case extract_json.(buffer <> chunk) do
          {:ok, decoded} ->
            IO.puts("Successfully parsed JSON response")
            {:ok, decoded}

          :error ->
            collect.(collect, "")
        end

      # Raw (non-line) mode: there are no line boundaries, so keep
      # accumulating until the buffer contains a decodable object.
      {_port, {:data, chunk}} when is_binary(chunk) ->
        new_buffer = buffer <> chunk

        case extract_json.(new_buffer) do
          {:ok, decoded} ->
            IO.puts("Successfully parsed JSON response")
            {:ok, decoded}

          :error ->
            collect.(collect, new_buffer)
        end

      # The program behind the port exited; report it right away instead of
      # waiting for the timeout.
      {_port, {:exit_status, status}} ->
        {:error, {:exit_status, status}, buffer}
    after
      10000 -> {:error, :timeout, buffer}
    end
  end

  collect.(collect, "")
end
# Read the server's reply to the initialize request.
initialize_response = receive_response.()
IO.inspect(initialize_response, label: "Initialize Response")
# Send the initialized notification
initialized_notification = %{
  jsonrpc: jsonrpc_version,
  method: "notifications/initialized"
}
encoded_notification = Jason.encode!(initialized_notification)
IO.puts("Sending notification: #{encoded_notification}")
Port.command(port, "#{encoded_notification}\n")
# A JSON-RPC notification (a message without an id) is never answered, so
# there is no response to wait for here; waiting would only block until the
# receive timeout expires.
IO.puts("MCP initialization complete - ready for operations")
Operation
# Ask the server which tools it offers (tools/list takes no params)
tools_request = %{jsonrpc: jsonrpc_version, id: UUID.uuid4(), method: "tools/list"}
encoded_tools_request = Jason.encode!(tools_request)
IO.puts("Sending tools request: #{encoded_tools_request}")
Port.command(port, encoded_tools_request <> "\n")

# Wait for the reply and show it
tools_response =
  receive_response.()
  |> IO.inspect(label: "Available Tools")
# Call the list_allowed_directories tool; it is sent an empty arguments map
allowed_dirs_request = %{
  jsonrpc: jsonrpc_version,
  id: UUID.uuid4(),
  method: "tools/call",
  params: %{name: "list_allowed_directories", arguments: %{}}
}

encoded_dirs_request = Jason.encode!(allowed_dirs_request)
IO.puts("Sending allowed directories request: #{encoded_dirs_request}")
Port.command(port, encoded_dirs_request <> "\n")

# Wait for the reply and show it
allowed_dirs_response =
  receive_response.()
  |> IO.inspect(label: "Allowed Directories")
# Generic tools/call helper: sends one request for `tool_name` with
# `arguments` over the open port and returns whatever receive_response yields.
call_tool = fn tool_name, arguments ->
  payload =
    Jason.encode!(%{
      jsonrpc: jsonrpc_version,
      id: UUID.uuid4(),
      method: "tools/call",
      params: %{name: tool_name, arguments: arguments}
    })

  # One JSON document per line.
  Port.command(port, payload <> "\n")
  receive_response.()
end
# Fetch README.md through the read_file tool
{:ok, readme_result} =
  call_tool.("read_file", %{path: "/Users/dimova01/git/origin_simulator/README.md"})

# Show the raw response, then only the text of the first content item
IO.inspect(readme_result, label: "README.md Content")

readme_result["result"]["content"]
|> List.first(%{})
|> Access.get("text")
|> IO.puts()

# Overwrite the file through write_file ...
{:ok, res} =
  call_tool.("write_file", %{
    path: "/Users/dimova01/git/origin_simulator/README.md",
    content: "# Deprecated\nPlease use the version in Rust."
  })

IO.inspect(res, label: "write_file")

# ... and read it back
{:ok, res} =
  call_tool.("read_file", %{path: "/Users/dimova01/git/origin_simulator/README.md"})

IO.inspect(res, label: "read_file")
MCPClient
defmodule MCPClient do
  @moduledoc """
  A simple MCP client implementation for communicating with an MCP server via stdio.

  The client state is a plain map (`:port`, `:request_id`, `:initialized`) that
  is passed to every function. Functions that talk to the server return
  `{result, state}` with the updated state.
  """

  @jsonrpc_version "2.0"
  @protocol_version "2024-11-05"

  @typedoc "Client state passed to and returned from every call."
  @type state :: %{port: port(), request_id: pos_integer(), initialized: boolean()}

  @doc """
  Start a new connection to the MCP server and return a client state.

  `command` is looked up on the PATH and started with `args`. Raises
  `ArgumentError` when the executable cannot be found.
  """
  @spec start(String.t(), [String.t()]) :: state()
  def start(command, args) do
    # Fail with a readable message instead of handing nil to Port.open/2.
    executable =
      System.find_executable(command) ||
        raise(ArgumentError, "could not find executable #{inspect(command)} on PATH")

    # Open a port to the server process, communicating via stdio
    port =
      Port.open({:spawn_executable, executable}, [
        :binary,
        :exit_status,
        {:args, args},
        # Line-based IO with generous buffer
        {:line, 4096},
        :stderr_to_stdout
      ])

    # Return initial state
    %{port: port, request_id: 1, initialized: false}
  end

  @doc """
  Send a raw JSON-RPC message to the server.

  `message` is either an already encoded JSON string or a term that is encoded
  with `Jason.encode!/1`. A newline is appended. Returns `state` unchanged.
  """
  @spec send_message(state(), term()) :: state()
  def send_message(state, message) do
    json = if is_binary(message), do: message, else: Jason.encode!(message)
    IO.puts("Sending: #{json}")
    Port.command(state.port, "#{json}\n")
    state
  end

  @doc """
  Wait for and receive a response from the server.

  Returns `{result, state}` where `result` is one of:

    * the decoded JSON of the next complete line sent by the server
    * `{:error, error, line}` when that line is not valid JSON (for example
      server log output, since stderr is merged into stdout)
    * `{:exit, status}` when the server process exited
    * `{:timeout}` when nothing arrived within `timeout` milliseconds

  The timeout applies to each chunk of data, so a line longer than the port's
  line buffer (delivered in several chunks) restarts the timer per chunk.
  """
  @spec receive_message(state(), timeout()) :: {term(), state()}
  def receive_message(state, timeout \\ 5000) do
    await_line(state, "", timeout)
  end

  @doc """
  Initialize the connection with the server.

  Sends the `initialize` request. On a successful response it also sends the
  `notifications/initialized` notification and returns `{result, state}` with
  `initialized: true`; any other response is returned as `{other, state}`.
  """
  @spec initialize(state()) :: {term(), state()}
  def initialize(state) do
    params = %{
      protocolVersion: @protocol_version,
      clientInfo: %{
        name: "Elixir MCP Client",
        version: "0.1.0"
      },
      capabilities: %{
        # Minimal capabilities for now
        resources: %{}
      }
    }

    {response, state} = request(state, "initialize", params)

    case response do
      %{"result" => result} ->
        # Send initialized notification
        notification = %{jsonrpc: @jsonrpc_version, method: "notifications/initialized"}
        state = send_message(state, notification)
        {result, %{state | initialized: true}}

      other ->
        {other, state}
    end
  end

  @doc """
  List tools available on the server.
  """
  @spec list_tools(state()) :: {term(), state()}
  def list_tools(state), do: request(state, "tools/list")

  @doc """
  List resources available from the server.
  """
  @spec list_resources(state()) :: {term(), state()}
  def list_resources(state), do: request(state, "resources/list")

  @doc """
  Call a tool on the server.
  """
  @spec call_tool(state(), String.t(), map()) :: {term(), state()}
  def call_tool(state, tool_name, arguments) do
    request(state, "tools/call", %{name: tool_name, arguments: arguments})
  end

  @doc """
  Read a file using the read_file tool.
  """
  @spec read_file(state(), String.t()) :: {term(), state()}
  def read_file(state, path) do
    call_tool(state, "read_file", %{path: path})
  end

  @doc """
  List directory contents using the list_directory tool.
  """
  @spec list_directory(state(), String.t()) :: {term(), state()}
  def list_directory(state, path) do
    call_tool(state, "list_directory", %{path: path})
  end

  @doc """
  Clean up resources and stop the client.

  Safe to call when the port has already closed (for example after the server
  exited). Always returns `:ok`.
  """
  @spec stop(state()) :: :ok
  def stop(state) do
    # Port.close/1 raises on a closed port; Port.info/1 returns nil for one.
    if Port.info(state.port), do: Port.close(state.port)
    :ok
  end

  # Sends one JSON-RPC request (the :params key is omitted when `params` is
  # nil), bumps the request id and waits for the next message from the server.
  defp request(state, method, params \\ nil) do
    base = %{jsonrpc: @jsonrpc_version, id: state.request_id, method: method}
    message = if is_nil(params), do: base, else: Map.put(base, :params, params)

    state = send_message(state, message)
    receive_message(%{state | request_id: state.request_id + 1})
  end

  # Collects port data until one complete line is available. The port is
  # opened in line mode, so data arrives as {:eol, line} for a complete line
  # or {:noeol, chunk} for a piece of a line longer than the line buffer.
  defp await_line(%{port: port} = state, partial, timeout) do
    receive do
      {^port, {:data, {:noeol, chunk}}} ->
        await_line(state, partial <> chunk, timeout)

      {^port, {:data, {:eol, chunk}}} ->
        decode_line(partial <> chunk, state)

      # Raw binary data, in case the state wraps a port not opened in line mode.
      {^port, {:data, data}} when is_binary(data) ->
        decode_line(partial <> data, state)

      {^port, {:exit_status, status}} ->
        IO.puts("Server exited with status: #{status}")
        {{:exit, status}, state}
    after
      timeout ->
        IO.puts("Timeout waiting for response")
        {{:timeout}, state}
    end
  end

  # Logs one received line and decodes it as JSON.
  defp decode_line(line, state) do
    IO.puts("Received: #{line}")

    case Jason.decode(String.trim(line)) do
      {:ok, decoded} -> {decoded, state}
      {:error, error} -> {{:error, error, line}, state}
    end
  end
end