Powered by AppSignal & Oban Pro

Composable SFTP streams in Elixir

composable-sftp-streams.livemd

Composable SFTP streams in Elixir

# Protocol consolidation stays off: this notebook defines new Collectable
# implementations at runtime, and a consolidated protocol would not dispatch to them.
Mix.install([{:req, "~> 0.5"}, {:kino, "~> 0.19.0"}], consolidate_protocols: false)

# Keep notebook log output at info level and above.
Logger.configure(level: :info)

In-process SFTP server

# Start OTP's :ssh application (and the applications it depends on) before any daemon or
# client connection is created below.
:ssh.start()

OTP also ships with an SFTP daemon (`:ssh_sftpd`), which we can point at a temporary directory to simulate our remote host.

defmodule SSHServer do
  @moduledoc """
  Throwaway SFTP server built on OTP's `:ssh` daemon and its `:ssh_sftpd` subsystem.

  `start/1` creates two temporary directories (one for the generated host key, one that
  becomes the SFTP root) and starts a password-authenticated daemon on the IPv4 loopback
  interface. `with_channel/2` opens an SFTP channel for the duration of a function call, and
  `stop/1` shuts the daemon down and deletes both directories.

  The `:ssh` application must already be started.
  """
  defstruct [:daemon_ref, :system_dir, :sftp_root, :port]

  # The daemon binds to, and the client connects to, IPv4 loopback only: the credentials
  # below are hard-coded, so the server must not be reachable from other machines.
  @host {127, 0, 0, 1}
  @username ~c"livebook"
  @password ~c"livebook"

  @doc """
  Starts the daemon on `port` (the default `0` lets the OS choose a free port).

  Returns an `%SSHServer{}` holding the daemon reference, the port actually bound and the
  two temporary directories. Raises a `MatchError` if the daemon cannot be started.
  """
  def start(port \\ 0) do
    unique_id = System.unique_integer([:positive])
    system_dir = Path.join(System.tmp_dir!(), "ssh_system_#{unique_id}")
    sftp_root = Path.join(System.tmp_dir!(), "sftp_root_#{unique_id}")

    File.mkdir_p!(system_dir)
    File.mkdir_p!(sftp_root)

    write_host_key!(system_dir)

    sftp_root_charlist = String.to_charlist(sftp_root)

    {:ok, daemon_ref} =
      :ssh.daemon(@host, port,
        system_dir: String.to_charlist(system_dir),
        auth_methods: ~c"password",
        user_passwords: [{@username, @password}],
        # `root` jails every session inside sftp_root, so clients cannot reach files above it.
        # `cwd` makes relative paths such as "naive.mov" resolve directly inside it.
        subsystems: [
          :ssh_sftpd.subsystem_spec(root: sftp_root_charlist, cwd: sftp_root_charlist)
        ]
      )

    {:ok, daemon_info} = :ssh.daemon_info(daemon_ref)

    %__MODULE__{
      daemon_ref: daemon_ref,
      # With port 0 the OS picked the port, so read back the one actually bound.
      port: Keyword.fetch!(daemon_info, :port),
      system_dir: system_dir,
      sftp_root: sftp_root
    }
  end

  @doc """
  Opens an SFTP channel to `server`, calls `lambda` with the channel pid and returns its
  result. The SSH connection is closed afterwards, even if `lambda` raises.
  """
  def with_channel(server, lambda) do
    {:ok, channel, conn} =
      :ssh_sftp.start_channel(@host, server.port,
        user: @username,
        password: @password,
        # Every start/1 generates a fresh host key: accept it without prompting, and do not
        # record it in the user's known_hosts file.
        silently_accept_hosts: true,
        save_accepted_host: false,
        user_interaction: false
      )

    try do
      lambda.(channel)
    after
      :ssh.close(conn)
    end
  end

  @doc "Stops the daemon and deletes its temporary directories."
  def stop(server) do
    :ssh.stop_daemon(server.daemon_ref)
    File.rm_rf!(server.system_dir)
    File.rm_rf!(server.sftp_root)
  end

  # Generates a 2048-bit RSA host key and writes it as PEM to "ssh_host_rsa_key" in
  # `system_dir`, where the default :ssh_file handler picks it up.
  defp write_host_key!(system_dir) do
    host_key = :public_key.generate_key({:rsa, 2048, 65537})
    pem_entry = :public_key.pem_entry_encode(:RSAPrivateKey, host_key)
    pem = :public_key.pem_encode([pem_entry])

    system_dir
    |> Path.join("ssh_host_rsa_key")
    |> File.write!(pem)
  end
