Subscriptions API
Mix.install([
{:req, "~> 0.4"},
{:jason, "~> 1.4"}
])
Introduction
The Subscriptions API allows you to manage podcast subscriptions for authenticated users. This Livebook demonstrates:
- Subscribe to podcasts
- List all subscriptions
- Unsubscribe from podcasts
- Understand the CQRS/Event Sourcing flow
Setup
# API configuration: every request in this notebook targets the versioned API root.
base_url = "http://localhost:4000"
api_base = "#{base_url}/api/v1"

# JWT sent as the Bearer credential. BALADOS_JWT wins when it is set; otherwise
# the placeholder below is used and must be replaced by hand with a token
# obtained from authentication.livemd.
placeholder_token = "paste_your_jwt_token_here"
token = System.get_env("BALADOS_JWT") || placeholder_token

# Shared authenticated client; later cells pass relative paths such as "/subscriptions".
client =
  Req.new(
    base_url: api_base,
    headers: [{"authorization", "Bearer #{token}"}]
  )

# Say so explicitly when the placeholder is still in use, instead of reporting
# success for a client whose Authorization header cannot be valid.
if token == placeholder_token do
  IO.puts("⚠ Client configured with the placeholder token - set BALADOS_JWT or paste a real JWT")
else
  IO.puts("✅ Client configured")
end
Understanding RSS Source Encoding
Balados Sync uses base64-encoded RSS feed URLs to uniquely identify podcasts:
defmodule RSSEncoder do
  @moduledoc """
  Base64 helpers for the identifiers used in this notebook: feed URLs and
  `guid,enclosure_url` episode pairs.
  """

  @doc "Encodes `url` with standard, padded Base64 (`Base.encode64/1`)."
  def encode_feed_url(url), do: Base.encode64(url)

  @doc """
  Decodes a Base64 string back into the original value.

  Raises `ArgumentError` when `encoded` is not valid Base64 (`Base.decode64!/1`).
  """
  def decode_feed_url(encoded), do: Base.decode64!(encoded)

  @doc "Encodes `guid`, a comma, and `enclosure_url` as one Base64 string."
  def encode_episode(guid, enclosure_url) do
    [guid, enclosure_url]
    |> Enum.join(",")
    |> Base.encode64()
  end
end
# Round-trip a sample feed URL through the encoder to show the identifier format.
example_feed = "https://feeds.example.com/my-podcast-feed.xml"
encoded_feed = RSSEncoder.encode_feed_url(example_feed)
decoded_feed = RSSEncoder.decode_feed_url(encoded_feed)

# One line per representation: original, encoded, decoded again.
Enum.each(
  [
    "Original URL: #{example_feed}",
    "Encoded (rss_source_feed): #{encoded_feed}",
    "Decoded: #{decoded_feed}"
  ],
  &IO.puts/1
)
List All Subscriptions
# GET /api/v1/subscriptions
response = Req.get!(client, "/subscriptions")
IO.inspect(response.status, label: "Status")
IO.inspect(response.body, label: "Subscriptions")

# Req.get!/2 raises only when the request itself fails; 4xx/5xx responses are
# returned like any other. An error body (for example the payload of a 401) must
# therefore not reach length/1 or the per-entry lookups. Only a 2xx response
# whose body is a list is rendered; anything else is reported and yields [].
subscriptions =
  case response do
    %{status: status, body: entries} when status in 200..299 and is_list(entries) ->
      IO.puts("\n📋 Your Subscriptions (#{length(entries)}):")

      Enum.each(entries, fn sub ->
        # rss_source_feed is the Base64-encoded feed URL; decode it for display.
        feed_url = RSSEncoder.decode_feed_url(sub["rss_source_feed"])
        IO.puts(" • #{sub["rss_source_id"]}")
        IO.puts(" Feed: #{feed_url}")
        IO.puts(" Subscribed: #{sub["subscribed_at"]}")
        IO.puts("")
      end)

      entries

    %{status: status} ->
      IO.puts("\n❌ Could not list subscriptions (HTTP #{status})")
      []
  end
