
Choreo Dataflow: Comprehensive Walkthrough


Mix.install([
  {:choreo, "~> 0.6"},
  {:kino_vizjs, "~> 0.5.0"}
])

> Rendering diagrams: This livebook uses Kino.VizJS to render DOT diagrams inline. You can also copy DOT output into PlantText or run dot -Tpng diagram.dot -o diagram.png locally.


What is Choreo.Dataflow?

Choreo.Dataflow models stream-processing and ETL pipelines as directed graphs. Unlike static diagramming tools, you define your pipeline as code and then ask it questions:

  • “Where are the bottlenecks?”
  • “What’s the slowest path end-to-end?”
  • “Will this topology create backpressure?”
  • “Are there orphan nodes that never emit or receive data?”

Nodes are typed: sources, transforms, buffers, conditionals, merges, and sinks. Edges carry data-type annotations and can represent normal, error, retry, or dead-letter paths.
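
For instance, here is a minimal two-node flow with an annotated error edge — a sketch using only calls demonstrated later in this walkthrough:

alias Choreo.Dataflow

tiny =
  Dataflow.new()
  |> Dataflow.add_source(:api, label: "API")
  |> Dataflow.add_sink(:db, label: "DB")
  |> Dataflow.add_sink(:error_log, label: "Error Log")
  # Each edge carries a data-type annotation and an edge type
  |> Dataflow.connect(:api, :db, data_type: "row", edge_type: :normal)
  |> Dataflow.connect(:api, :error_log, data_type: "failure", edge_type: :error)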


Node Types & Shapes

Type         Shape          Purpose
source       🏠 house       Entry point — produces data
transform    📦 box3d       Processing step — maps, filters, enriches
buffer       🛢️ cylinder    Queue, cache, or stream broker
conditional  💎 diamond     Branching logic — routes to one of many outputs
merge        ⏩ trapezium   Combines multiple streams into one
sink         🏁 invhouse    Terminal — writes to DB, file, API, etc.

alias Choreo.Dataflow
alias Choreo.Dataflow.Analysis

legend =
  Dataflow.new()
  |> Dataflow.add_source(:source, label: "Source")
  |> Dataflow.add_transform(:transform, label: "Transform")
  |> Dataflow.add_buffer(:buffer, label: "Buffer")
  |> Dataflow.add_conditional(:conditional, label: "Conditional")
  |> Dataflow.add_merge(:merge, label: "Merge")
  |> Dataflow.add_sink(:sink, label: "Sink")

Kino.VizJS.render(Dataflow.to_dot(legend))
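
Following the rendering note above, you can also export any graph to a PNG locally. A minimal sketch, assuming Graphviz's dot binary is on your PATH:

# Write the DOT source to disk, then shell out to Graphviz
File.write!("legend.dot", Dataflow.to_dot(legend))
{_output, 0} = System.cmd("dot", ["-Tpng", "legend.dot", "-o", "legend.png"])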

Example 1: Simple ETL Pipeline

A classic extract-transform-load flow: ingest logs, parse JSON, buffer through Kafka, and write to TimescaleDB.

etl =
  Dataflow.new()
  |> Dataflow.add_source(:log_shipper, label: "Log Shipper\n10_000 evt/s", rate: 10_000)
  |> Dataflow.add_transform(:json_parser, label: "JSON Parser", latency_ms: 25)
  |> Dataflow.add_buffer(:kafka, label: "Kafka\n(cap: 50_000)", capacity: 50_000)
  |> Dataflow.add_transform(:enricher, label: "Enricher", latency_ms: 80)
  |> Dataflow.add_sink(:timescaledb, label: "TimescaleDB")
  |> Dataflow.connect(:log_shipper, :json_parser, data_type: "raw logs")
  |> Dataflow.connect(:json_parser, :kafka, data_type: "parsed event")
  |> Dataflow.connect(:kafka, :enricher, data_type: "event")
  |> Dataflow.connect(:enricher, :timescaledb, data_type: "enriched row")

Kino.VizJS.render(Dataflow.to_dot(etl))

Basic Analysis

IO.inspect(Analysis.cyclic?(etl), label: "Cyclic?")
IO.inspect(Analysis.orphan_nodes(etl), label: "Orphan nodes")
IO.inspect(Analysis.topological_sort(etl), label: "Topological order")

No cycles, no orphans, clean linear pipeline. But is it fast enough?
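
We can get a first answer from longest_path/1 (covered in depth below), which reports the summed latency along the slowest route:

{:ok, _path, etl_latency} = Analysis.longest_path(etl)
IO.puts("ETL end-to-end latency: #{etl_latency}ms")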


Example 2: IoT Pipeline with Backpressure

Real-world streams have mismatched speeds. A sensor bursts data faster than the parser can handle. Let’s model that and find the bottleneck.

iot =
  Dataflow.new()
  |> Dataflow.add_source(:sensor_a, label: "Sensor A\n2_000 evt/s", rate: 2_000)
  |> Dataflow.add_source(:sensor_b, label: "Sensor B\n3_000 evt/s", rate: 3_000)
  |> Dataflow.add_source(:sensor_c, label: "Sensor C\n5_000 evt/s", rate: 5_000)
  |> Dataflow.add_merge(:mux, label: "Merge")
  |> Dataflow.add_transform(:parser, label: "Parser\n50ms", latency_ms: 50)
  |> Dataflow.add_buffer(:rabbitmq, label: "RabbitMQ\n(cap: 8_000)", capacity: 8_000)
  |> Dataflow.add_transform(:aggregator, label: "Window Aggregator\n200ms", latency_ms: 200)
  |> Dataflow.add_sink(:influxdb, label: "InfluxDB")
  |> Dataflow.connect(:sensor_a, :mux)
  |> Dataflow.connect(:sensor_b, :mux)
  |> Dataflow.connect(:sensor_c, :mux)
  |> Dataflow.connect(:mux, :parser)
  |> Dataflow.connect(:parser, :rabbitmq)
  |> Dataflow.connect(:rabbitmq, :aggregator)
  |> Dataflow.connect(:aggregator, :influxdb)

Kino.VizJS.render(Dataflow.to_dot(iot))

Bottleneck Detection

Analysis.bottlenecks(iot)
|> IO.inspect(label: "Bottlenecks")

The buffer with the smallest headroom relative to upstream throughput surfaces first. If RabbitMQ appears here, you know the parser or aggregator is slower than the combined sensor rate.
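
A back-of-envelope check makes the numbers concrete: the three sensors produce 10_000 evt/s combined, so if the downstream stages stall, the 8_000-capacity buffer fills in under a second:

# Combined source rate vs. buffer capacity, straight from the graph above
inflow = 2_000 + 3_000 + 5_000
capacity = 8_000
IO.puts("Buffer fills in #{Float.round(capacity / inflow, 2)}s if nothing drains it")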

Throughput Simulation

Analysis.simulate(iot)
|> Enum.each(fn {id, stats} ->
  IO.puts("#{id}: Inbound #{stats.in_rate} | Outbound #{stats.out_rate} evt/s")
end)

simulate/1 propagates rates forward and computes effective throughput at every stage. If a transform has lower capacity than its input, the downstream rate drops — that’s backpressure in action.
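
One handy pattern is to flag exactly the stages where the rate drops, i.e. where out_rate falls below in_rate in the simulation results:

Analysis.simulate(iot)
|> Enum.filter(fn {_id, stats} -> stats.out_rate < stats.in_rate end)
|> Enum.each(fn {id, stats} ->
  IO.puts("#{id} is backpressured: #{stats.in_rate} in, #{stats.out_rate} out")
end)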

Critical Path

{:ok, path, latency_ms} = Analysis.longest_path(iot)

IO.puts("Slowest path: #{Enum.join(path, " → ")}")
IO.puts("End-to-end latency: #{latency_ms}ms")

The longest path gives the worst-case end-to-end latency: an event routed through these stages needs at least this long from source to sink, before any queueing delay. If this exceeds your SLA, you need to parallelise or optimise the slowest transform.
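
That makes SLA checks a one-liner. For instance, with a hypothetical 300ms budget:

sla_ms = 300

if latency_ms > sla_ms do
  IO.puts("Over budget by #{latency_ms - sla_ms}ms: optimise the critical path")
else
  IO.puts("Within budget (#{sla_ms - latency_ms}ms headroom)")
end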


Example 3: Error Handling & Dead-Letter Queues

Production pipelines don’t just have happy paths. Failed parses need retries; poison pills need quarantine.

resilient =
  Dataflow.new()
  |> Dataflow.add_source(:webhook, label: "Stripe Webhook\n500 req/s", rate: 500)
  |> Dataflow.add_transform(:validator, label: "Signature Validator\n10ms", latency_ms: 10)
  |> Dataflow.add_conditional(:router, label: "Valid?")
  |> Dataflow.add_transform(:processor, label: "Event Processor\n100ms", latency_ms: 100)
  |> Dataflow.add_buffer(:retry_queue, label: "Retry Queue\n(max: 3)", capacity: 1_000)
  |> Dataflow.add_transform(:retry_handler, label: "Retry Handler\n150ms", latency_ms: 150)
  |> Dataflow.add_buffer(:dlq, label: "Dead Letter Queue", capacity: 10_000)
  |> Dataflow.add_sink(:postgres, label: "Postgres")
  |> Dataflow.add_sink(:alerting, label: "PagerDuty")
  # Happy path
  |> Dataflow.connect(:webhook, :validator, data_type: "raw payload")
  |> Dataflow.connect(:validator, :router, data_type: "validated payload")
  |> Dataflow.connect(:router, :processor, data_type: "yes", edge_type: :normal)
  |> Dataflow.connect(:processor, :postgres, data_type: "processed event")
  # Retry loop
  |> Dataflow.connect(:router, :retry_queue, data_type: "no", edge_type: :retry)
  |> Dataflow.connect(:retry_queue, :retry_handler)
  |> Dataflow.connect(:retry_handler, :router, data_type: "reattempt", edge_type: :retry)
  # Dead letter after retries exhausted
  |> Dataflow.connect(:retry_handler, :dlq, data_type: "failed", edge_type: :dead_letter)
  |> Dataflow.connect(:dlq, :alerting, data_type: "alert", edge_type: :error)

Kino.VizJS.render(Dataflow.to_dot(resilient))

Notice the edge styling:

  • Solid = normal flow
  • Dashed red = error path
  • Dashed orange = retry loop
  • Dotted purple = dead-letter
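
To see exactly how these styles are emitted, print the raw DOT and look at the edge attributes:

Dataflow.to_dot(resilient) |> IO.puts()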

Detecting Cycles

Retry loops are intentional cycles. Let’s verify the pipeline isn’t broken elsewhere:

Analysis.cyclic?(resilient)
|> IO.inspect(label: "Has cycles?")

cyclic?/1 returns true here — but that’s expected. The retry loop is a designed cycle, not a bug. Use Analysis.cycles/1 (if available) or inspect the cycle paths to distinguish intentional loops from accidental ones.
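
Since cycles/1 isn't guaranteed to exist, you can guard the call at runtime. A sketch:

if function_exported?(Analysis, :cycles, 1) do
  Analysis.cycles(resilient)
  |> IO.inspect(label: "Cycle paths")
else
  IO.puts("Analysis.cycles/1 not available in this version")
end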


Example 4: Multi-Stage Microservices with Sub-Pipelines

Complex systems have bounded contexts. Sub-pipeline clusters let you group related nodes visually while keeping the full graph connected for analysis.

microservices =
  Dataflow.new()
  # Ingestion cluster
  |> Dataflow.add_source(:mobile_app, label: "Mobile App\n1_200 req/s", rate: 1_200)
  |> Dataflow.add_source(:web_app, label: "Web App\n800 req/s", rate: 800)
  |> Dataflow.add_merge(:api_gateway, label: "API Gateway")
  |> Dataflow.add_transform(:auth, label: "Auth Middleware\n15ms", latency_ms: 15)
  # Order service cluster
  |> Dataflow.add_cluster("order_service", label: "Order Service", fillcolor: "#eff6ff")
  |> Dataflow.add_transform(:order_handler, label: "Order Handler\n80ms", latency_ms: 80, cluster: "order_service")
  |> Dataflow.add_buffer(:order_kafka, label: "Order Events", capacity: 20_000, cluster: "order_service")
  |> Dataflow.add_transform(:inventory_check, label: "Inventory Check\n60ms", latency_ms: 60, cluster: "order_service")
  |> Dataflow.add_conditional(:in_stock, label: "In Stock?", cluster: "order_service")
  |> Dataflow.add_transform(:payment, label: "Payment Service\n250ms", latency_ms: 250, cluster: "order_service")
  |> Dataflow.add_sink(:order_db, label: "Order DB", cluster: "order_service")
  # Notification cluster
  |> Dataflow.add_cluster("notifications", label: "Notifications", fillcolor: "#fdf2f8")
  |> Dataflow.add_transform(:email_worker, label: "Email Worker\n120ms", latency_ms: 120, cluster: "notifications")
  |> Dataflow.add_transform(:push_worker, label: "Push Worker\n40ms", latency_ms: 40, cluster: "notifications")
  |> Dataflow.add_merge(:notification_router, label: "Notify Router", cluster: "notifications")
  |> Dataflow.add_sink(:sendgrid, label: "SendGrid", cluster: "notifications")
  |> Dataflow.add_sink(:firebase, label: "Firebase FCM", cluster: "notifications")
  # Cross-cluster flows
  |> Dataflow.connect(:mobile_app, :api_gateway, data_type: "HTTP")
  |> Dataflow.connect(:web_app, :api_gateway, data_type: "HTTP")
  |> Dataflow.connect(:api_gateway, :auth, data_type: "request")
  |> Dataflow.connect(:auth, :order_handler, data_type: "authed request")
  |> Dataflow.connect(:order_handler, :order_kafka)
  |> Dataflow.connect(:order_kafka, :inventory_check)
  |> Dataflow.connect(:inventory_check, :in_stock)
  |> Dataflow.connect(:in_stock, :payment, data_type: "yes", edge_type: :normal)
  |> Dataflow.connect(:in_stock, :order_db, data_type: "no", edge_type: :normal)
  |> Dataflow.connect(:payment, :order_db)
  |> Dataflow.connect(:order_db, :email_worker, data_type: "order confirmed")
  |> Dataflow.connect(:order_db, :push_worker, data_type: "order confirmed")
  |> Dataflow.connect(:email_worker, :notification_router)
  |> Dataflow.connect(:push_worker, :notification_router)
  |> Dataflow.connect(:notification_router, :sendgrid)
  |> Dataflow.connect(:notification_router, :firebase)

Kino.VizJS.render(Dataflow.to_dot(microservices))

Cluster-Aware Analysis

Clusters are visual only — the underlying graph is still one connected structure, so analysis works across boundaries:

IO.inspect(Analysis.bottlenecks(microservices), label: "Bottlenecks")
IO.inspect(Analysis.orphan_nodes(microservices), label: "Orphans")

{:ok, path, latency} = Analysis.longest_path(microservices)
IO.puts("Critical path (#{latency}ms): #{Enum.join(path, " → ")}")

Per-Cluster Bottlenecks

You can also inspect simulation results for specific clusters by filtering on node IDs:

order_nodes = [:order_handler, :order_kafka, :inventory_check, :in_stock, :payment, :order_db]

Analysis.simulate(microservices)
|> Enum.filter(fn {id, _} -> id in order_nodes end)
|> Enum.each(fn {id, stats} ->
  IO.puts("  #{id}: Inbound #{stats.in_rate} | Outbound #{stats.out_rate} evt/s")
end)

Example 5: Fixing a Broken Pipeline

Start with a pipeline that has structural issues and use analysis as a diagnostic checklist.

broken =
  Dataflow.new()
  |> Dataflow.add_source(:ingress, label: "Ingress")
  |> Dataflow.add_transform(:worker, label: "Worker")
  |> Dataflow.add_transform(:orphan_processor, label: "Orphan")
  |> Dataflow.add_sink(:db, label: "DB")
  |> Dataflow.add_sink(:never_reached, label: "Unreachable Sink")
  |> Dataflow.connect(:ingress, :worker)
  # worker -> db is forgotten!
  # orphan_processor has no inputs
  # never_reached has no inputs

Kino.VizJS.render(Dataflow.to_dot(broken))

Diagnosis

IO.inspect(Analysis.orphan_nodes(broken), label: "Orphans (no incoming edges)")
IO.inspect(Analysis.dead_ends(broken), label: "Dead ends (no outgoing edges)")
IO.inspect(Analysis.topological_sort(broken), label: "Topological sort")

The orphan processor and unreachable sink are immediately flagged. The missing worker → db connection means the database sink is also orphaned.

The Fix

fixed =
  Dataflow.new()
  |> Dataflow.add_source(:ingress, label: "Ingress")
  |> Dataflow.add_transform(:worker, label: "Worker")
  |> Dataflow.add_sink(:db, label: "DB")
  |> Dataflow.connect(:ingress, :worker)
  |> Dataflow.connect(:worker, :db)

IO.inspect(Analysis.orphan_nodes(fixed), label: "Orphans after fix")
IO.inspect(Analysis.dead_ends(fixed), label: "Dead ends after fix")
Kino.VizJS.render(Dataflow.to_dot(fixed))

Advanced: Custom Theming

Build a theme that matches your organisation’s brand colours.

brand_theme =
  Choreo.Theme.custom(
    colors: %{
      source: "#10b981",
      transform: "#3b82f6",
      buffer: "#f59e0b",
      conditional: "#8b5cf6",
      merge: "#ec4899",
      sink: "#ef4444"
    },
    node_fontcolor: "white",
    edge_color: "#94a3b8",
    graph_bgcolor: "#0f172a"
  )

Kino.VizJS.render(Dataflow.to_dot(microservices, theme: brand_theme))

Summary

Question                          Function
“Will it deadlock?”               Analysis.cyclic?/1
“What’s the execution order?”     Analysis.topological_sort/1
“Are there dangling nodes?”       Analysis.orphan_nodes/1
“Where will it choke?”            Analysis.bottlenecks/1
“How fast can it go?”             Analysis.simulate/1
“What’s the slowest path?”        Analysis.longest_path/1
“Group related nodes visually”    Dataflow.add_cluster/3 + cluster: "name" option on nodes
“Render to DOT”                   Dataflow.to_dot/2

Dataflow diagrams as code mean your pipeline architecture is version-controlled, reviewable, and analyzable. Every time you add a transform or buffer, you can immediately see the impact on latency, throughput, and bottlenecks — before you deploy.
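
A natural next step is to turn these checks into a pre-deploy gate. A minimal sketch using the microservices graph from Example 4 and a placeholder 1_000ms budget (substitute your own topology and SLA):

sla_ms = 1_000  # placeholder budget, not from the walkthrough

orphans = Analysis.orphan_nodes(microservices)

unless orphans == [] do
  raise "Pipeline has orphan nodes: #{inspect(orphans)}"
end

{:ok, _path, latency} = Analysis.longest_path(microservices)

if latency > sla_ms do
  raise "Critical path is #{latency}ms, over the #{sla_ms}ms budget"
end

IO.puts("Topology checks passed (#{latency}ms critical path)")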