IsabelleClientFull
Section
IsabelleClientFull is the process-owning client. It wraps the stateful client
in a GenServer, so many Elixir processes can share one Isabelle connection
without racing on socket reads and writes.
The important distinction is socket ownership. Isabelle replies arrive on a
single TCP stream. In the other clients, if multiple processes call send/recv directly on the same
socket, or share the plain stateful client without coordination, a caller
can consume another caller’s reply. Full prevents that by funneling all protocol
traffic through one owner process.
This notebook does the following:
- verify no-session errors
-
run many concurrent unique long
echocalls through one client process -
interleave concurrent
helpcalls with those echoes -
build and start
HOL -
check several distinct theories concurrently via
use_theories - purge theories, stop the session, shut down the server, and close the GenServer
Setup
When using the published package, replace the dependency with {:isabelle_elixir, "~> 0.2"}.
Mix.install([
{:isabelle_elixir, path: Path.expand("..", __DIR__)}
])
You need to make sure that the isabelle executable is in the PATH.
System.find_executable("isabelle")
Start And Connect
The returned pid is the client handle. The process owns the socket.
server_name = "livebook_full_#{System.unique_integer([:positive])}"
{:ok, [server]} = IsabelleClientMini.new_server(server_name, 0)
{:ok, pid} =
IsabelleClientFull.connect(
password: server["password"],
host: server["host"],
port: server["port"]
)
Process.alive?(pid)
Guard Rails Before A Session Exists
The Full client keeps session state inside the GenServer. Before a session is
started, session-dependent APIs fail locally with :no_session instead of
sending bad commands to Isabelle.
{
IsabelleClientFull.use_theories(pid, %{}),
IsabelleClientFull.purge_theories(pid, %{}),
IsabelleClientFull.stop_session(pid)
}
Baseline Command
{:ok, commands} = IsabelleClientFull.help(pid)
commands
Concurrent Unique Replies
This is showcasing that the “full” client correctly serializes access to one shared Isabelle TCP connection. The dangerous thing to avoid is that, since Isabelle replies come back on a single socket stream, if multiple Elixir processes use the same raw socket directly (process A send command A, process B send command B), then process B might accidentally read the reply for command A.
In the code below, the echo tasks constitute the main test:
Each task sends a unique payload and expects that exact payload back. The payload includes a long string to force Isabelle’s byte-length message framing path. The help tasks are interleaved to add some additional “noise”.
The operations are shuffled, and every worker waits behind a small start
barrier before calling the client. That keeps task creation order from
dominating the GenServer mailbox order. Results are collected with receive,
so the notebook observes completion order instead of reconstructing the original
task list order with ordered Task.await/2 calls.
parent = self()
ref = make_ref()
release_ref = make_ref()
operations =
(Enum.map(1..25, &{:echo, &1}) ++ Enum.map(1..5, &{:help, &1}))
|> Enum.shuffle()
concurrent_tasks =
for operation <- operations do
Task.async(fn ->
receive do
{:go, ^release_ref} -> :ok
end
case operation do
{:echo, n} ->
payload = %{
"client" => "full",
"n" => n,
"token" => "token-#{n}",
"framed" => String.duplicate(Integer.to_string(rem(n, 10)), 160)
}
{:ok, ^payload} = IsabelleClientFull.echo(pid, payload)
{:help, _n} ->
{:ok, commands} = IsabelleClientFull.help(pid)
for command <- ~w(cancel echo help purge_theories session_build session_start session_stop shutdown use_theories) do
true = command in commands
end
end
send(parent, {:full_concurrent_result, ref, operation})
end)
end
Enum.each(concurrent_tasks, &send(&1.pid, {:go, release_ref}))
concurrent_results =
for _ <- 1..30 do
receive do
{:full_concurrent_result, ^ref, result} -> result
after
30_000 -> raise "timed out waiting for concurrent result"
end
end
expected =
(Enum.map(1..25, &{:echo, &1}) ++ Enum.map(1..5, &{:help, &1}))
Enum.each(concurrent_tasks, &Task.await(&1, 1_000))
true = Enum.sort(concurrent_results) == Enum.sort(expected)
%{
creation_order: operations,
completion_order: concurrent_results,
all_expected_results_seen?: Enum.sort(concurrent_results) == Enum.sort(expected)
}
Build And Start A Session
{:ok, build_task} =
IsabelleClientFull.build_session(pid, %{"session" => "HOL"}, 120_000)
{build_task.status, build_task.result["ok"], length(build_task.notes)}
{:ok, start_task} =
IsabelleClientFull.start_session(pid, %{"session" => "HOL"}, 120_000)
session_id = start_task.result["session_id"]
{start_task.status, session_id, Enum.map(start_task.notes, & &1["message"])}
Active Session Sanity Check
Now the GenServer has an active session_id in its internal client state. A
simple echo is enough to show the process and socket are still usable after
that state transition. The earlier concurrency batch already covers shared
caller safety.
{:ok, %{"phase" => "active_session"}} =
IsabelleClientFull.echo(pid, %{"phase" => "active_session"})
Check A Theory
Isabelle reads theories from disk, so the notebook creates a temporary directory.
theory_dir =
Path.join(System.tmp_dir!(), "isabelle_elixir_livebook_full_#{System.unique_integer([:positive])}")
File.mkdir_p!(theory_dir)
theory_dir
The “full” client stores session state inside the GenServer, so use_theories/3
receives no explicit "session_id".
This section also tests the interesting concurrency case for theory checking:
several callers all run use_theories, and each one internally awaits
Isabelle’s async NOTE/FINISHED task stream. With a shared raw socket, these
callers could accidentally consume each other’s task messages. With the “full” client, they
queue behind the GenServer owner and each receives only its own theory result.
theories = [
{"Example1", "lemma \"x = x\"\n by simp", "theorem ?x = ?x"},
{"Example2", "lemma \"xs @ [] = xs\"\n by simp", "theorem ?xs @ [] = ?xs"},
{"Example3", "lemma \"(A \\ A) \\ True\"\n by simp",
"theorem (?A \\ ?A) \\ True"}
]
for {theory, body, _expected} <- theories do
File.write!(Path.join(theory_dir, "#{theory}.thy"), """
theory #{theory} imports Main
begin
#{body}
end
""")
end
File.ls!(theory_dir)
theory_ref = make_ref()
theory_release_ref = make_ref()
theory_operations =
Enum.map(theories, fn {theory, _body, expected_result} ->
{:use_theories, theory, expected_result}
end)
theory_tasks =
theory_operations
|> Enum.shuffle()
|> Enum.map(fn operation ->
Task.async(fn ->
receive do
{:go, ^theory_release_ref} -> :ok
end
result =
case operation do
{:use_theories, theory, expected_result} ->
{:ok, use_task} =
IsabelleClientFull.use_theories(
pid,
%{"theories" => [theory], "master_dir" => theory_dir},
120_000
)
true = use_task.status == :finished
true = use_task.result["ok"]
[%{"theory_name" => theory_name}] = use_task.result["nodes"]
true = theory_name == "Draft.#{theory}"
true =
use_task
|> IsabelleClientMini.extract_results()
|> String.contains?(expected_result)
{:use_theories, theory, expected_result, use_task}
end
send(parent, {:full_concurrent_result, theory_ref, result})
end)
end)
Enum.each(theory_tasks, &send(&1.pid, {:go, theory_release_ref}))
theory_results =
for _ <- 1..length(theory_tasks) do
receive do
{:full_concurrent_result, ^theory_ref, result} -> result
after
120_000 -> raise "timed out waiting for theory concurrency result"
end
end
Enum.each(theory_tasks, &Task.await(&1, 1_000))
checked_theories =
theory_results
|> Enum.map(fn {:use_theories, theory, _expected_result, _task} -> theory end)
|> Enum.sort()
%{
completion_order:
Enum.map(theory_results, fn
{:use_theories, theory, _expected_result, _task} -> theory
end),
all_expected_theories_seen?: checked_theories == ~w(Example1 Example2 Example3),
theory_messages:
Enum.map(theory_results, fn {:use_theories, theory, expected_result, task} ->
%{
theory: theory,
expected_result: expected_result,
messages: IsabelleClientMini.extract_results(task)
}
end)
}
{:ok, purge_result} =
IsabelleClientFull.purge_theories(pid, %{
"theories" => ~w(Example1 Example2 Example3),
"master_dir" => theory_dir
})
{purge_result["purged"], purge_result["retained"]}
Stop And Shutdown
{:ok, stop_task} = IsabelleClientFull.stop_session(pid, 120_000)
{stop_task.status, stop_task.result}
{:ok, nil} = IsabelleClientFull.shutdown_server(pid)
:ok = IsabelleClientFull.close(pid)
kill_server/1 is harmless after protocol shutdown and useful when rerunning
only part of the notebook.
IsabelleClientMini.kill_server(server_name)