Powered by AppSignal & Oban Pro

Subscriptions API

docs/api/subscriptions.livemd

Subscriptions API

Mix.install([
  {:req, "~> 0.4"},
  {:jason, "~> 1.4"}
])

Introduction

The Subscriptions API allows you to manage podcast subscriptions for authenticated users. This Livebook demonstrates:

  • Subscribe to podcasts
  • List all subscriptions
  • Unsubscribe from podcasts
  • Understanding the CQRS/Event Sourcing flow

Setup

# API configuration: every request in this notebook targets the versioned API
# root on the local development server.
base_url = "http://localhost:4000"
api_base = "#{base_url}/api/v1"

# JWT used for authentication (get one from authentication.livemd).
# Falls back to a placeholder string when BALADOS_JWT is not set.
token = System.get_env("BALADOS_JWT") || "paste_your_jwt_token_here"

# Authenticated client: carries the base URL and the bearer token so later
# cells only need to pass a relative path.
client = Req.new(
  base_url: api_base,
  headers: [{"authorization", "Bearer #{token}"}]
)

IO.puts("✓ Client configured")

Understanding RSS Source Encoding

Balados Sync uses base64-encoded RSS feed URLs to uniquely identify podcasts:

defmodule RSSEncoder do
  @moduledoc """
  Helpers for the base64 identifiers used in this notebook.

  Feed URLs and episode references are encoded with the standard
  (non URL-safe) base64 alphabet provided by `Base.encode64/1`.
  """

  @doc "Encodes a feed URL as standard base64."
  def encode_feed_url(url), do: Base.encode64(url)

  @doc """
  Decodes a value produced by `encode_feed_url/1` back into the feed URL.

  Raises `ArgumentError` when `encoded` is not valid base64.
  """
  def decode_feed_url(encoded), do: Base.decode64!(encoded)

  @doc "Encodes an episode reference as the base64 of `guid` and `enclosure_url` joined by a comma."
  def encode_episode(guid, enclosure_url) do
    [guid, enclosure_url]
    |> Enum.join(",")
    |> Base.encode64()
  end
end

# Round-trip a sample feed URL through the encoder to show both forms.
example_feed = "https://feeds.example.com/my-podcast-feed.xml"
encoded_feed = RSSEncoder.encode_feed_url(example_feed)
decoded_feed = RSSEncoder.decode_feed_url(encoded_feed)

IO.puts("Original URL: #{example_feed}")
IO.puts("Encoded (rss_source_feed): #{encoded_feed}")
IO.puts("Decoded: #{decoded_feed}")

List All Subscriptions

# GET /api/v1/subscriptions
response = Req.get!(client, "/subscriptions")

IO.inspect(response.status, label: "Status")
IO.inspect(response.body, label: "Subscriptions")

# The body is treated as a list of subscription maps; each entry is read
# through its "rss_source_id", "rss_source_feed" and "subscribed_at" keys.
subscriptions = response.body

IO.puts("\n📚 Your Subscriptions (#{length(subscriptions)}):")

Enum.each(subscriptions, fn sub ->
  # "rss_source_feed" is the base64-encoded feed URL; decode it for display.
  feed_url = RSSEncoder.decode_feed_url(sub["rss_source_feed"])
  IO.puts("  • #{sub["rss_source_id"]}")
  IO.puts("    Feed: #{feed_url}")
  IO.puts("    Subscribed: #{sub["subscribed_at"]}")
  IO.puts("")
end)

Subscribe to a Podcast

# Example podcast to subscribe to
podcast_feed = "https://feeds.transistor.fm/changelog-podcasts"
podcast_id = "changelog-podcasts"

# The API identifies a podcast by the base64-encoded feed URL.
rss_source_feed = RSSEncoder.encode_feed_url(podcast_feed)

# POST /api/v1/subscriptions
subscription_request = %{
  rss_source_feed: rss_source_feed,
  rss_source_id: podcast_id
}

subscribe_response = Req.post!(client, "/subscriptions",
  json: subscription_request
)

IO.inspect(subscribe_response.status, label: "Status")
IO.inspect(subscribe_response.body, label: "Subscription Created")

# 200 and 201 are both reported as success; 409 is reported as an existing
# subscription; anything else is reported as a failure.
case subscribe_response.status do
  status when status in [200, 201] -> IO.puts("✓ Successfully subscribed to #{podcast_id}!")
  409 -> IO.puts("ℹ Already subscribed to #{podcast_id}")
  _ -> IO.puts("✗ Failed to subscribe")
end

Subscribe to Multiple Podcasts

# List of podcasts to subscribe to
podcasts = [
  %{url: "https://feeds.megaphone.fm/syntax", id: "syntax-fm"},
  %{url: "https://feeds.simplecast.com/54nAGcIl", id: "the-changelog"},
  %{url: "https://elixiroutlaws.com/feed.xml", id: "elixir-outlaws"}
]

