PicChat: PubSub

Mix.install([
  {:jason, "~> 1.4"},
  {:kino, "~> 0.9", override: true},
  {:youtube, github: "brooklinjazz/youtube"},
  {:hidden_cell, github: "brooklinjazz/hidden_cell"}
])

Navigation

Home | Report An Issue | PicChat: Image Upload | PicChat: Infinite Scroll

Review Questions

Upon completing this lesson, a student should be able to answer the following questions.

  • What is the PubSub (Publisher/Subscriber pattern) and how does it enable real-time features?
  • How do we subscribe a LiveView process to a topic?
  • How do we broadcast to a topic?
  • How do we handle a broadcasted message in a LiveView process?

Overview

PubSub

PubSub, or “Publish-Subscribe”, is a messaging pattern that allows senders of messages (publishers) to send messages to multiple receivers (subscribers) without explicitly establishing a connection to each individual receiver. This allows for a decoupled communication model, where the publisher and subscriber do not need to be aware of each other or directly connected in order to communicate.

flowchart
PS[PubSub]
P[Publisher]
S1[Subscriber]
S2[Subscriber]
S3[Subscriber]

P --broadcast--> PS
PS --broadcast--> S1
PS --broadcast--> S2
PS --broadcast--> S3

Topics

In a PubSub system, a publisher sends a message to a topic, which acts as a logical channel for the message. Subscribers can then subscribe to that topic, and will receive a copy of the message when it is published. This allows multiple subscribers to receive the same message, and also allows publishers to send messages to multiple topics, which can then be received by multiple subscribers.

flowchart BT
S1[Subscriber]
S2[Subscriber]
S3[Subscriber]
T[Topic]

S1 --subscribe--> T
S2 --subscribe--> T
S3 --subscribe--> T
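
To make the topic idea concrete, here is a minimal sketch of the pattern using only Elixir's standard-library Registry. It is not how Phoenix.PubSub is used in this lesson; DemoPubSub and the {:new_message, "hello"} message are hypothetical names chosen for illustration.

```elixir
# A toy PubSub built on the standard library's Registry (not Phoenix.PubSub).
# DemoPubSub is a hypothetical name used only in this sketch.
{:ok, _} = Registry.start_link(keys: :duplicate, name: DemoPubSub)

parent = self()

# Each subscriber registers itself under the "messages" topic,
# then waits for a single message and reports it back to the parent.
Enum.each(1..3, fn n ->
  spawn(fn ->
    {:ok, _} = Registry.register(DemoPubSub, "messages", [])
    send(parent, {:subscribed, n})

    receive do
      message -> send(parent, {:received, n, message})
    end
  end)
end)

# Wait until every subscriber has registered before publishing.
for n <- 1..3 do
  receive do
    {:subscribed, ^n} -> :ok
  end
end

# The publisher only knows the topic name, not who is listening.
Registry.dispatch(DemoPubSub, "messages", fn entries ->
  for {pid, _value} <- entries, do: send(pid, {:new_message, "hello"})
end)

# Collect what each subscriber received.
received =
  for n <- 1..3 do
    receive do
      {:received, ^n, message} -> {n, message}
    end
  end

IO.inspect(received, label: "Delivered")
```

The publisher never holds a reference to any subscriber. It only names the topic, which is the decoupling described above.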

Phoenix.PubSub

Phoenix provides a built-in PubSub system built on Elixir's process-based actor model, which allows clients to subscribe to topics and receive messages in real time. Since LiveViews are GenServer processes, each Phoenix LiveView can subscribe to relevant topics and render information in real time based on published events.

Our Phoenix PubSub service is started in application.ex as part of the application supervision tree.

def start(_type, _args) do
  children = [
    # Start the Ecto repository
    App.Repo,
    # Start the Telemetry supervisor
    AppWeb.Telemetry,
    # Start the PubSub system
    {Phoenix.PubSub, name: App.PubSub},
    # Start the Endpoint (http/https)
    AppWeb.Endpoint
    # Start a worker by calling: App.Worker.start_link(arg)
    # {App.Worker, arg}
  ]

  # See https://hexdocs.pm/elixir/Supervisor.html
  # for other strategies and supported options
  opts = [strategy: :one_for_one, name: App.Supervisor]
  Supervisor.start_link(children, opts)
end

Phoenix.Endpoint

The Phoenix.Endpoint module is the boundary for your Phoenix web application where all of your HTTP requests start.

Phoenix.Endpoint contains the API functions such as subscribe/2, broadcast/3, and broadcast_from/4 for working with our Phoenix PubSub service.

subscribe/2 subscribes the calling process to a particular topic. We usually call this function in the mount/3 callback of a LiveView. mount/3 runs twice, once for the initial static render and again when the WebSocket connects, so we use connected?/1 to subscribe only from the connected LiveView process.

  @impl true
  def mount(_params, _session, socket) do
    if connected?(socket) do
      AppWeb.Endpoint.subscribe("topic_name")
    end

    {:ok, socket}
  end

broadcast/3 sends a message to all subscribers of a particular topic. It takes a topic, an event name, and a payload of data to send with the message.

payload = "payload" # some elixir term
AppWeb.Endpoint.broadcast("topic_name", "event_name", payload)

broadcast_from/4 works similarly to broadcast/3 except it doesn’t send the message to the broadcaster, even if it’s also subscribed to the topic.

payload = "payload" # some elixir term
AppWeb.Endpoint.broadcast_from(self(), "topic_name", "event_name", payload)

These functions wrap the lower-level Phoenix.PubSub functions. They automatically pass the name of our PubSub service, App.PubSub, and provide some other conveniences such as an improved API. For comparison, here is a broadcast made with Phoenix.PubSub directly:

payload = "payload" # some elixir term
Phoenix.PubSub.broadcast(App.PubSub, "topic_name", {:event_name, payload})
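
To see the difference between broadcasting to everyone and broadcasting to everyone except the sender, the following standalone script is a rough sketch that uses Phoenix.PubSub directly, outside of any Phoenix application. DemoPubSub is a hypothetical name standing in for App.PubSub, and the script assumes network access so Mix.install can fetch the phoenix_pubsub package. Run it as its own .exs file rather than inside this notebook.

```elixir
# Fetches the phoenix_pubsub package; needs network access on first run.
Mix.install([{:phoenix_pubsub, "~> 2.1"}])

# DemoPubSub is a hypothetical name standing in for App.PubSub.
{:ok, _} =
  Supervisor.start_link([{Phoenix.PubSub, name: DemoPubSub}], strategy: :one_for_one)

# Subscribe the current process to the topic.
:ok = Phoenix.PubSub.subscribe(DemoPubSub, "topic_name")

# broadcast/3 delivers to every subscriber, including the caller.
:ok = Phoenix.PubSub.broadcast(DemoPubSub, "topic_name", {:event_name, "payload"})

got_broadcast =
  receive do
    {:event_name, payload} -> payload
  after
    1_000 -> :nothing
  end

# broadcast_from/4 skips the pid given as the second argument.
:ok =
  Phoenix.PubSub.broadcast_from(DemoPubSub, self(), "topic_name", {:event_name, "payload"})

got_broadcast_from =
  receive do
    {:event_name, payload} -> payload
  after
    100 -> :nothing
  end

IO.inspect({got_broadcast, got_broadcast_from})
```

Note that the argument order differs from the Endpoint functions: Phoenix.PubSub takes the PubSub name first and a single message term instead of a separate event and payload.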

Follow Along: PicChat PubSub

To learn more about PubSub, we’re going to add real-time features to our existing PicChat application. Whenever a user creates, updates, or deletes a message, we’ll update every LiveView with the new list of messages.

flowchart
P[Publisher]
PS[(PubSub)]
S1[LiveView Subscriber]
S2[LiveView Subscriber]
S3[LiveView Subscriber]

P --broadcast event to ''messages'' topic--> PS
PS --broadcast event--> S1
PS --broadcast event--> S2
PS --broadcast event--> S3

You can see the completed PicChat application on the pubsub branch for the sake of reference.

Observer

Erlang/OTP ships with :observer, a graphical tool that makes it easier to observe the processes in our application. Because Elixir runs on the Erlang VM, we can start it from IEx.

Start your PicChat phoenix server in the IEx shell.

$ iex -S mix phx.server

Then start the :observer.

iex> :observer.start()

You should see a Graphical User Interface (GUI) appear. Open up the Applications panel to view the supervision tree of your application processes.

For the sake of learning how to use the observer, let’s find a MessageLive.Index process in our supervision tree.

First, inspect the PID of the current MessageLive.Index process in the MessageLive.Index.mount/3 callback.

def mount(_params, _session, socket) do
  IO.inspect(self(), label: "LiveView Process ID")

  # ...
end

Visit http://localhost:4000 to mount a MessageLive.Index process. Observe the PID in your terminal. The PID will be unique.

LiveView Process ID: #PID<0.111.0>

Each LiveView runs in a separate process. Open multiple tabs on http://localhost:4000 and you’ll notice each has a different PID.

LiveView Process ID: #PID<0.9424.0>
LiveView Process ID: #PID<0.9479.0>
LiveView Process ID: #PID<0.9530.0>

We can use the Observer to manually kill processes. Find the PID of a LiveView process in your Observer. Click on File then Kill process. Observe the LiveView as it crashes.

We can also use the Observer to manually send processes messages. Define a handle_info/2 callback in MessageLive.Index.

def handle_info(message, socket) do
  IO.inspect(message, label: "Received Message")
  {:noreply, socket}
end

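Select the process in the observer, then click File -> Send Msg. The dialog expects an Erlang term. Enter "hello" to send the LiveView process a message. In Erlang, a double-quoted string is a list of characters, so Elixir prints it as a charlist.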

Observe the message printed to the terminal.

Received Message: 'hello'
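
The Observer is only one way to deliver a message. As a small sketch of the same mechanism in code, the script below sends a message to a plain GenServer with send/2 and handles it in handle_info/2. DemoServer is a hypothetical stand-in for a LiveView process and is not part of PicChat.

```elixir
defmodule DemoServer do
  # Hypothetical stand-in for a LiveView process; not part of PicChat.
  use GenServer

  @impl true
  def init(parent), do: {:ok, parent}

  # Plain messages (anything not sent with call or cast) arrive in handle_info/2.
  @impl true
  def handle_info(message, parent) do
    send(parent, {:server_received, message})
    {:noreply, parent}
  end
end

{:ok, pid} = GenServer.start_link(DemoServer, self())

# Does in code what Observer's "Send Msg" does from the GUI.
send(pid, "hello")

reply =
  receive do
    {:server_received, message} -> message
  after
    1_000 -> :timeout
  end

IO.inspect(reply, label: "Received Message")
```

Here the message is an Elixir binary, so it is echoed back as "hello" rather than as a charlist. In a running IEx session you could likely do the same against a LiveView by building its PID with the pid/3 helper, for example send(pid(0, 111, 0), "hello"), using the numbers printed in your own terminal.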

Remove the handle_info/2 callback and any IO.inspect calls as we are finished with the demonstration.

Subscribe

We display the list of messages on the MessageLive.Index LiveView. We’re going to subscribe every mounted MessageLive.Index LiveView to a "messages" topic.

def mount(_params, _session, socket) do
  if connected?(socket) do
    PicChatWeb.Endpoint.subscribe("messages")
  end

  {:ok,
    socket
    |> assign(:messages, list_messages())
    |> allow_upload(:picture, accept: ~w(.jpg .jpeg .png), max_entries: 1)}
end

That’s it. Our subscribed MessageLive.Index LiveView processes will receive any messages broadcast to the "messages" topic.

Publish

We want to broadcast an event on the "messages" topic that will tell our MessageLive.Index processes to update their list of messages.

Call broadcast/3 in the save_message/3 function in form_component.ex to broadcast a message to all subscribed processes.

defp save_message(socket, :new, message_params) do
  case Chat.create_message(message_params) do
    {:ok, message} ->
      # broadcast the "create_message" event to the "messages" topic
      PicChatWeb.Endpoint.broadcast("messages", "create_message", message)

      {:noreply,
        socket
        |> put_flash(:info, "Message created successfully")
        |> push_redirect(to: socket.assigns.return_to)}

    {:error, %Ecto.Changeset{} = changeset} ->
      {:noreply, assign(socket, changeset: changeset)}
  end
end

Handle Received Event

Processes can handle the received message using the handle_info/2 callback. Let’s start by defining a generic handler to inspect the message sent to the MessageLive.Index LiveView.

@impl true
def handle_info(message, socket) do
  IO.inspect(message, label: "Received Message")
  {:noreply, socket}
end

Visit http://localhost:4000/new and create a new message in the chat application. See the printed message in the terminal.

%Phoenix.Socket.Broadcast{
  topic: "messages",
  event: "create_message",
  payload: %PicChat.Chat.Message{
    __meta__: #Ecto.Schema.Metadata<:loaded, "messages">,
    id: 1,
    content: "Example Content",
    from: "Example From",
    picture: nil,
    inserted_at: ~N[2023-01-08 08:14:52],
    updated_at: ~N[2023-01-08 08:14:52]
  }
}

We’re given a Phoenix.Socket.Broadcast struct. This includes the topic, the event, and the payload. Topics often have multiple events. This allows us to handle different events separately without having to subscribe to every different type of event.

For example, we might have "create_message", "delete_message", and "update_message" events all under the "messages" topic.

To handle this event, modify the handle_info/2 function in MessageLive.Index to get the full list of messages and assign it to the socket to render an updated list.

def handle_info(%{topic: "messages"}, socket) do
  {:noreply, assign(socket, :messages, list_messages())}
end

Open http://localhost:4000 in two different browser tabs. Create a message and notice that it appears in real-time on both tabs.
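
The handler above reloads the whole list for any event on the "messages" topic. As an alternative sketch, not the approach this lesson uses, each event could be matched separately and the list updated in place. In the hypothetical DemoHandlers module below, plain maps stand in for the Phoenix.Socket.Broadcast struct and a plain list stands in for socket.assigns.messages, so the example runs without Phoenix.

```elixir
defmodule DemoHandlers do
  # Hypothetical module: plain maps stand in for %Phoenix.Socket.Broadcast{}
  # and a plain list stands in for socket.assigns.messages.

  # Prepend a newly created message.
  def handle_broadcast(%{topic: "messages", event: "create_message", payload: message}, messages) do
    [message | messages]
  end

  # Swap the updated message in for the one with the same id.
  def handle_broadcast(%{topic: "messages", event: "update_message", payload: message}, messages) do
    Enum.map(messages, fn existing ->
      if existing.id == message.id, do: message, else: existing
    end)
  end

  # Drop the deleted message. This sketch assumes the payload is an integer id;
  # an id taken from event params would be a string and need converting first.
  def handle_broadcast(%{topic: "messages", event: "delete_message", payload: id}, messages) do
    Enum.reject(messages, fn existing -> existing.id == id end)
  end
end

messages = [%{id: 1, content: "first"}]

messages =
  DemoHandlers.handle_broadcast(
    %{topic: "messages", event: "create_message", payload: %{id: 2, content: "second"}},
    messages
  )

messages =
  DemoHandlers.handle_broadcast(
    %{topic: "messages", event: "update_message", payload: %{id: 1, content: "edited"}},
    messages
  )

messages =
  DemoHandlers.handle_broadcast(
    %{topic: "messages", event: "delete_message", payload: 2},
    messages
  )

IO.inspect(messages)
```

Matching per event avoids a database query on every broadcast, at the cost of more clauses to maintain. Reloading the list, as the lesson does, is simpler and always reflects what is stored.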

Your Turn: Broadcast “update_message” And “delete_message” Events

You’re going to broadcast a message every time you update or delete a chat message. Your list of messages displayed on http://localhost:4000 should update in real time.

See PicChat on the pubsub branch for reference.

Example Solution

Broadcast after successfully editing a message in form_component.ex.

defp save_message(socket, :edit, message_params) do
  case Chat.update_message(socket.assigns.message, message_params) do
    {:ok, message} ->
      PicChatWeb.Endpoint.broadcast("messages", "update_message", message)

      {:noreply,
        socket
        |> put_flash(:info, "Message updated successfully")
        |> push_redirect(to: socket.assigns.return_to)}

    {:error, %Ecto.Changeset{} = changeset} ->
      {:noreply, assign(socket, :changeset, changeset)}
  end
end

Broadcast after deleting a message in the "delete" event handler in index.ex. One option is to omit assigning :messages to the socket. This is our preferred solution because it avoids issues in the upcoming infinite-scroll lesson.

@impl true
def handle_event("delete", %{"id" => id}, socket) do
  message = Chat.get_message!(id)
  {:ok, _} = Chat.delete_message(message)

  PicChatWeb.Endpoint.broadcast("messages", "delete_message", id)

  # Assigning :messages is no longer necessary: the handle_info/2 clause for the "messages" topic reloads the list.
  {:noreply, socket}
end

Alternatively, you could use broadcast_from/4, then handle the current LiveView individually.

def handle_event("delete", %{"id" => id}, socket) do
  message = Chat.get_message!(id)
  {:ok, _} = Chat.delete_message(message)

  PicChatWeb.Endpoint.broadcast_from(self(), "messages", "delete_message", id)

  {:noreply, assign(socket, :messages, list_messages())}
end

Commit Your Progress

DockYard Academy now recommends you use the latest Release rather than forking or cloning our repository.

Run git status to ensure there are no undesirable changes. Then run the following in your command line from the curriculum folder to commit your progress.

$ git add .
$ git commit -m "finish PicChat: PubSub reading"
$ git push