Subscribe to a Podcast
# Example podcast to subscribe to
podcast_feed = "https://feeds.transistor.fm/changelog-podcasts"
podcast_id = "changelog-podcasts"

# The request identifies the feed by its Base64-encoded URL
rss_source_feed = RSSEncoder.encode_feed_url(podcast_feed)

# POST /api/v1/subscriptions
subscription_request = %{
  rss_source_feed: rss_source_feed,
  rss_source_id: podcast_id
}

subscribe_response = Req.post!(client, "/subscriptions", json: subscription_request)

IO.inspect(subscribe_response.status, label: "Status")
IO.inspect(subscribe_response.body, label: "Subscription Created")

# 200 and 201 both count as success (same grouping as the bulk-subscribe cell);
# 409 is reported as "already subscribed"; every other status is a failure.
case subscribe_response.status do
  status when status in [200, 201] ->
    IO.puts("✅ Successfully subscribed to #{podcast_id}!")

  409 ->
    IO.puts("ℹ Already subscribed to #{podcast_id}")

  _other ->
    IO.puts("❌ Failed to subscribe")
end
Subscribe to Multiple Podcasts
# Podcasts to subscribe to in one pass
podcasts = [
  %{url: "https://feeds.megaphone.fm/syntax", id: "syntax-fm"},
  %{url: "https://feeds.simplecast.com/54nAGcIl", id: "the-changelog"},
  %{url: "https://elixiroutlaws.com/feed.xml", id: "elixir-outlaws"}
]

# Maps the result of Req.post/3 onto the status stored for each podcast.
classify = fn
  {:ok, %{status: code}} when code in [200, 201] -> :subscribed
  {:ok, %{status: 409}} -> :already_subscribed
  {:ok, %{status: code}} -> {:error, code}
  {:error, reason} -> {:error, reason}
end

# One POST per podcast; each outcome is recorded as %{podcast: id, status: ...}.
results =
  for %{url: url, id: id} <- podcasts do
    payload = %{rss_source_feed: RSSEncoder.encode_feed_url(url), rss_source_id: id}
    outcome = Req.post(client, "/subscriptions", json: payload)
    %{podcast: id, status: classify.(outcome)}
  end

IO.inspect(results, label: "Subscription Results")
# Summary: tally the entries of `results` by outcome category.
successful = Enum.count(results, &(&1.status == :subscribed))
already = Enum.count(results, &(&1.status == :already_subscribed))
failed = Enum.count(results, &match?({:error, _}, &1.status))

# Output glyphs restored from mis-encoded text.
IO.puts("\n📊 Summary:")
IO.puts(" ✅ New subscriptions: #{successful}")
IO.puts(" ℹ Already subscribed: #{already}")
IO.puts(" ❌ Failed: #{failed}")
Get a Specific Subscription
# Find a subscription by podcast ID. There is no lookup request here: the full
# list is fetched and filtered locally on "rss_source_id".
podcast_id_to_find = "syntax-fm"
all_subs = Req.get!(client, "/subscriptions").body

subscription = Enum.find(all_subs, &(&1["rss_source_id"] == podcast_id_to_find))

case subscription do
  nil ->
    IO.puts("❌ Not subscribed to #{podcast_id_to_find}")

  sub ->
    IO.puts("✅ Found subscription:")
    IO.inspect(sub, pretty: true)
    # Decode the Base64 feed identifier back to the feed URL
    feed_url = RSSEncoder.decode_feed_url(sub["rss_source_feed"])
    IO.puts("\nFeed URL: #{feed_url}")
end
Unsubscribe from a Podcast
# Podcast to unsubscribe from
podcast_to_remove = "elixir-outlaws"

# First, find the subscription in the current list
all_subscriptions = Req.get!(client, "/subscriptions").body

subscription_to_remove =
  Enum.find(all_subscriptions, &(&1["rss_source_id"] == podcast_to_remove))

