Powered by AppSignal & Oban Pro

Subscriptions API

docs/api/subscriptions.livemd

Subscriptions API

Mix.install([
  {:req, "~> 0.4"},
  {:jason, "~> 1.4"}
])

Introduction

The Subscriptions API allows you to manage podcast subscriptions for authenticated users. This Livebook demonstrates:

  • Subscribe to podcasts
  • List all subscriptions
  • Unsubscribe from podcasts
  • Understanding the CQRS/Event Sourcing flow

Setup

# API Configuration
base_url = "http://localhost:4000"
api_base = "#{base_url}/api/v1"

# Your JWT token (get this from the authentication.livemd)
token = System.get_env("BALADOS_JWT") || "paste_your_jwt_token_here"

# Create authenticated client
client = Req.new(
  base_url: api_base,
  headers: [{"authorization", "Bearer #{token}"}]
)

IO.puts("βœ“ Client configured")

Understanding RSS Source Encoding

Balados Sync uses base64-encoded RSS feed URLs to uniquely identify podcasts:

defmodule RSSEncoder do
  def encode_feed_url(url) do
    Base.encode64(url)
  end

  def decode_feed_url(encoded) do
    Base.decode64!(encoded)
  end

  def encode_episode(guid, enclosure_url) do
    Base.encode64("#{guid},#{enclosure_url}")
  end
end

# Example podcast feed
example_feed = "https://feeds.example.com/my-podcast-feed.xml"
encoded_feed = RSSEncoder.encode_feed_url(example_feed)

IO.puts("Original URL: #{example_feed}")
IO.puts("Encoded (rss_source_feed): #{encoded_feed}")
IO.puts("Decoded: #{RSSEncoder.decode_feed_url(encoded_feed)}")

List All Subscriptions

# GET /api/v1/subscriptions
response = Req.get!(client, "/subscriptions")

IO.inspect(response.status, label: "Status")
IO.inspect(response.body, label: "Subscriptions")

# Parse and display nicely
subscriptions = response.body

IO.puts("\nπŸ“š Your Subscriptions (#{length(subscriptions)}):")
Enum.each(subscriptions, fn sub ->
  feed_url = RSSEncoder.decode_feed_url(sub["rss_source_feed"])
  IO.puts("  β€’ #{sub["rss_source_id"]}")
  IO.puts("    Feed: #{feed_url}")
  IO.puts("    Subscribed: #{sub["subscribed_at"]}")
  IO.puts("")
end)

Subscribe to a Podcast

# Example podcast to subscribe to
podcast_feed = "https://feeds.transistor.fm/changelog-podcasts"
podcast_id = "changelog-podcasts"

# Encode the feed URL
rss_source_feed = RSSEncoder.encode_feed_url(podcast_feed)

# POST /api/v1/subscriptions
subscription_request = %{
  rss_source_feed: rss_source_feed,
  rss_source_id: podcast_id
}

subscribe_response = Req.post!(client, "/subscriptions",
  json: subscription_request
)

IO.inspect(subscribe_response.status, label: "Status")
IO.inspect(subscribe_response.body, label: "Subscription Created")

case subscribe_response.status do
  200 -> IO.puts("βœ“ Successfully subscribed to #{podcast_id}!")
  201 -> IO.puts("βœ“ Successfully subscribed to #{podcast_id}!")
  409 -> IO.puts("β„Ή Already subscribed to #{podcast_id}")
  _ -> IO.puts("βœ— Failed to subscribe")
end

Subscribe to Multiple Podcasts

# List of podcasts to subscribe to
podcasts = [
  %{url: "https://feeds.megaphone.fm/syntax", id: "syntax-fm"},
  %{url: "https://feeds.simplecast.com/54nAGcIl", id: "the-changelog"},
  %{url: "https://elixiroutlaws.com/feed.xml", id: "elixir-outlaws"}
]