# Subscribe to each podcast and record one result map per podcast.
# Req.post/3 (non-bang) is used so a transport error on one podcast is
# captured as {:error, reason} instead of aborting the whole batch.
results = Enum.map(podcasts, fn podcast ->
  encoded_feed = RSSEncoder.encode_feed_url(podcast.url)

  response = Req.post(client, "/subscriptions",
    json: %{
      rss_source_feed: encoded_feed,
      rss_source_id: podcast.id
    }
  )

  case response do
    {:ok, %{status: status}} when status in [200, 201] ->
      %{podcast: podcast.id, status: :subscribed}
    {:ok, %{status: 409}} ->
      %{podcast: podcast.id, status: :already_subscribed}
    {:ok, %{status: status}} ->
      # Any other HTTP status is kept so it shows up in the inspected results.
      %{podcast: podcast.id, status: {:error, status}}
    {:error, reason} ->
      %{podcast: podcast.id, status: {:error, reason}}
  end
end)

IO.inspect(results, label: "Subscription Results")

# Summary: count results per outcome.
successful = Enum.count(results, fn r -> r.status == :subscribed end)
already = Enum.count(results, fn r -> r.status == :already_subscribed end)
failed = Enum.count(results, fn r -> match?({:error, _}, r.status) end)

IO.puts("\n📊 Summary:")
IO.puts("  ✓ New subscriptions: #{successful}")
IO.puts("  ℹ Already subscribed: #{already}")
IO.puts("  ✗ Failed: #{failed}")

Get a Specific Subscription

# Find a subscription by podcast ID.
# This cell fetches the full list and filters it client-side.
podcast_id_to_find = "syntax-fm"

all_subs = Req.get!(client, "/subscriptions").body
subscription = Enum.find(all_subs, fn sub ->
  sub["rss_source_id"] == podcast_id_to_find
end)

case subscription do
  nil ->
    IO.puts("✗ Not subscribed to #{podcast_id_to_find}")

  sub ->
    IO.puts("✓ Found subscription:")
    IO.inspect(sub, pretty: true)

    # Decode the base64 "rss_source_feed" back into the feed URL
    feed_url = RSSEncoder.decode_feed_url(sub["rss_source_feed"])
    IO.puts("\nFeed URL: #{feed_url}")
end

Unsubscribe from a Podcast

# Podcast to unsubscribe from
podcast_to_remove = "elixir-outlaws"

# First, find the subscription so we know its encoded feed.
all_subscriptions = Req.get!(client, "/subscriptions").body
subscription_to_remove = Enum.find(all_subscriptions, fn sub ->
  sub["rss_source_id"] == podcast_to_remove
end)

case subscription_to_remove do
  nil ->
    IO.puts("ℹ Not subscribed to #{podcast_to_remove}")

  sub ->
    # Get the encoded feed (used as ID)
    rss_source_feed = sub["rss_source_feed"]

    # Standard base64 may contain "/", "+" and "=". A raw "/" would split the
    # path segment, so percent-encode everything outside the unreserved set
    # before placing the value in the URL path.
    feed_segment = URI.encode(rss_source_feed, &URI.char_unreserved?/1)

    # DELETE /api/v1/subscriptions/:rss_source_feed
    delete_response = Req.delete!(client, "/subscriptions/#{feed_segment}")

    IO.inspect(delete_response.status, label: "Status")

    case delete_response.status do
      status when status in [200, 204] ->
        IO.puts("✓ Successfully unsubscribed from #{podcast_to_remove}")

      404 ->
        IO.puts("✗ Subscription not found")

      _ ->
        IO.puts("✗ Failed to unsubscribe")
    end
end

Check Subscription Status

# Small helper module for querying subscription state
defmodule SubscriptionChecker do
  @moduledoc """
  Convenience queries over the subscription list.

  Every function issues its own `GET /subscriptions` request through the
  given Req client.
  """

  @doc "Returns `true` when any subscription's `rss_source_id` equals `podcast_id`."
  def is_subscribed?(client, podcast_id) do
    client
    |> fetch_subscriptions()
    |> Enum.any?(&(&1["rss_source_id"] == podcast_id))
  end

  @doc "Returns the number of entries in the subscription list."
  def subscription_count(client) do
    client
    |> fetch_subscriptions()
    |> length()
  end

  # Fetches the response body of GET /subscriptions.
  defp fetch_subscriptions(client), do: Req.get!(client, "/subscriptions").body
end

# Report, for each podcast ID, whether it appears in the subscription list.
podcasts_to_check = ["syntax-fm", "the-changelog", "some-random-podcast"]

IO.puts("📋 Subscription Status:")
Enum.each(podcasts_to_check, fn podcast_id ->
  status = if SubscriptionChecker.is_subscribed?(client, podcast_id) do
    "✓ Subscribed"
  else
    "✗ Not subscribed"
  end

  IO.puts("  #{podcast_id}: #{status}")
end)