case subscription_to_remove do
  nil ->
    IO.puts("ℹ Not subscribed to #{podcast_to_remove}")

  sub ->
    # The encoded feed is used as the resource ID. It is presumably standard
    # Base64 (as produced by RSSEncoder.encode_feed_url/1), which may contain
    # "/", "+" and "=". Percent-encode everything outside the unreserved set so
    # the value stays a single path segment.
    rss_source_feed = URI.encode(sub["rss_source_feed"], &URI.char_unreserved?/1)

    # DELETE /api/v1/subscriptions/:rss_source_feed
    delete_response = Req.delete!(client, "/subscriptions/#{rss_source_feed}")
    IO.inspect(delete_response.status, label: "Status")

    case delete_response.status do
      status when status in [200, 204] ->
        IO.puts("✅ Successfully unsubscribed from #{podcast_to_remove}")

      404 ->
        IO.puts("❌ Subscription not found")

      _other ->
        IO.puts("❌ Failed to unsubscribe")
    end
end
Check Subscription Status
# Check if subscribed to a specific podcast
defmodule SubscriptionChecker do
  @moduledoc """
  Answers questions about the current subscriptions by requesting
  `GET /subscriptions` through the given Req client on every call.
  """

  @doc """
  Returns `true` when any entry's `"rss_source_id"` equals `podcast_id`.
  """
  @spec subscribed?(Req.Request.t(), String.t()) :: boolean()
  def subscribed?(client, podcast_id) do
    client
    |> fetch_subscriptions()
    |> Enum.any?(&(&1["rss_source_id"] == podcast_id))
  end

  @doc """
  Legacy name for `subscribed?/2`, kept so existing callers keep working.

  Elixir predicates end in `?` without an `is_` prefix; prefer `subscribed?/2`.
  """
  @spec is_subscribed?(Req.Request.t(), String.t()) :: boolean()
  def is_subscribed?(client, podcast_id), do: subscribed?(client, podcast_id)

  @doc """
  Returns the number of entries in the subscriptions response body.
  """
  @spec subscription_count(Req.Request.t()) :: non_neg_integer()
  def subscription_count(client) do
    client
    |> fetch_subscriptions()
    |> length()
  end

  # Single place that issues the list request; Req.get!/2 raises if the request fails.
  defp fetch_subscriptions(client), do: Req.get!(client, "/subscriptions").body
end
# IDs to look up; each check below triggers its own GET /subscriptions request.
podcasts_to_check = ["syntax-fm", "the-changelog", "some-random-podcast"]

IO.puts("🔍 Subscription Status:")

Enum.each(podcasts_to_check, fn podcast_id ->
  status =
    if SubscriptionChecker.is_subscribed?(client, podcast_id) do
      "✅ Subscribed"
    else
      "❌ Not subscribed"
    end

  IO.puts(" #{podcast_id}: #{status}")
end)

IO.puts("\nTotal subscriptions: #{SubscriptionChecker.subscription_count(client)}")
Understanding Event Sourcing
When you subscribe to a podcast, here’s what happens behind the scenes:
# Prints a static description of the write path; this cell makes no API call.
# Arrow and bullet characters restored from mis-encoded text.
IO.puts("""
CQRS/Event Sourcing Flow:
1. POST /api/v1/subscriptions
↓
2. Controller creates Subscribe command
↓
3. Command dispatched to User aggregate
↓
4. Aggregate validates and emits UserSubscribed event
↓
5. Event persisted to EventStore (immutable)
↓
6. SubscriptionProjector handles event
↓
7. Read model updated in PostgreSQL
↓
8. Controller queries projection and returns response
This means:
• Every subscription is an immutable event
• Events can be replayed to rebuild state
• Read models are eventually consistent
• Audit trail is preserved forever
""")
Advanced: Subscription with Custom Device Info
The JWT token contains device information, so you can track which device performed each subscription:
# The device_id and device_name from your JWT are automatically used
# You can generate different tokens for different devices
IO.puts("""
Device Tracking:
Each subscription is associated with the device that created it.
This is determined by the device_id and device_name in your JWT token.
To track subscriptions per device:
1. Generate separate JWT tokens for each device
2. Use the appropriate token when subscribing
3. Query subscriptions to see which device subscribed
""")