# Subscribe to each
results = Enum.map(podcasts, fn podcast ->
  encoded_feed = RSSEncoder.encode_feed_url(podcast.url)

  response = Req.post(client, "/subscriptions",
    json: %{
      rss_source_feed: encoded_feed,
      rss_source_id: podcast.id
    }
  )

  case response do
    {:ok, %{status: status}} when status in [200, 201] ->
      %{podcast: podcast.id, status: :subscribed}
    {:ok, %{status: 409}} ->
      %{podcast: podcast.id, status: :already_subscribed}
    {:ok, %{status: status}} ->
      %{podcast: podcast.id, status: {:error, status}}
    {:error, reason} ->
      %{podcast: podcast.id, status: {:error, reason}}
  end
end)

IO.inspect(results, label: "Subscription Results")

# Summary
successful = Enum.count(results, fn r -> r.status == :subscribed end)
already = Enum.count(results, fn r -> r.status == :already_subscribed end)
failed = Enum.count(results, fn r -> match?({:error, _}, r.status) end)

IO.puts("\nπŸ“Š Summary:")
IO.puts("  βœ“ New subscriptions: #{successful}")
IO.puts("  β„Ή Already subscribed: #{already}")
IO.puts("  βœ— Failed: #{failed}")

Get a Specific Subscription

# Find a subscription by podcast ID
podcast_id_to_find = "syntax-fm"

all_subs = Req.get!(client, "/subscriptions").body
subscription = Enum.find(all_subs, fn sub ->
  sub["rss_source_id"] == podcast_id_to_find
end)

case subscription do
  nil ->
    IO.puts("βœ— Not subscribed to #{podcast_id_to_find}")

  sub ->
    IO.puts("βœ“ Found subscription:")
    IO.inspect(sub, pretty: true)

    # Decode the feed URL
    feed_url = RSSEncoder.decode_feed_url(sub["rss_source_feed"])
    IO.puts("\nFeed URL: #{feed_url}")
end

Unsubscribe from a Podcast

# Podcast to unsubscribe from
podcast_to_remove = "elixir-outlaws"

# First, find the subscription
all_subscriptions = Req.get!(client, "/subscriptions").body
subscription_to_remove = Enum.find(all_subscriptions, fn sub ->
  sub["rss_source_id"] == podcast_to_remove
end)

case subscription_to_remove do
  nil ->
    IO.puts("β„Ή Not subscribed to #{podcast_to_remove}")

  sub ->
    # Get the encoded feed (used as ID)
    rss_source_feed = sub["rss_source_feed"]

    # DELETE /api/v1/subscriptions/:rss_source_feed
    delete_response = Req.delete!(client, "/subscriptions/#{rss_source_feed}")

    IO.inspect(delete_response.status, label: "Status")

    case delete_response.status do
      204 -> IO.puts("βœ“ Successfully unsubscribed from #{podcast_to_remove}")
      200 -> IO.puts("βœ“ Successfully unsubscribed from #{podcast_to_remove}")
      404 -> IO.puts("βœ— Subscription not found")
      _ -> IO.puts("βœ— Failed to unsubscribe")
    end
end

Check Subscription Status

# Check if subscribed to a specific podcast
defmodule SubscriptionChecker do
  def is_subscribed?(client, podcast_id) do
    all_subs = Req.get!(client, "/subscriptions").body
    Enum.any?(all_subs, fn sub -> sub["rss_source_id"] == podcast_id end)
  end

  def subscription_count(client) do
    Req.get!(client, "/subscriptions").body |> length()
  end
end

podcasts_to_check = ["syntax-fm", "the-changelog", "some-random-podcast"]

IO.puts("πŸ“‹ Subscription Status:")
Enum.each(podcasts_to_check, fn podcast_id ->
  status = if SubscriptionChecker.is_subscribed?(client, podcast_id) do
    "βœ“ Subscribed"
  else
    "βœ— Not subscribed"
  end

  IO.puts("  #{podcast_id}: #{status}")
end)

IO.puts("\nTotal subscriptions: #{SubscriptionChecker.subscription_count(client)}")