IO.puts("\nTotal subscriptions: #{SubscriptionChecker.subscription_count(client)}")

Understanding Event Sourcing

When you subscribe to a podcast, here's what happens behind the scenes:

# Print a step-by-step description of the subscribe flow.
IO.puts("""
CQRS/Event Sourcing Flow:

1. POST /api/v1/subscriptions
   ↓
2. Controller creates Subscribe command
   ↓
3. Command dispatched to User aggregate
   ↓
4. Aggregate validates and emits UserSubscribed event
   ↓
5. Event persisted to EventStore (immutable)
   ↓
6. SubscriptionProjector handles event
   ↓
7. Read model updated in PostgreSQL
   ↓
8. Controller queries projection and returns response

This means:
• Every subscription is an immutable event
• Events can be replayed to rebuild state
• Read models are eventually consistent
• Audit trail is preserved forever
""")

Advanced: Subscription with Custom Device Info

The JWT token contains device information, so each subscription can be traced back to the device that created it:

# The device_id and device_name from your JWT are automatically used
# You can generate different tokens for different devices

IO.puts("""
Device Tracking:

Each subscription is associated with the device that created it.
This is determined by the device_id and device_name in your JWT token.

To track subscriptions per device:
1. Generate separate JWT tokens for each device
2. Use the appropriate token when subscribing
3. Query subscriptions to see which device subscribed
""")

# Display device info from current subscriptions
subscriptions = Req.get!(client, "/subscriptions").body

# Compare against [] rather than calling length/1, which would walk the
# whole list just to test for emptiness.
if subscriptions != [] do
  IO.puts("\n📱 Device Information:")

  # Only the first three subscriptions are shown to keep the output short.
  subscriptions
  |> Enum.take(3)
  |> Enum.each(fn sub ->
    IO.puts("  #{sub["rss_source_id"]}")
    IO.puts("    Device: #{sub["device_name"]} (#{sub["device_id"]})")
  end)
end

Testing Idempotency

Subscribing to the same podcast multiple times should be idempotent:

# Send the identical subscribe request three times and record the outcome of
# each attempt (the HTTP status code, or :error on a transport failure).
test_podcast = "https://example.com/test-feed.xml"
test_id = "test-podcast"

encoded = RSSEncoder.encode_feed_url(test_podcast)

IO.puts("Testing idempotency by subscribing 3 times:")

results = for i <- 1..3 do
  response = Req.post(client, "/subscriptions",
    json: %{
      rss_source_feed: encoded,
      rss_source_id: test_id
    }
  )

  case response do
    {:ok, %{status: status}} ->
      IO.puts("  Attempt #{i}: Status #{status}")
      status
    {:error, _} ->
      IO.puts("  Attempt #{i}: Error")
      :error
  end
end

IO.puts("\n✓ Idempotency test complete")
IO.puts("First request should create (200/201), subsequent should return existing (200/409)")

Cleanup

# Unsubscribe from the test subscriptions listed below
test_podcasts = ["test-podcast", "syntax-fm", "the-changelog"]

IO.puts("Cleaning up test subscriptions:")

# Fetch the list once and look each podcast up locally.
all_subs = Req.get!(client, "/subscriptions").body

Enum.each(test_podcasts, fn podcast_id ->
  sub = Enum.find(all_subs, fn s -> s["rss_source_id"] == podcast_id end)

  case sub do
    nil ->
      IO.puts("  #{podcast_id}: Not subscribed")

    %{"rss_source_feed" => feed} ->
      # Standard base64 may contain "/", "+" and "=", so percent-encode the
      # value before using it as a URL path segment.
      path = "/subscriptions/#{URI.encode(feed, &URI.char_unreserved?/1)}"

      # Report the real outcome instead of assuming the DELETE succeeded.
      case Req.delete(client, path) do
        {:ok, %{status: status}} when status in [200, 204] ->
          IO.puts("  #{podcast_id}: Unsubscribed")

        {:ok, %{status: status}} ->
          IO.puts("  #{podcast_id}: Failed to unsubscribe (status #{status})")

        {:error, reason} ->
          IO.puts("  #{podcast_id}: Failed to unsubscribe (#{inspect(reason)})")
      end
  end
end)

IO.puts("\n✓ Cleanup complete")

Next Steps

Common Issues

401 Unauthorized

  • Check your JWT token is valid and not expired
  • Ensure the Authorization header is set correctly
  • Verify token format: Bearer <your_jwt_token>

400 Bad Request

  • Verify RSS feed URL is properly base64-encoded
  • Check that rss_source_id is provided
  • Ensure request body is valid JSON

Subscription not appearing

  • Remember: eventual consistency
  • Wait a moment and retry GET /subscriptions
  • Check event store for UserSubscribed events

Cannot unsubscribe

  • Verify the rss_source_feed (base64-encoded URL) is correct
  • Check you're using DELETE with the encoded feed as ID
  • Ensure you're actually subscribed first