# Display device info from current subscriptions
subscriptions = Req.get!(client, "/subscriptions").body

# Match on a non-empty list instead of calling length/1: it avoids walking the
# whole list for an emptiness check, and it skips the section rather than
# raising when the body is not a list.
if match?([_ | _], subscriptions) do
  IO.puts("\n📱 Device Information:")

  # Only the first three entries are shown to keep the output short.
  subscriptions
  |> Enum.take(3)
  |> Enum.each(fn sub ->
    IO.puts(" #{sub["rss_source_id"]}")
    IO.puts(" Device: #{sub["device_name"]} (#{sub["device_id"]})")
  end)
end
Testing Idempotency
Subscribing to the same podcast multiple times should be idempotent:
# Subscribe to the same podcast several times and record each HTTP status
test_podcast = "https://example.com/test-feed.xml"
test_id = "test-podcast"
encoded = RSSEncoder.encode_feed_url(test_podcast)

IO.puts("Testing idempotency by subscribing 3 times:")

# `results` holds one entry per attempt: the HTTP status, or :error when the
# request itself failed.
results =
  for i <- 1..3 do
    payload = %{rss_source_feed: encoded, rss_source_id: test_id}

    case Req.post(client, "/subscriptions", json: payload) do
      {:ok, %{status: status}} ->
        IO.puts(" Attempt #{i}: Status #{status}")
        status

      {:error, _reason} ->
        IO.puts(" Attempt #{i}: Error")
        :error
    end
  end

IO.puts("\n✅ Idempotency test complete")
IO.puts("First request should create (200/201), subsequent should return existing (200/409)")
Cleanup
# Unsubscribe from all test subscriptions
test_podcasts = ["test-podcast", "syntax-fm", "the-changelog"]

IO.puts("Cleaning up test subscriptions:")

# The list is fetched once and reused for every lookup below.
all_subs = Req.get!(client, "/subscriptions").body

Enum.each(test_podcasts, fn podcast_id ->
  case Enum.find(all_subs, &(&1["rss_source_id"] == podcast_id)) do
    nil ->
      IO.puts(" #{podcast_id}: Not subscribed")

    %{"rss_source_feed" => feed} ->
      # The feed identifier is presumably standard Base64, which may contain
      # "/", "+" and "="; percent-encode it so it stays one path segment.
      segment = URI.encode(feed, &URI.char_unreserved?/1)

      # Report the real outcome instead of assuming the DELETE succeeded.
      case Req.delete(client, "/subscriptions/#{segment}") do
        {:ok, %{status: status}} when status in [200, 204] ->
          IO.puts(" #{podcast_id}: Unsubscribed")

        {:ok, %{status: status}} ->
          IO.puts(" #{podcast_id}: Failed to unsubscribe (HTTP #{status})")

        {:error, reason} ->
          IO.puts(" #{podcast_id}: Failed to unsubscribe (#{inspect(reason)})")
      end
  end
end)

IO.puts("\n✅ Cleanup complete")
Next Steps
- Play Status API - Track listening progress
- Episodes API - Query episode data
- Privacy Settings - Control data visibility
- Architecture Guide - Understand CQRS/ES
Common Issues
401 Unauthorized
- Check your JWT token is valid and not expired
- Ensure the Authorization header is set correctly
- Verify the token format: Bearer <your-jwt-token>
400 Bad Request
- Verify RSS feed URL is properly base64-encoded
- Check that rss_source_id is provided
- Ensure request body is valid JSON
Subscription not appearing
- Remember: eventual consistency
- Wait a moment and retry GET /subscriptions
- Check event store for UserSubscribed events
Cannot unsubscribe
- Verify the rss_source_feed (base64-encoded URL) is correct
- Check you’re using DELETE with the encoded feed as ID
- Ensure you’re actually subscribed first