Composable SFTP streams in Elixir
# Notebook dependencies. Req is used for the HTTP download example at the end.
# Protocol consolidation is disabled because this notebook defines its own
# Collectable implementations (ChunkInspector, SFTP.Stream) at runtime; with a
# consolidated Collectable protocol those later defimpls would not be dispatched to.
Mix.install(
[
{:req, "~> 0.5"},
{:kino, "~> 0.19.0"}
],
consolidate_protocols: false
)
Logger.configure(level: :info)
In-process SFTP server
# Start the :ssh application before the daemon and SFTP client below are used.
:ssh.start()
OTP also ships with an SFTP daemon, which we can point at a temporary directory to simulate a remote host.
defmodule SSHServer do
  @moduledoc """
  A throwaway SFTP server built on OTP's `:ssh` daemon and a temporary directory.

  `start/1` creates a fresh host key and SFTP root, `with_channel/2` opens an
  authenticated SFTP channel for the duration of a function call, and `stop/1`
  shuts the daemon down and deletes both temporary directories.
  """

  defstruct [:daemon_ref, :system_dir, :sftp_root, :port]

  @host ~c"localhost"
  @username ~c"livebook"
  @password ~c"livebook"

  @doc """
  Starts an SFTP daemon listening on `port` (`0`, the default, lets the OS pick one).

  Returns a `%SSHServer{}` whose `:port` is the port actually bound and whose
  `:sftp_root` is the directory served by the SFTP subsystem. If anything raises
  after the temporary directories were created, they are deleted before the
  exception is re-raised.
  """
  def start(port \\ 0) do
    unique_id = System.unique_integer([:positive])
    system_dir = Path.join(System.tmp_dir!(), "ssh_system_#{unique_id}")
    sftp_root = Path.join(System.tmp_dir!(), "sftp_root_#{unique_id}")

    File.mkdir_p!(system_dir)
    File.mkdir_p!(sftp_root)

    try do
      write_host_key!(system_dir)

      {:ok, daemon_ref} =
        :ssh.daemon(port,
          system_dir: String.to_charlist(system_dir),
          auth_methods: ~c"password",
          user_passwords: [{@username, @password}],
          subsystems: [:ssh_sftpd.subsystem_spec(cwd: String.to_charlist(sftp_root))]
        )

      {:ok, daemon_info} = :ssh.daemon_info(daemon_ref)

      %__MODULE__{
        daemon_ref: daemon_ref,
        # `port` may have been 0; report the port the daemon really listens on.
        port: Keyword.fetch!(daemon_info, :port),
        system_dir: system_dir,
        sftp_root: sftp_root
      }
    rescue
      error ->
        # Don't leave the temp dirs (and the private host key) behind on failure.
        File.rm_rf!(system_dir)
        File.rm_rf!(sftp_root)
        reraise error, __STACKTRACE__
    end
  end

  @doc """
  Opens an SFTP channel to `server`, calls `lambda` with the channel pid and
  returns its result. The SSH connection is closed afterwards, even if
  `lambda` raises.
  """
  def with_channel(server, lambda) do
    {:ok, channel, conn} =
      :ssh_sftp.start_channel(@host, server.port,
        user: @username,
        password: @password,
        silently_accept_hosts: true,
        # Every start/1 generates a new host key on a new port; don't record
        # these throwaway keys in the user's known_hosts file.
        save_accepted_host: false,
        user_interaction: false
      )

    try do
      lambda.(channel)
    after
      :ssh.close(conn)
    end
  end

  @doc "Stops the daemon and deletes the server's temporary directories."
  def stop(server) do
    :ssh.stop_daemon(server.daemon_ref)
    File.rm_rf!(server.system_dir)
    File.rm_rf!(server.sftp_root)
  end

  # Generates a 2048-bit RSA host key and writes it as PEM to
  # `<system_dir>/ssh_host_rsa_key`, where the default :ssh_file handler picks
  # it up. The file is restricted to the owner before the key is written to it.
  defp write_host_key!(system_dir) do
    key_path = Path.join(system_dir, "ssh_host_rsa_key")
    host_key = :public_key.generate_key({:rsa, 2048, 65537})
    pem = :public_key.pem_encode([:public_key.pem_entry_encode(:RSAPrivateKey, host_key)])

    File.touch!(key_path)
    File.chmod!(key_path, 0o600)
    File.write!(key_path, pem)
  end
