PicChat: PubSub

Mix.install([
  {:jason, "~> 1.4"},
  {:kino, "~> 0.9", override: true},
  {:youtube, github: "brooklinjazz/youtube"},
  {:hidden_cell, github: "brooklinjazz/hidden_cell"}
])

Review Questions

Upon completing this lesson, a student should be able to answer the following questions.

  • What is the PubSub (Publisher/Subscriber pattern) and how does it enable real-time features?
  • How do we subscribe a LiveView process to a topic?
  • How do we broadcast to a topic?
  • How do we handle a broadcasted message in a LiveView process?

Overview

PubSub

PubSub, or “Publish-Subscribe”, is a messaging pattern that allows senders of messages (publishers) to send messages to multiple receivers (subscribers) without explicitly establishing a connection to each individual receiver. This allows for a decoupled communication model, where the publisher and subscriber do not need to be aware of each other or directly connected in order to communicate.

flowchart
PS[PubSub]
P[Publisher]
S1[Subscriber]
S2[Subscriber]
S3[Subscriber]

P --broadcast--> PS
PS --broadcast--> S1
PS --broadcast--> S2
PS --broadcast--> S3
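
To make the pattern concrete, here is a minimal sketch that uses only Elixir's standard library Registry rather than Phoenix. The names Demo.Registry and "news" are assumptions made for illustration. Three subscriber processes register under one key, and the publisher dispatches a message without ever referencing a subscriber pid directly.

```elixir
# A stdlib-only sketch of publish/subscribe using Registry.
# Demo.Registry and the "news" key are hypothetical names.
{:ok, _} = Registry.start_link(keys: :duplicate, name: Demo.Registry)

parent = self()

# Spawn three subscribers. Each registers under the same key,
# then waits for one message and reports it back to the parent.
for n <- 1..3 do
  spawn_link(fn ->
    {:ok, _} = Registry.register(Demo.Registry, "news", [])
    send(parent, {:ready, n})

    receive do
      {:broadcast, message} -> send(parent, {:received, n, message})
    end
  end)
end

# Wait until every subscriber has registered before publishing.
for n <- 1..3 do
  receive do
    {:ready, ^n} -> :ok
  end
end

# The publisher only knows the key, not the individual subscribers.
Registry.dispatch(Demo.Registry, "news", fn entries ->
  for {pid, _value} <- entries, do: send(pid, {:broadcast, "hello"})
end)

received =
  for n <- 1..3 do
    receive do
      {:received, ^n, message} -> {n, message}
    after
      1_000 -> {n, :timeout}
    end
  end

IO.inspect(received)
```

Every subscriber reports the same message back, even though the publisher made a single dispatch call.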

Topics

In a PubSub system, a publisher sends a message to a topic, which acts as a logical channel for the message. Subscribers can then subscribe to that topic, and will receive a copy of the message when it is published. This allows multiple subscribers to receive the same message, and also allows publishers to send messages to multiple topics, which can then be received by multiple subscribers.

flowchart BT
S1[Subscriber]
S2[Subscriber]
S3[Subscriber]
T[Topic]

S1 --subscribe--> T
S2 --subscribe--> T
S3 --subscribe--> T
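
The sketch below illustrates topics, again using the standard library Registry as a stand-in for a PubSub system. The registry name Demo.Topics, the topic names, and the `publish` helper are all hypothetical. The current process subscribes to two of three topics and only receives messages published to those two.

```elixir
# Demo.Topics and the topic names are hypothetical.
{:ok, _} = Registry.start_link(keys: :duplicate, name: Demo.Topics)

# The current process subscribes to "news" and "weather", but not "sports".
{:ok, _} = Registry.register(Demo.Topics, "news", [])
{:ok, _} = Registry.register(Demo.Topics, "weather", [])

# Hypothetical helper: send a message to every process registered under a topic.
publish = fn topic, message ->
  Registry.dispatch(Demo.Topics, topic, fn entries ->
    for {pid, _value} <- entries, do: send(pid, {topic, message})
  end)
end

publish.("news", "headline")
publish.("sports", "final score")
publish.("weather", "sunny")

# Collect the {topic, message} tuples that reached this process.
drain = fn drain, acc ->
  receive do
    {topic, message} when is_binary(topic) -> drain.(drain, [{topic, message} | acc])
  after
    100 -> Enum.reverse(acc)
  end
end

inbox = drain.(drain, [])
IO.inspect(inbox)
```

The "sports" message is dropped because no process is registered under that topic.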

Phoenix.PubSub

Phoenix provides a built-in PubSub system, Phoenix.PubSub, built on Elixir processes and message passing (the actor model). It allows processes to subscribe to topics and receive messages in real time. Since LiveViews are GenServer processes, each Phoenix LiveView can subscribe to relevant topics and re-render in real time based on published events.

Our Phoenix PubSub service is started in application.ex as part of the application supervision tree.

def start(_type, _args) do
  children = [
    # Start the Ecto repository
    App.Repo,
    # Start the Telemetry supervisor
    AppWeb.Telemetry,
    # Start the PubSub system
    {Phoenix.PubSub, name: App.PubSub},
    # Start the Endpoint (http/https)
    AppWeb.Endpoint
    # Start a worker by calling: App.Worker.start_link(arg)
    # {App.Worker, arg}
  ]

  # See https://hexdocs.pm/elixir/Supervisor.html
  # for other strategies and supported options
  opts = [strategy: :one_for_one, name: App.Supervisor]
  Supervisor.start_link(children, opts)
end
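
The name given to the PubSub child is what publishers and subscribers refer to later. As a rough sketch, assuming the standalone phoenix_pubsub package and a hypothetical Demo.PubSub name in place of App.PubSub, the following can be run as a standalone script outside a Phoenix application.

```elixir
Mix.install([{:phoenix_pubsub, "~> 2.1"}])

# Start a PubSub server under a supervisor, much like application.ex does.
# Demo.PubSub is a hypothetical name standing in for App.PubSub.
children = [{Phoenix.PubSub, name: Demo.PubSub}]
{:ok, _supervisor} = Supervisor.start_link(children, strategy: :one_for_one)

# Subscribe the current process to a topic, then broadcast to that topic.
:ok = Phoenix.PubSub.subscribe(Demo.PubSub, "messages")
:ok = Phoenix.PubSub.broadcast(Demo.PubSub, "messages", {:new_message, "hello"})

received =
  receive do
    {:new_message, content} -> content
  after
    1_000 -> :timeout
  end

IO.inspect(received)
```

The broadcast message arrives in the subscriber's mailbox as an ordinary Elixir message.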

PicChat: Pub Sub

Over the next several lessons, we’re going to build a PicChat application where users can create messages with uploaded pictures. This lesson will focus on adding a PubSub system that enables a real-time feed of messages.

Whenever a LiveView process triggers a relevant event such as creating, updating, or deleting a message, we’ll broadcast a message through the PubSub that causes other LiveView processes to update their chat feed.

flowchart
P[Publisher]
PS[(PubSub)]
S1[LiveView Subscriber]
S2[LiveView Subscriber]
S3[LiveView Subscriber]

P --broadcast event to ''messages'' topic--> PS
PS --broadcast event--> S1
PS --broadcast event--> S2
PS --broadcast event--> S3

There are three main steps to implement a simple PubSub system.

  1. Use Endpoint.subscribe/2, which calls Phoenix.PubSub.subscribe/3, to subscribe the relevant processes to a topic.
  2. Broadcast a message to all subscribers of a topic using a broadcast function such as Endpoint.broadcast/3 or Endpoint.broadcast_from/4, which does not send the message to the calling process.
  3. Handle the broadcasted message in the subscriber processes with a handle_info/2 callback.
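
The three steps above can be sketched outside of Phoenix with a plain GenServer standing in for a LiveView. This is an illustration under assumptions, not the PicChat implementation: Demo.Feed, Demo.PubSub, and the `{:new, message}` message shape are hypothetical, and the script calls Phoenix.PubSub directly instead of going through an Endpoint.

```elixir
Mix.install([{:phoenix_pubsub, "~> 2.1"}])

defmodule Demo.Feed do
  # A hypothetical GenServer standing in for a LiveView process.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
  def messages(pid), do: GenServer.call(pid, :messages)

  @impl true
  def init(_opts) do
    # Step 1: subscribe this process to the topic.
    :ok = Phoenix.PubSub.subscribe(Demo.PubSub, "messages")
    {:ok, []}
  end

  @impl true
  def handle_info({:new, message}, state) do
    # Step 3: handle the broadcast message and update the state.
    {:noreply, [message | state]}
  end

  @impl true
  def handle_call(:messages, _from, state) do
    {:reply, Enum.reverse(state), state}
  end
end

{:ok, _} = Supervisor.start_link([{Phoenix.PubSub, name: Demo.PubSub}], strategy: :one_for_one)
{:ok, feed_a} = Demo.Feed.start_link([])
{:ok, feed_b} = Demo.Feed.start_link([])

# Step 2: broadcast to every subscriber of the topic.
:ok = Phoenix.PubSub.broadcast(Demo.PubSub, "messages", {:new, "first"})
:ok = Phoenix.PubSub.broadcast(Demo.PubSub, "messages", {:new, "second"})

feed_a_messages = Demo.Feed.messages(feed_a)
feed_b_messages = Demo.Feed.messages(feed_b)
IO.inspect({feed_a_messages, feed_b_messages})
```

Both feeds end up with the same messages even though the publisher never references either pid.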

Subscribe

We display the list of messages on the MessageLive.Index LiveView. We’re going to subscribe every mounted MessageLive.Index LiveView to a "messages" topic.

# Index.ex
def mount(_params, _session, socket) do
  if connected?(socket) do
    PicChatWeb.Endpoint.subscribe("messages")
  end

  {:ok, stream(socket, :messages, Chat.list_messages())}
end

Now our subscribed MessageLive.Index LiveView processes will receive any messages broadcast to the "messages" topic.

Broadcast

We want to broadcast a message on the "messages" topic whenever an event occurs that other users should see in real time. We’ll broadcast a message any time we create, update, or delete a Message record.

Save Message

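Broadcast a "new" event on the "messages" topic whenever we create a message, and an "edit" event whenever we update one.

A LiveComponent runs inside its parent LiveView's process, so self() in the form component refers to the parent LiveView. We’ve opted to use broadcast_from/4 instead of broadcast/3 so that this process is skipped by the broadcast. The parent is notified of the saved message directly through notify_parent/1, so the update doesn’t have to go through the PubSub system and isn’t applied twice.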

# Form_component.ex
defp save_message(socket, :edit, message_params) do
  case Chat.update_message(socket.assigns.message, message_params) do
    {:ok, message} ->
      notify_parent({:edit, message})
      PicChatWeb.Endpoint.broadcast_from(self(), "messages", "edit", message)

      {:noreply,
        socket
        |> put_flash(:info, "Message updated successfully")
        |> push_patch(to: socket.assigns.patch)}

    {:error, %Ecto.Changeset{} = changeset} ->
      {:noreply, assign_form(socket, changeset)}
  end
end

defp save_message(socket, :new, message_params) do
  case Chat.create_message(message_params) do
    {:ok, message} ->
      notify_parent({:new, message})
      PicChatWeb.Endpoint.broadcast_from(self(), "messages", "new", message)

      {:noreply,
        socket
        |> put_flash(:info, "Message created successfully")
        |> push_patch(to: socket.assigns.patch)}

    {:error, %Ecto.Changeset{} = changeset} ->
      {:noreply, assign_form(socket, changeset)}
  end
end
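
To see the difference between the two broadcast functions in isolation, here is a small sketch that uses Phoenix.PubSub.broadcast_from/4 directly. Demo.PubSub and the `{:edit, "updated"}` message are hypothetical. Note that Phoenix.PubSub takes the PubSub name as its first argument, whereas the Endpoint functions used above fill it in for us.

```elixir
Mix.install([{:phoenix_pubsub, "~> 2.1"}])

{:ok, _} = Supervisor.start_link([{Phoenix.PubSub, name: Demo.PubSub}], strategy: :one_for_one)

parent = self()

# A second subscriber that forwards whatever it receives to the parent.
spawn_link(fn ->
  :ok = Phoenix.PubSub.subscribe(Demo.PubSub, "messages")
  send(parent, :subscribed)

  receive do
    message -> send(parent, {:other_received, message})
  end
end)

receive do
  :subscribed -> :ok
end

# The caller is subscribed too, but broadcast_from/4 skips the `from` pid.
:ok = Phoenix.PubSub.subscribe(Demo.PubSub, "messages")
:ok = Phoenix.PubSub.broadcast_from(Demo.PubSub, self(), "messages", {:edit, "updated"})

other_result =
  receive do
    {:other_received, message} -> message
  after
    1_000 -> :timeout
  end

caller_result =
  receive do
    {:edit, _content} = message -> message
  after
    100 -> :nothing_received
  end

IO.inspect({other_result, caller_result})
```

The other subscriber receives the message, while the caller's mailbox stays empty, which is why the form component has to update its own parent through notify_parent/1.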

Delete

Broadcast a "delete" event on the "messages" topic when we delete a message.

# Index.ex
def handle_event("delete", %{"id" => id}, socket) do
  message = Chat.get_message!(id)

  if message.user_id == socket.assigns.current_user.id do
    {:ok, _} = Chat.delete_message(message)
    PicChatWeb.Endpoint.broadcast_from(self(), "messages", "delete", message)
    {:noreply, stream_delete(socket, :messages, message)}
  else
    {:noreply,
      Phoenix.LiveView.put_flash(
        socket,
        :error,
        "You are not authorized to delete this message."
      )}
  end
end

Handle Received Event

The handle_info/2 callback in each subscribed LiveView process will receive a Phoenix.Socket.Broadcast struct containing the topic, event, and payload.

%Phoenix.Socket.Broadcast{
  topic: "messages",
  event: "new",
  payload: %PicChat.Chat.Message{
    __meta__: #Ecto.Schema.Metadata<:loaded, "messages">,
    id: 4,
    content: "some content",
    picture: nil,
    user_id: 1,
    user: #Ecto.Association.NotLoaded,
    inserted_at: ~N[2023-05-28 21:26:18],
    updated_at: ~N[2023-05-28 21:26:18]
  }
}

We can pattern match on this struct to create event handlers. Add handle_info/2 callback functions for creating, updating, and deleting a Message in the stream of messages.

# Index.ex
def handle_info(%Phoenix.Socket.Broadcast{topic: "messages", event: "new", payload: message}, socket) do
  {:noreply, stream_insert(socket, :messages, message, at: 0)}
end

def handle_info(%Phoenix.Socket.Broadcast{topic: "messages", event: "edit", payload: message}, socket) do
  {:noreply, stream_insert(socket, :messages, message)}
end

def handle_info(%Phoenix.Socket.Broadcast{topic: "messages", event: "delete", payload: message}, socket) do
  {:noreply, stream_delete(socket, :messages, message)}
end
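
The clause-per-event dispatch used by these handlers can be tried without Phoenix. In the sketch below, Demo.Broadcast is a hypothetical stand-in struct with the same three fields as Phoenix.Socket.Broadcast, and a plain list stands in for the LiveView stream. It only illustrates how pattern matching selects a clause, not how streams behave.

```elixir
defmodule Demo.Broadcast do
  # Hypothetical stand-in with the same fields as Phoenix.Socket.Broadcast.
  defstruct [:topic, :event, :payload]
end

defmodule Demo.Handler do
  # Each clause matches one event. The list stands in for the stream of messages.
  def handle(%Demo.Broadcast{topic: "messages", event: "new", payload: message}, messages) do
    [message | messages]
  end

  def handle(%Demo.Broadcast{topic: "messages", event: "edit", payload: message}, messages) do
    Enum.map(messages, fn existing ->
      if existing.id == message.id, do: message, else: existing
    end)
  end

  def handle(%Demo.Broadcast{topic: "messages", event: "delete", payload: message}, messages) do
    Enum.reject(messages, fn existing -> existing.id == message.id end)
  end
end

events = [
  %Demo.Broadcast{topic: "messages", event: "new", payload: %{id: 1, content: "hi"}},
  %Demo.Broadcast{topic: "messages", event: "new", payload: %{id: 2, content: "yo"}},
  %Demo.Broadcast{topic: "messages", event: "edit", payload: %{id: 1, content: "hello"}},
  %Demo.Broadcast{topic: "messages", event: "delete", payload: %{id: 2, content: "yo"}}
]

# Apply each broadcast in order, starting from an empty feed.
final = Enum.reduce(events, [], &Demo.Handler.handle/2)
IO.inspect(final)
```

After all four events, only the edited first message remains in the list.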

Testing PubSub

We can test PubSub interactions by mounting multiple LiveViews and triggering events.

Here are some tests we can add to message_live_test.exs to ensure the create, update, and delete PubSub functionality works as expected.

describe "PubSub" do
  test "creating a message updates subscribers", %{conn: conn} do
    user = user_fixture()
    conn = log_in_user(conn, user)
    {:ok, subscriber_live, _html} = live(conn, ~p"/messages")
    {:ok, publisher_live, _html} = live(conn, ~p"/messages/new")

    assert publisher_live
            |> form("#message-form", message: @create_attrs)
            |> render_submit()

    assert render(subscriber_live) =~ "some content"
  end

  test "updating a message updates subscribers", %{conn: conn} do
    user = user_fixture()
    conn = log_in_user(conn, user)
    message = message_fixture(user_id: user.id)
    {:ok, subscriber_live, _html} = live(conn, ~p"/messages")
    {:ok, publisher_live, _html} = live(conn, ~p"/messages/#{message}/edit")

    assert publisher_live
            |> form("#message-form", message: @update_attrs)
            |> render_submit()

    assert render(subscriber_live) =~ "some updated content"
  end

  test "deleting a message updates subscribers", %{conn: conn} do
    user = user_fixture()
    conn = log_in_user(conn, user)
    message = message_fixture(user_id: user.id)
    {:ok, subscriber_live, _html} = live(conn, ~p"/messages")
    {:ok, publisher_live, _html} = live(conn, ~p"/messages/#{message}/edit")

    assert publisher_live |> element("#messages-#{message.id} a", "Delete") |> render_click()

    refute render(subscriber_live) =~ "some content"
  end
end

Commit Your Progress

DockYard Academy now recommends you use the latest Release rather than forking or cloning our repository.

Run git status to ensure there are no undesirable changes. Then run the following in your command line from the curriculum folder to commit your progress.

$ git add .
$ git commit -m "finish PicChat: PubSub reading"
$ git push
