Livebook runner
Mix.install([
{:vix, "~> 0.35.0"},
{:req, "~> 0.5"},
{:kino, "~> 0.17.0"},
{:protobuf, "~> 0.15.0"},
{:ex_image_info, "~> 1.0"}
])
# The distribution cookie has to match the one the services were started with.
Node.set_cookie(:msvc_dev_cookie_change_in_production)

# Connect to every service node in turn. The cell evaluates to a tuple holding
# one Node.connect/1 result per node, in the order listed below.
[
  :"user_svc@user_svc.msvc",
  :"client_svc@client_svc.msvc",
  :"email_svc@email_svc.msvc",
  :"image_svc_1@image_svc_1.msvc",
  :"image_svc_2@image_svc_2.msvc"
]
|> Enum.map(&Node.connect/1)
|> List.to_tuple()
Dashboard observables selection
JAEGER TRACES
PROMETHEUS METRICS
MINIO LOCAL S3 STORAGE
GRAFANA DASHBOARDS
Architecture Overview with two Image services
architecture-beta
service lvb(cloud)[LiveBook] in api
group api(cloud)[API]
service nats(internet)[NATS JetStream] in api
service client(internet)[Client] in api
service s3(disk)[S3 MinIO] in api
service user(server)[User Service] in api
service email(internet)[Email Service] in api
service image(disk)[Image1 Service] in api
service image2(disk)[Image2 Service] in api
lvb:R -- L:client
client:T -- B:nats
image:R -- L:nats
email:L -- R:nats
nats:T -- B:user
s3:R -- L:user
image:T -- B:s3
image2:R -- L:image
group obs(cloud)[O11Y]
service loki(server)[Loki] in obs
service promtail(server)[Promtail] in obs
service prome(server)[Prometheus] in obs
service jaeger(server)[Jaeger] in obs
service graf(server)[Grafana] in obs
promtail:R -- L:loki
loki:T -- B:graf
prome:R -- L:graf
jaeger:L -- R:graf
Observability Stack Component Roles
| Component | Observability Pillar | Primary Role | What it “Scrapes/Collects” | Collection Method |
|---|---|---|---|---|
| Promtail | Logs | Log collection and shipping agent | Local Log Files (e.g., /var/log/*.log, stdout/stderr from containers), and systemd Journal | Push Model: Tails log files, adds Prometheus-style labels (metadata), and pushes them via HTTP to Loki’s /loki/api/v1/push endpoint |
| Loki | Logs | Highly scalable log aggregation backend | Log streams (pushed by Promtail) | Ingestion/Storage: Stores compressed, unstructured log data, but indexes only the labels (metadata) for cost efficiency |
| Prometheus | Metrics | Time-series metrics collection and storage | /metrics endpoints exposed by applications or exporters | Pull Model (Scraping): Periodically makes an HTTP GET request to a target’s /metrics endpoint to pull the latest state of all metrics |
| Tempo | Traces | High-scale, cost-effective distributed tracing backend | Trace Spans (e.g., OpenTelemetry, Jaeger, Zipkin formats) from instrumented applications or collectors | Push Model: Receives traces via its gRPC or HTTP endpoints from agents (like the OpenTelemetry Collector) |
| Jaeger | Traces | A separate, alternative distributed tracing backend | Trace Spans (received from instrumented applications or agents) | Push Model: Receives traces via its own agent/collector endpoints. Tempo can often ingest Jaeger-formatted traces, allowing it to act as a Jaeger replacement. |
Generate Large Colorful Test Image
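# Option 1: Perlin noise (interesting texture, 10000x10000 = ~300MB uncompressed)
# NOTE(review): the call below requests 50_000x50_000, not 10000x10000 — confirm the intended size before uncommenting.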
# {:ok, img} = Vix.Vips.Operation.perlin(50_000, 50_000, cell_size: 256)
# {:ok, img} = Vix.Vips.Operation.linear(img, [255], [0]) # Scale to 0-255
# Write to PNG buffer (will compress to ~50-100MB)
# {:ok, bin_png} = Vix.Vips.Image.write_to_buffer(img, ".png")
# IO.puts("Generated image: #{byte_size(bin_png)} bytes (#{Float.round(byte_size(bin_png) / 1_000_000, 2)} MB)")
Import images
# Run Email.create(1, :welcome) on the client service node and return its result.
# NOTE(review): 1 is presumably a user id and :welcome an email template — confirm against client_svc.
:erpc.call(
  :"client_svc@client_svc.msvc",
  Email,
  :create,
  [1, :welcome]
)
# File picker widget used to choose the image to work with.
file = Kino.Input.file("Click below to import an image")

# The input reads as nil until a file has been selected; fail with a clear
# message instead of an obscure error when accessing `file_ref` on nil.
value = Kino.Input.read(file) || raise("no image selected")
path = Kino.Input.file_path(value.file_ref)
bin = File.read!(path)

# ExImageInfo.info/1 returns {mime, width, height, variant}, or nil when the
# binary is not a recognised image. Both nil and unsupported MIME types are
# rejected with the same "not an image" error. `mime` stays bound for later cells.
mime =
  case ExImageInfo.info(bin) do
    {detected, _width, _height, _variant} when detected in ["image/png", "image/jpeg"] ->
      detected

    _ ->
      raise "not an image"
  end

# Cell result: the raw image binary.
bin
Process binary small images (<1MB)
Send a small-image conversion request every 50 ms (5 requests in total) via Image.convert_bin()
The code will not run if size > 1MB
# Guard: this cell refuses payloads larger than 1 MB.
if byte_size(bin) > 1_000_000, do: raise("too big")

# Remote conversion request issued for every tick; the tick index (starting
# at 0) makes each user address unique.
convert_small = fn tick ->
  :erpc.call(:"client_svc@client_svc.msvc", Image, :convert_bin, [bin, "user#{tick}@com"])
end

# One tick every 50 ms, stop after 5 ticks; each tick is handled in its own
# task and the results are discarded.
50
|> Stream.interval()
|> Stream.take(5)
|> Task.async_stream(convert_small, ordered: false, max_concurrency: 10)
|> Stream.run()
Send 100 small-image conversion requests concurrently (at most 10 in flight at a time) to Image.convert_bin()
The code will not run if the size is > 1MB
# Guard: this cell refuses payloads larger than 1 MB.
if byte_size(bin) > 1_000_000, do: raise("too big")

# 100 conversion requests for the same binary, at most 10 running at once.
# The range is consumed directly by Task.async_stream/3; results are discarded.
small_burst =
  Task.async_stream(
    1..100,
    fn n ->
      :erpc.call(:"client_svc@client_svc.msvc", Image, :convert_bin, [bin, "user#{n}@com"])
    end,
    ordered: false,
    max_concurrency: 10
  )

Stream.run(small_burst)
Upload to S3 via user_svc
# Connection settings for the local MinIO instance (development credentials).
s3_opts = [
  object_storage_endpoint: "http://minio:9000",
  access_key_id: "minioadmin",
  secret_access_key: "minioadmin",
  region: "us-east-1"
]

# Job id derived from the current Unix time in seconds.
job_id = "large_test_#{System.system_time(:second)}"

# Upload via RPC to user_svc. The fourth argument is presumably the object's
# content type (TODO confirm against ReqS3Storage.store): pass the MIME type
# detected when the image was imported (`mime`) instead of a hard-coded
# "image/png", since the import step also accepts JPEG files.
{:ok, upload_result} =
  :erpc.call(
    :"user_svc@user_svc.msvc",
    ReqS3Storage,
    :store,
    [bin, "msvc-images", job_id, mime, s3_opts]
  )

IO.puts("✅ Uploaded to S3")
IO.puts(" Bucket: #{upload_result.bucket}")
IO.puts(" Key: #{upload_result.key}")
IO.puts(" Size: #{upload_result.size} bytes")
# Only the first 81 characters of the presigned URL are printed.
IO.puts(" Presigned URL: #{String.slice(upload_result.presigned_url, 0..80)}...")

# Cell result: the full upload description, reused by the following cells.
upload_result
Send Conversion Request with S3 Reference
Test once:
# Single request: ask client_svc to convert the uploaded object, addressed by its S3 key.
%{key: key} = upload_result

:erpc.call(
  :"client_svc@client_svc.msvc",
  Image,
  :convert_from_s3,
  [key, "m@com"]
)
Send a request every 100 ms (10 requests in total) to Image.convert_from_s3()
# One conversion request per tick. Ticks start at 0, hence the +1 in the address.
request_from_s3 = fn tick ->
  :erpc.call(:"client_svc@client_svc.msvc", Image, :convert_from_s3, [
    upload_result.key,
    "m#{tick + 1}@com"
  ])
end

# One tick every 100 ms, stop after 10 ticks; results are discarded.
100
|> Stream.interval()
|> Stream.take(10)
|> Task.async_stream(request_from_s3, ordered: false, max_concurrency: 10)
|> Stream.run()
# Burst of 200 conversion requests for the same S3 object, at most 10 in
# flight at a time; results are discarded.
s3_burst =
  Task.async_stream(
    1..200,
    fn n ->
      :erpc.call(:"client_svc@client_svc.msvc", Image, :convert_from_s3, [
        upload_result.key,
        "m#{n}@com"
      ])
    end,
    ordered: false,
    max_concurrency: 10
  )

Stream.run(s3_burst)
Monitor Conversion (Listen for Response)
# Subscribe to response topic
# Subscribe this process to the response topic through the Gnat connection
# registered as :gnat on user_svc. :erpc.call/4 is used like everywhere else
# in this notebook, so a failed remote call raises with its real reason
# instead of surfacing as a MatchError on {:badrpc, reason}.
{:ok, subscription} =
  :erpc.call(
    :"user_svc@user_svc.msvc",
    Gnat,
    :sub,
    [:gnat, self(), "user.image.converted"]
  )

# Wait for the first message (up to 60 seconds, to leave room for large files).
# The cell evaluates to the decoded response, or :timeout.
receive do
  {:msg, %{body: body}} ->
    response = Mcsv.V3.ImageConversionResponse.decode(body)
    IO.puts("\n✅ Conversion Complete!")
    IO.puts(" Success: #{response.success}")
    IO.puts(" Message: #{response.message}")
    IO.puts(" Input Size: #{response.input_size} bytes")
    IO.puts(" Output Size: #{response.output_size} bytes (#{Float.round(response.output_size / 1_000_000, 2)} MB)")
    # Only the first 81 characters of the URL are printed.
    IO.puts(" PDF URL: #{String.slice(response.pdf_url, 0..80)}...")
    IO.puts(" Job ID: #{response.job_id}")
    response
after
  60_000 ->
    IO.puts("⏱️ Timeout waiting for conversion (60s)")
    :timeout
end
Cleanup: Unsubscribe
# Drop the NATS subscription created by the monitoring cell. :erpc.call/4 is
# used for consistency with the other remote calls in this notebook: a failed
# call raises instead of returning {:badrpc, reason}.
:erpc.call(
  :"user_svc@user_svc.msvc",
  Gnat,
  :unsub,
  [:gnat, subscription]
)
Summary
This notebook demonstrates:
- Generate a large colorful test image using Vix.Vips
- Upload to MinIO/S3 via RPC call to ReqS3Storage.store
- Send conversion request with oneof source {:s3_ref, ...}
- Stream the image from S3 directly to ImageMagick (memory-efficient!)
- Monitor the conversion completion via NATS response
The key advantage: No NATS message size limits - the image stays in S3, only metadata is sent via NATS!