Understanding Event Sourcing

When you subscribe to a podcast, here’s what happens behind the scenes:

IO.puts("""
CQRS/Event Sourcing Flow:

1. POST /api/v1/subscriptions
   ↓
2. Controller creates Subscribe command
   ↓
3. Command dispatched to User aggregate
   ↓
4. Aggregate validates and emits UserSubscribed event
   ↓
5. Event persisted to EventStore (immutable)
   ↓
6. SubscriptionProjector handles event
   ↓
7. Read model updated in PostgreSQL
   ↓
8. Controller queries projection and returns response

This means:
β€’ Every subscription is an immutable event
β€’ Events can be replayed to rebuild state
β€’ Read models are eventually consistent
β€’ Audit trail is preserved forever
""")

Advanced: Subscription with Custom Device Info

The JWT token contains device information, but you can track which device performed the subscription:

# The device_id and device_name from your JWT are automatically used
# You can generate different tokens for different devices

IO.puts("""
Device Tracking:

Each subscription is associated with the device that created it.
This is determined by the device_id and device_name in your JWT token.

To track subscriptions per device:
1. Generate separate JWT tokens for each device
2. Use the appropriate token when subscribing
3. Query subscriptions to see which device subscribed
""")

# Display device info from current subscriptions
subscriptions = Req.get!(client, "/subscriptions").body

if length(subscriptions) > 0 do
  IO.puts("\nπŸ“± Device Information:")
  subscriptions
  |> Enum.take(3)
  |> Enum.each(fn sub ->
    IO.puts("  #{sub["rss_source_id"]}")
    IO.puts("    Device: #{sub["device_name"]} (#{sub["device_id"]})")
  end)
end

Testing Idempotency

Subscribing to the same podcast multiple times should be idempotent:

# Subscribe to a podcast multiple times
test_podcast = "https://example.com/test-feed.xml"
test_id = "test-podcast"

encoded = RSSEncoder.encode_feed_url(test_podcast)

IO.puts("Testing idempotency by subscribing 3 times:")

results = for i <- 1..3 do
  response = Req.post(client, "/subscriptions",
    json: %{
      rss_source_feed: encoded,
      rss_source_id: test_id
    }
  )

  case response do
    {:ok, %{status: status}} ->
      IO.puts("  Attempt #{i}: Status #{status}")
      status
    {:error, _} ->
      IO.puts("  Attempt #{i}: Error")
      :error
  end
end

IO.puts("\nβœ“ Idempotency test complete")
IO.puts("First request should create (200/201), subsequent should return existing (200/409)")

Cleanup

# Unsubscribe from all test subscriptions
test_podcasts = ["test-podcast", "syntax-fm", "the-changelog"]

IO.puts("Cleaning up test subscriptions:")

all_subs = Req.get!(client, "/subscriptions").body

test_podcasts
|> Enum.each(fn podcast_id ->
  sub = Enum.find(all_subs, fn s -> s["rss_source_id"] == podcast_id end)

  case sub do
    nil ->
      IO.puts("  #{podcast_id}: Not subscribed")

    %{"rss_source_feed" => feed} ->
      Req.delete(client, "/subscriptions/#{feed}")
      IO.puts("  #{podcast_id}: Unsubscribed")
  end
end)

IO.puts("\nβœ“ Cleanup complete")

Next Steps

Common Issues

401 Unauthorized

  • Check your JWT token is valid and not expired
  • Ensure the Authorization header is set correctly
  • Verify token format: Bearer

400 Bad Request

  • Verify RSS feed URL is properly base64-encoded
  • Check that rss_source_id is provided
  • Ensure request body is valid JSON

Subscription not appearing

  • Remember: eventual consistency
  • Wait a moment and retry GET /subscriptions
  • Check event store for UserSubscribed events

Cannot unsubscribe

  • Verify the rss_source_feed (base64-encoded URL) is correct
  • Check you’re using DELETE with the encoded feed as ID
  • Ensure you’re actually subscribed first