end

Helper

The `Helper` module defines a single function so that every later example inspects chunks from the enumerable in the same way.

defmodule Helper do
  @moduledoc "Shared output helper used by every example in this notebook."

  @doc """
  Prints `chunk` (at most 5 elements) labelled with its size in bytes, and returns it unchanged.
  """
  def inspect_chunk(chunk) do
    options = [label: "chunk with #{byte_size(chunk)} bytes", limit: 5]
    IO.inspect(chunk, options)
  end
end

Setup

# Start the in-process SFTP daemon; `server` holds its port and its temporary directories.
server = SSHServer.start()

We’re not creating a big video file here: for demonstration purposes, a small 10 KB file is enough to be streamed in a few chunks.

# Fill the "video" with 10,000 random bytes and show its size on disk.
source_path = Path.join(System.tmp_dir!(), "my_big_video.mov")
File.write!(source_path, :crypto.strong_rand_bytes(10_000))
source_path |> File.stat!() |> Map.fetch!(:size)

Naive version

As explained in the article, this loads the whole file into memory and can cause out-of-memory (OOM) errors.

SSHServer.with_channel(server, fn channel ->
  destination_path = "naive.mov"
  # Deliberately naive: the whole source is materialized in memory before uploading.
  video_file = File.read!(source_path)

  video_file |> byte_size() |> IO.inspect(label: "Video File Size: ")

  :ok = :ssh_sftp.write_file(channel, destination_path, video_file)

  uploaded_path = Path.join(server.sftp_root, destination_path)
  IO.inspect(File.exists?(uploaded_path), label: "File exists?:")
end)

For comprehension

This works, but nothing cleans up on failure, every call site is exposed to the Erlang API, and the destination isn’t a value you can pass around.

# Read the source in 4 KiB pieces so the upload visibly happens chunk by chunk.
chunk_size = 4_096
destination_path = "for_comprehension.mov"
file_stream = File.stream!(source_path, chunk_size)

SSHServer.with_channel(server, fn channel ->
  # Raw Erlang API: open, write every chunk, close. Nothing closes the handle if a write fails.
  {:ok, remote} = :ssh_sftp.open(channel, destination_path, [:write, :binary])

  for data <- file_stream do
    :ok = :ssh_sftp.write(channel, remote, data)
    Helper.inspect_chunk(data)
  end

  :ok = :ssh_sftp.close(channel, remote)
end)

File.read!(source_path) == File.read!(Path.join(server.sftp_root, destination_path))

Understand the Collectable protocol

Before we implement the Collectable protocol for SFTP.Stream, let’s look at the protocol itself with a simple example.

defmodule ChunkInspector do
  @moduledoc """
  Minimal `Collectable` that only prints every element it receives.

  It illustrates the `Collectable.into/1` contract: return an initial accumulator plus a
  collector function that handles `{:cont, element}`, `:done` and `:halt`.
  """
  defstruct []

  defimpl Collectable do
    def into(inspector) do
      collector_fun = fn
        acc, {:cont, chunk} ->
          Helper.inspect_chunk(chunk)
          # Keep the struct as the accumulator so `:done` hands back the collectable itself
          # rather than whatever chunk happened to arrive last.
          acc

        acc, :done ->
          acc

        acc, :halt ->
          acc
      end

      {inspector, collector_fun}
    end
  end
end
# Collect only the first three `chunk_size` chunks of the source into the inspector.
Enum.into(
  Stream.take(File.stream!(source_path, chunk_size), 3),
  %ChunkInspector{}
)

# Show :ok as the cell result instead of the collector's return value.
:ok

Implement the Collectable protocol

Now that we know how Collectable works and how to write a file over SFTP, let’s combine the two and implement our final version.

