Powered by AppSignal & Oban Pro

Livebook runner

apps/notebooks/s3_streaming_test.livemd

Livebook runner

# Notebook dependencies:
#   vix           - libvips bindings, used by the (commented-out) test-image generator below
#   req           - HTTP client; presumably a transitive need of the S3 helpers — not called directly here
#   kino          - Livebook widgets (the file-upload input below)
#   protobuf      - decodes the NATS ImageConversionResponse payload at the end
#   ex_image_info - sniffs the MIME type of the uploaded binary
Mix.install([
  {:vix, "~> 0.35.0"},
  {:req, "~> 0.5"},
  {:kino, "~> 0.17.0"},
  {:protobuf, "~> 0.15.0"},
  {:ex_image_info, "~> 1.0"}
])

# The cookie must match the one the target services were started with.
Node.set_cookie(:msvc_dev_cookie_change_in_production)

# Attempt to join every service node; the resulting tuple of booleans
# mirrors the node order below.
[
  :"user_svc@user_svc.msvc",
  :"client_svc@client_svc.msvc",
  :"email_svc@email_svc.msvc",
  :"image_svc_1@image_svc_1.msvc",
  :"image_svc_2@image_svc_2.msvc"
]
|> Enum.map(&Node.connect/1)
|> List.to_tuple()

Observability dashboard selection

JAEGER TRACES

PROMETHEUS METRICS

MINIO LOCAL S3 STORAGE

GRAFANA DASHBOARDS

Architecture Overview with two Image services

architecture-beta
    group api(cloud)[API]
    service lvb(cloud)[LiveBook] in api
    service nats(internet)[NATS JetStream] in api
    service client(internet)[Client] in api
    service s3(disk)[S3 MinIO] in api
    service user(server)[User Service] in api
    service email(internet)[Email Service] in api
    service image(disk)[Image1 Service] in api
    service image2(disk)[Image2 Service] in api

    lvb:R -- L:client
    client:T -- B:nats
    image:R -- L:nats
    email:L -- R:nats
    nats:T -- B:user
    s3:R -- L:user
    image:T -- B:s3
    image2:R -- L:image

    group obs(cloud)[O11Y]
      service loki(server)[Loki] in obs
      service promtail(server)[Promtail] in obs
      service prome(server)[Prometheus] in obs
      service jaeger(server)[Jaeger] in obs
      service graf(server)[Grafana] in obs

      promtail:R -- L:loki
      loki:T -- B:graf
      prome:R -- L:graf
      jaeger:L -- R:graf
    

Observability Stack Component Roles