end
Helper
The Helper module defines a single function so that every example below can inspect chunks from an enumerable in the same way.
defmodule Helper do
  @moduledoc """
  Shared debugging helper so every example in this notebook prints chunks the same way.
  """

  @doc """
  Prints `chunk` labelled with its size in bytes (using `limit: 5`, so long
  binaries are shown truncated) and returns `chunk` unchanged, which makes it
  usable in the middle of a pipeline.
  """
  def inspect_chunk(chunk) do
    size_label = "chunk with " <> Integer.to_string(byte_size(chunk)) <> " bytes"
    IO.inspect(chunk, limit: 5, label: size_label)
  end
end
Setup
# Start a fresh daemon on an OS-assigned port (start/1 defaults to port 0);
# `server` is used by every cell below.
server = SSHServer.start()
We’re not creating a big video file here. For demonstration purposes, a small file is enough to be streamed in a few chunks.
source_path = Path.join(System.tmp_dir!(), "my_big_video.mov")

# 10_000 random bytes stand in for the video: enough for a few 4 KB chunks.
10_000
|> :crypto.strong_rand_bytes()
|> then(&File.write!(source_path, &1))

source_path |> File.stat!() |> Map.fetch!(:size)
Naive version
As explained in the article, this loads the whole file into memory and can cause out-of-memory (OOM) errors for large files.
SSHServer.with_channel(server, fn channel ->
  destination_path = "naive.mov"

  # The entire file is read into memory before anything is sent.
  video_file = File.read!(source_path)
  video_file |> byte_size() |> IO.inspect(label: "Video File Size: ")

  :ok = :ssh_sftp.write_file(channel, destination_path, video_file)

  uploaded_path = Path.join(server.sftp_root, destination_path)
  IO.inspect(File.exists?(uploaded_path), label: "File exists?:")
end)
For comprehension
This works, but there is no cleanup on failure, the Erlang API leaks into every call site, and the destination isn’t a value you can pass around.
# 4 KB chunks so we see the streaming happen
chunk_size = 4 * 1024
destination_path = "for_comprehension.mov"
file_stream = File.stream!(source_path, chunk_size)

SSHServer.with_channel(server, fn channel ->
  # Hand-rolled open/write/close: if a write fails, the match raises before
  # close/2 is reached and the handle is only released when with_channel
  # closes the whole connection.
  {:ok, handle} = :ssh_sftp.open(channel, destination_path, [:write, :binary])

  for chunk <- file_stream do
    :ok = :ssh_sftp.write(channel, handle, chunk)
    Helper.inspect_chunk(chunk)
  end

  :ok = :ssh_sftp.close(channel, handle)
end)

File.read!(source_path) == File.read!(Path.join(server.sftp_root, destination_path))
Understand the Collectable protocol
Before we implement the Collectable protocol for SFTP.Stream, let’s look at the protocol itself with a simple example.
defmodule ChunkInspector do
  @moduledoc """
  Minimal `Collectable` used to illustrate the protocol: every chunk pushed
  into it is printed with `Helper.inspect_chunk/1` and otherwise discarded.
  """

  defstruct []

  defimpl Collectable do
    # into/1 returns the initial accumulator plus a collector function. The
    # collector receives {:cont, element} for each element, then :done (its
    # return value becomes the result of Enum.into/2, `for ... into:`, etc.)
    # or :halt when collection is aborted.
    def into(inspector) do
      collector_fun = fn
        acc, {:cont, chunk} ->
          Helper.inspect_chunk(chunk)
          # Return the accumulator, not the chunk: this value is passed back
          # in as `acc` on the next call and finally to :done.
          acc

        acc, :done ->
          acc

        acc, :halt ->
          acc
      end

      {inspector, collector_fun}
    end
  end