defmodule SFTP.Stream do
  @moduledoc """
  `Collectable` sink that writes everything collected into a remote file over SFTP.

  Build one with `build/2` and pass it wherever a collectable is accepted (`Enum.into/2`,
  `Stream.into/2`, `for ... into:`, Req's `:into`). The remote file is opened when
  collection starts and is closed both when collection finishes and when it is halted.

  On `:done` the collector returns `:ok`, or `{:error, reason}`. An error is returned when a
  write failed (later chunks are then skipped) or when closing the remote file failed.
  """
  defstruct [:channel, :path, :handle, :status]

  @doc """
  Returns a sink for `path` on the SFTP `channel` (the pid returned by
  `:ssh_sftp.start_channel/3`). No remote I/O happens until the sink is collected into.
  """
  def build(channel, path) when is_pid(channel) and is_binary(path),
    do: %__MODULE__{channel: channel, path: path}

  defimpl Collectable do
    def into(%{channel: channel, path: path} = stream) do
      # Opening here makes a bad path fail immediately (MatchError) before any chunk is sent.
      {:ok, handle} = :ssh_sftp.open(channel, path, [:write, :binary])

      collector_fun = fn
        %{status: :ok, handle: handle} = stream, {:cont, chunk} ->
          case :ssh_sftp.write(channel, handle, chunk) do
            :ok -> stream
            {:error, _reason} = error -> %{stream | status: error}
          end

        stream, {:cont, _chunk} ->
          # A previous write failed: drop further chunks and wait for :done/:halt to close.
          stream

        %{handle: handle, status: status}, :done ->
          close_result = :ssh_sftp.close(channel, handle)
          # Report the first failure: a failed write takes precedence over a failed close.
          if status == :ok, do: close_result, else: status

        %{handle: handle}, :halt ->
          # Collection was interrupted; only make sure the remote handle is released.
          :ssh_sftp.close(channel, handle)
      end

      {%{stream | handle: handle, status: :ok}, collector_fun}
    end
  end
end

Using SFTP.Stream

SSHServer.with_channel(server, fn channel ->
  # The sink is an ordinary value: build it once and hand it to Stream.into/2.
  sink = SFTP.Stream.build(channel, "collectable.mov")

  File.stream!(source_path, chunk_size)
  |> Stream.into(sink)
  |> Stream.each(&Helper.inspect_chunk/1)
  |> Stream.run()
  |> IO.inspect(label: "Stream result")
end)

Verify the upload:

# Use the literal file name: the upload above wrote "collectable.mov", while the top-level
# `destination_path` binding still holds "for_comprehension.mov" from the earlier example.
File.read!(source_path) == server.sftp_root |> Path.join("collectable.mov") |> File.read!()

Comeback of the for comprehension

The for comprehension we wrote earlier also accepts `:into`, so our SFTP sink slots in without any protocol-specific code:

SSHServer.with_channel(server, fn channel ->
  # `for` collects into the SFTP sink exactly as it would into a list or a map.
  for chunk <- File.stream!(source_path, chunk_size),
      into: SFTP.Stream.build(channel, "for_into.mov"),
      do: Helper.inspect_chunk(chunk)
end)

server.sftp_root |> Path.join("for_into.mov") |> File.exists?()

Other use cases

Req accepts any Collectable as `:into`, so we can stream an HTTP download through our SFTP sink without ever writing it to the local disk:

filename = "downloaded.txt"

SSHServer.with_channel(server, fn channel ->
  sink = SFTP.Stream.build(channel, filename)

  # Req hands each chunk of the response body to the sink, which writes it to the SFTP server.
  Req.new(url: "https://httpbin.org/uuid", into: sink)
  |> Req.get!()
end)

file = server.sftp_root |> Path.join(filename) |> File.read!()

# Decode with Elixir's built-in JSON module when it is available (Elixir 1.18+).
# Detecting the module directly also covers pre-releases such as "1.18.0-rc.0", which a
# ">= 1.18.0" version requirement would reject.
if Code.ensure_loaded?(JSON) and function_exported?(JSON, :decode!, 1) do
  JSON.decode!(file)
else
  file
end

Cleanup

# Remove the local source file, then stop the daemon and delete its temporary directories.
File.rm!(source_path)
SSHServer.stop(server)