| Component | Observability Pillar | Primary Role | What it “Scrapes/Collects” | Collection Method |
| --- | --- | --- | --- | --- |
| Promtail | Logs | Log collection and shipping agent | Local log files (e.g., /var/log/*.log, stdout/stderr from containers) and the systemd journal | Push Model: tails log files, adds Prometheus-style labels (metadata), and pushes them via HTTP to Loki’s /loki/api/v1/push endpoint |
| Loki | Logs | Highly scalable log aggregation backend | Log streams (pushed by Promtail) | Ingestion/Storage: stores compressed, unstructured log data but indexes only the labels (metadata) for cost efficiency |
| Prometheus | Metrics | Time-series metrics collection and storage | /metrics endpoints exposed by applications or exporters | Pull Model (Scraping): periodically makes an HTTP GET request to a target’s /metrics endpoint to pull the latest state of all metrics |
| Tempo | Traces | High-scale, cost-effective distributed tracing backend | Trace spans (e.g., OpenTelemetry, Jaeger, Zipkin formats) from instrumented applications or collectors | Push Model: receives traces via its gRPC or HTTP endpoints from agents (like the OpenTelemetry Collector) |
| Jaeger | Traces | A separate, alternative distributed tracing backend | Trace spans (received from instrumented applications or agents) | Push Model: receives traces via its own agent/collector endpoints. Tempo can often ingest Jaeger-formatted traces, allowing it to act as a Jaeger replacement. |

Generate Large Colorful Test Image

# Option 1: Perlin noise texture via libvips (kept commented out — uncomment to run).
# NOTE(review): the original comment said "10000x10000 = ~300MB uncompressed", but the
# call below is 50_000 x 50_000 (~2.5 GB of raw pixels) — confirm the intended size.
# {:ok, img} = Vix.Vips.Operation.perlin(50_000, 50_000, cell_size: 256)
# {:ok, img} = Vix.Vips.Operation.linear(img, [255], [0])  # Scale to 0-255

# Write to PNG buffer (PNG compression shrinks the raw pixel data considerably)
# {:ok, bin_png} = Vix.Vips.Image.write_to_buffer(img, ".png")

# IO.puts("Generated image: #{byte_size(bin_png)} bytes (#{Float.round(byte_size(bin_png) / 1_000_000, 2)} MB)")

Import images

# NOTE(review): this Email RPC looks unrelated to importing an image —
# presumably left over from another experiment; confirm before keeping.
:erpc.call(:"client_svc@client_svc.msvc", Email, :create, [1, :welcome])

# Prompt for a file upload and read the whole file into memory.
file = Kino.Input.file("Click below to import an image")
value = Kino.Input.read(file)
path = Kino.Input.file_path(value.file_ref)
bin = File.read!(path)

# ExImageInfo.info/1 returns {mime, width, height, variant}, or nil when the
# binary is not a recognized image. The original destructured unconditionally
# (MatchError on nil) and bound width/height into swapped variable names.
case ExImageInfo.info(bin) do
  {mime, _width, _height, _variant} when mime in ["image/png", "image/jpeg"] ->
    :ok

  _ ->
    raise "not an image"
end

# Return the raw binary so later cells can reference `bin`.
bin

Process binary small images (<1MB)

Send small image conversion request every 50ms: Image.convert_bin()

The code will not run if size > 1MB

# Guard: this cell is only meant for small payloads (NATS message-sized).
if byte_size(bin) > 1_000_000, do: raise("too big")

# One tick every 50 ms, five ticks total; each tick fires one conversion RPC.
Stream.interval(50)
|> Stream.take(5)
|> Task.async_stream(
  fn tick ->
    :erpc.call(:"client_svc@client_svc.msvc", Image, :convert_bin, [bin, "user#{tick}@com"])
  end,
  ordered: false,
  max_concurrency: 10
)
|> Stream.run()

Send concurrently 100 small image conversion requests to Image.convert_bin()

The code will not run if the size is > 1MB

# Guard: this cell is only meant for small payloads (NATS message-sized).
if byte_size(bin) > 1_000_000, do: raise("too big")

# 100 conversion RPCs, at most 10 in flight at a time.
# A Range is already enumerable — the original's intermediate Enum.to_list/1
# only materialized 100 integers for no benefit.
1..100
|> Task.async_stream(
  fn i ->
    :erpc.call(:"client_svc@client_svc.msvc", Image, :convert_bin, [bin, "user#{i}@com"])
  end,
  ordered: false,
  max_concurrency: 10
)
|> Stream.run()

Upload to S3 via user_svc

# S3 options
# NOTE(review): these are the MinIO default credentials — fine for local dev only.
s3_opts = [
  object_storage_endpoint: "http://minio:9000",
  access_key_id: "minioadmin",
  secret_access_key: "minioadmin",
  region: "us-east-1"
]
# Generate unique job ID (second resolution — rerunning within 1s collides)
job_id = "large_test_#{System.system_time(:second)}"


# Upload via RPC to user_svc
# ReqS3Storage.store/5 runs on the user_svc node, so `bin` is copied over
# distribution before the upload. NOTE(review): the content type is hard-coded
# to "image/png" even though the imported file may be a JPEG — confirm whether
# downstream consumers care.
{:ok, upload_result} = :erpc.call(
  :"user_svc@user_svc.msvc",
  ReqS3Storage,
  :store,
  [bin, "msvc-images", job_id, "image/png", s3_opts]
)




# Report the upload; the presigned URL is truncated for display.
IO.puts("✅ Uploaded to S3")
IO.puts("   Bucket: #{upload_result.bucket}")
IO.puts("   Key: #{upload_result.key}")
IO.puts("   Size: #{upload_result.size} bytes")
IO.puts("   Presigned URL: #{String.slice(upload_result.presigned_url, 0..80)}...")

# Return the result map so later cells can reference `upload_result`.
upload_result

Send Conversion Request with S3 Reference

Test once:

# Object key of the file uploaded in the previous cell.
key = upload_result.key

# Single smoke-test conversion request, referencing the object by S3 key only.
:erpc.call(:"client_svc@client_svc.msvc", Image, :convert_from_s3, [key, "m@com"])

Send request every 100ms to Image.convert_from_s3() (the code below ticks via Stream.interval(100))

# One tick every 100 ms, ten ticks total; each tick issues one conversion RPC
# referencing the uploaded object by its S3 key.
Stream.interval(100)
|> Stream.take(10)
|> Task.async_stream(
  fn tick ->
    args = [upload_result.key, "m#{tick + 1}@com"]
    :erpc.call(:"client_svc@client_svc.msvc", Image, :convert_from_s3, args)
  end,
  ordered: false,
  max_concurrency: 10
)
|> Stream.run()

# 200 concurrent conversion requests, capped at 10 in flight at a time.
1..200
|> Task.async_stream(
  fn n ->
    args = [upload_result.key, "m#{n}@com"]
    :erpc.call(:"client_svc@client_svc.msvc", Image, :convert_from_s3, args)
  end,
  ordered: false,
  max_concurrency: 10
)
|> Stream.run()

Monitor Conversion (Listen for Response)

# Subscribe to response topic
# Subscribe this Livebook process (self()) to the NATS response topic.
# Switched from the legacy :rpc.call to :erpc.call for consistency with
# every other RPC in this notebook (:erpc also raises on failure instead
# of returning a {:badrpc, _} tuple that would break the match anyway).
{:ok, subscription} = :erpc.call(
  :"user_svc@user_svc.msvc",
  Gnat,
  :sub,
  [:gnat, self(), "user.image.converted"]
)

# Gnat delivers matching messages to self() as {:msg, %{body: body}}.
# Wait for the first response (60s timeout to allow for large files).
receive do
  {:msg, %{body: body}} ->
    response = Mcsv.V3.ImageConversionResponse.decode(body)

    IO.puts("\n✅ Conversion Complete!")
    IO.puts("   Success: #{response.success}")
    IO.puts("   Message: #{response.message}")
    IO.puts("   Input Size: #{response.input_size} bytes")
    IO.puts("   Output Size: #{response.output_size} bytes (#{Float.round(response.output_size / 1_000_000, 2)} MB)")
    IO.puts("   PDF URL: #{String.slice(response.pdf_url, 0..80)}...")
    IO.puts("   Job ID: #{response.job_id}")

    response
after
  60_000 ->
    IO.puts("⏱️  Timeout waiting for conversion (60s)")
    :timeout
end

Cleanup: Unsubscribe

# Drop the NATS subscription created in the monitoring cell above.
# Switched from the legacy :rpc.call to :erpc.call for consistency with
# the other RPC calls in this notebook.
:erpc.call(
  :"user_svc@user_svc.msvc",
  Gnat,
  :unsub,
  [:gnat, subscription]
)

Summary

This notebook demonstrates:

  1. Generate a large colorful test image using Vix.Vips
  2. Upload to MinIO/S3 via RPC call to ReqS3Storage.store
  3. Send conversion request with oneof source {:s3_ref, ...}
  4. Stream the image from S3 directly to ImageMagick (memory-efficient!)
  5. Monitor the conversion completion via NATS response

The key advantage: No NATS message size limits - the image stays in S3, only metadata is sent via NATS!