end
# Push only the first three chunks through ChunkInspector; its collector
# prints each one. The collected value isn't interesting, so the cell ends with :ok.
source_path
|> File.stream!(chunk_size)
|> Stream.take(3)
|> Enum.into(%ChunkInspector{})
:ok
Implement the Collectable protocol
Now that we know how Collectable works and how to write a file over SFTP, let’s combine the two and implement our final version.
defmodule SFTP.Stream do
  @moduledoc """
  A `Collectable` sink that writes everything collected into it to a file on an
  SFTP server, so it works with `Enum.into/2`, `Stream.into/2`, `for ... into:`
  and any other consumer of `Collectable`.

  Collecting returns `:ok` when every chunk was written and the remote file was
  closed successfully, or `{:error, reason}` for the first failure.
  """

  defstruct [:channel, :path, :handle, :status]

  @type t :: %__MODULE__{
          channel: pid(),
          path: String.t(),
          handle: term() | nil,
          status: :ok | {:error, term()} | nil
        }

  @doc """
  Builds a sink for `path` on the SFTP `channel` (the pid returned by
  `:ssh_sftp.start_channel/3`). The remote file is only opened once
  something starts collecting into the sink.
  """
  @spec build(pid(), String.t()) :: t()
  def build(channel, path) when is_pid(channel) and is_binary(path),
    do: %__MODULE__{channel: channel, path: path}

  defimpl Collectable do
    # Opens the remote file for writing when collection starts; raises a
    # MatchError if it cannot be opened.
    def into(%{channel: channel, path: path} = stream) do
      {:ok, handle} = :ssh_sftp.open(channel, path, [:write, :binary])

      collector_fun = fn
        %{status: :ok, handle: handle} = stream, {:cont, chunk} ->
          case :ssh_sftp.write(channel, handle, chunk) do
            :ok -> stream
            {:error, _reason} = error -> %{stream | status: error}
          end

        # A previous write failed: drop the remaining chunks and wait for
        # :done/:halt to close the handle.
        stream, {:cont, _chunk} ->
          stream

        # All writes succeeded: the close result is the final result, so a
        # failing close is reported instead of a silent :ok.
        %{handle: handle, status: :ok}, :done ->
          :ssh_sftp.close(channel, handle)

        # A write failed: still close the handle, but report the write error.
        %{handle: handle, status: error}, :done ->
          :ssh_sftp.close(channel, handle)
          error

        %{handle: handle}, :halt ->
          :ssh_sftp.close(channel, handle)
      end

      {%{stream | handle: handle, status: :ok}, collector_fun}
    end
  end
end
Using SFTP.Stream
# Bound outside the closure so the verification cell below checks this upload
# rather than the earlier "for_comprehension.mov".
destination_path = "collectable.mov"

SSHServer.with_channel(server, fn channel ->
  sink = SFTP.Stream.build(channel, destination_path)

  source_path
  |> File.stream!(chunk_size)
  # Stream.into/2 writes each chunk into the sink as a side effect and passes it on.
  |> Stream.into(sink)
  |> Stream.each(&Helper.inspect_chunk/1)
  # Stream.run/1 always returns :ok. The sink's own :done result is discarded
  # by Stream.into/2, so this value does not say whether the upload succeeded.
  |> Stream.run()
  |> IO.inspect(label: "Stream result")
end)
Verify the upload:
# Compare against the file written by the Stream.into cell above.
File.read!(source_path) == server.sftp_root |> Path.join("collectable.mov") |> File.read!()
Comeback of the for comprehension
The for comprehension we wrote first also accepts :into, which means our SFTP sink
slots in with no protocol-specific code:
SSHServer.with_channel(server, fn channel ->
  sink = SFTP.Stream.build(channel, "for_into.mov")

  # `into:` accepts any Collectable; the comprehension evaluates to whatever
  # the sink's collector returns for :done.
  for chunk <- File.stream!(source_path, chunk_size), into: sink do
    Helper.inspect_chunk(chunk)
  end
end)

# Verify the content, not just the file's existence, like the other upload checks.
File.read!(source_path) == server.sftp_root |> Path.join("for_into.mov") |> File.read!()
Other use cases
Req accepts any Collectable as :into. So we can stream an HTTP download
through our SFTP sink without ever touching disk on this side:
filename = "downloaded.txt"

SSHServer.with_channel(server, fn channel ->
  # The response body is streamed straight into the SFTP sink.
  Req.new(url: "https://httpbin.org/uuid", into: SFTP.Stream.build(channel, filename))
  |> Req.get!()
end)

file = File.read!(Path.join(server.sftp_root, filename))

# The built-in JSON module exists from Elixir 1.18 on; older versions just
# show the raw body.
if Version.match?(System.version(), ">= 1.18.0"), do: JSON.decode!(file), else: file
Cleanup
# Remove the local source file, then stop the daemon and delete its temporary
# directories (including everything uploaded into sftp_root).
File.rm!(source_path)
SSHServer.stop(server)