PicChat: PubSub
Mix.install([
  {:jason, "~> 1.4"},
  {:kino, "~> 0.9", override: true},
  {:youtube, github: "brooklinjazz/youtube"},
  {:hidden_cell, github: "brooklinjazz/hidden_cell"}
])
Review Questions
Upon completing this lesson, a student should be able to answer the following questions.
- What is the PubSub (Publisher/Subscriber) pattern, and how does it enable real-time features?
- How do we subscribe a LiveView process to a topic?
- How do we broadcast to a topic?
- How do we handle a broadcasted message in a LiveView process?
Overview
PubSub
PubSub, or “Publish-Subscribe”, is a messaging pattern that allows senders of messages (publishers) to send messages to multiple receivers (subscribers) without explicitly establishing a connection to each individual receiver. This allows for a decoupled communication model, where the publisher and subscriber do not need to be aware of each other or directly connected in order to communicate.
flowchart
PS[PubSub]
P[Publisher]
S1[Subscriber]
S2[Subscriber]
S3[Subscriber]
P --broadcast--> PS
PS --broadcast--> S1
PS --broadcast--> S2
PS --broadcast--> S3
Topics
In a PubSub system, a publisher sends a message to a topic, which acts as a logical channel for the message. Subscribers can then subscribe to that topic, and will receive a copy of the message when it is published. This allows multiple subscribers to receive the same message, and also allows publishers to send messages to multiple topics, which can then be received by multiple subscribers.
flowchart BT
S1[Subscriber]
S2[Subscriber]
S3[Subscriber]
T[Topic]
S1 --subscribe--> T
S2 --subscribe--> T
S3 --subscribe--> T
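The topic mechanism can be sketched with Elixir's built-in Registry, which supports a simple local PubSub when started with `keys: :duplicate`. This is only an illustration of the pattern, not how Phoenix.PubSub is implemented; the `SketchTopics` name and the `:s1`, `:s2`, `:s3` subscriber labels are assumptions made for this example.

```elixir
# A Registry with duplicate keys lets many processes register under one topic.
{:ok, _} = Registry.start_link(keys: :duplicate, name: SketchTopics)
parent = self()

# Each subscriber registers under one topic, reports readiness, then
# forwards the first message it receives back to the parent process.
subscribe = fn name, topic ->
  spawn(fn ->
    {:ok, _} = Registry.register(SketchTopics, topic, [])
    send(parent, {:ready, name})

    receive do
      message -> send(parent, {:received, name, message})
    end
  end)
end

subscribe.(:s1, "messages")
subscribe.(:s2, "messages")
subscribe.(:s3, "users")

# Wait until every subscriber has registered before publishing.
for name <- [:s1, :s2, :s3] do
  receive do
    {:ready, ^name} -> :ok
  end
end

# Publish to the "messages" topic only.
Registry.dispatch(SketchTopics, "messages", fn entries ->
  for {pid, _value} <- entries, do: send(pid, "hello")
end)

# Collect what each subscriber received, or :nothing after a short wait.
collect = fn name ->
  receive do
    {:received, ^name, message} -> message
  after
    200 -> :nothing
  end
end

results = Enum.map([:s1, :s2, :s3], collect)
```

Here `results` holds the message for the two "messages" subscribers and `:nothing` for the "users" subscriber, because the publisher only addressed the topic and never the individual processes.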
Phoenix.PubSub
Phoenix provides a built-in PubSub system based on Elixir's process-based actor model, which allows clients to subscribe to topics and receive messages in real-time. Since each LiveView runs in its own process and handles messages much like a GenServer, each Phoenix LiveView can subscribe to relevant topics and render information in real-time based on published events.
Our Phoenix PubSub service is started in application.ex as part of the application supervision tree.
def start(_type, _args) do
  children = [
    # Start the Ecto repository
    App.Repo,
    # Start the Telemetry supervisor
    AppWeb.Telemetry,
    # Start the PubSub system
    {Phoenix.PubSub, name: App.PubSub},
    # Start the Endpoint (http/https)
    AppWeb.Endpoint
    # Start a worker by calling: App.Worker.start_link(arg)
    # {App.Worker, arg}
  ]

  # See https://hexdocs.pm/elixir/Supervisor.html
  # for other strategies and supported options
  opts = [strategy: :one_for_one, name: App.Supervisor]
  Supervisor.start_link(children, opts)
end
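As a rough sketch of what starting a named service under a supervisor gives us, the example below uses only the standard library. A Registry stands in for the PubSub service and an Agent stands in for a later child such as the Endpoint; the `SketchApp.*` names are hypothetical and are not part of a generated Phoenix application.

```elixir
children = [
  # A named Registry stands in for the PubSub service (assumption for this sketch).
  {Registry, keys: :duplicate, name: SketchApp.PubSub},
  # An Agent stands in for a child started afterwards, such as the Endpoint.
  %{
    id: SketchApp.Endpoint,
    start: {Agent, :start_link, [fn -> :running end, [name: SketchApp.Endpoint]]}
  }
]

{:ok, supervisor} = Supervisor.start_link(children, strategy: :one_for_one)

# Because each child is registered under a name, any process can find it by that name.
pubsub_pid = Process.whereis(SketchApp.PubSub)
counts = Supervisor.count_children(supervisor)
```

Children start in the order they are listed, which is why the PubSub entry appears before the Endpoint in application.ex.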
Phoenix.Endpoint
The Phoenix.Endpoint module is the boundary for your Phoenix web application where all of your HTTP requests start.
Phoenix.Endpoint contains the API functions such as subscribe/2, broadcast/3, and broadcast_from/4 for working with our Phoenix PubSub service.
subscribe/2 subscribes the calling process to a particular topic. We usually call this function in the mount/3 callback of a LiveView. LiveView calls mount/3 twice, once for the initial HTTP render and again when the WebSocket connects, so we guard the call with connected?/1 to subscribe only from the connected process.
@impl true
def mount(_params, _session, socket) do
  if connected?(socket) do
    AppWeb.Endpoint.subscribe("topic_name")
  end

  {:ok, socket}
end
broadcast/3 sends a message to all subscribers of a particular topic. It takes a topic, an event name, and a payload of data to send with the message.
payload = "payload" # some elixir term
AppWeb.Endpoint.broadcast("topic_name", "event_name", payload)
broadcast_from/4 works similarly to broadcast/3 except it doesn’t send the message to the broadcaster, even if it’s also subscribed to the topic.
payload = "payload" # some elixir term
AppWeb.Endpoint.broadcast_from(self(), "topic_name", "event_name", payload)
These functions wrap Phoenix.PubSub functions. They automatically pass the name of our PubSub service (App.PubSub) and package the event and payload into a Phoenix.Socket.Broadcast struct. Calling Phoenix.PubSub directly sends whatever term we give it as-is.
payload = "payload" # some elixir term
Phoenix.PubSub.broadcast(App.PubSub, "topic_name", {:event_name, payload})
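The difference between broadcasting to everyone and broadcasting to everyone except the sender can be sketched with the standard library alone. `SketchBus` below is a hypothetical stand-in built on Registry; it is not the Phoenix implementation, and its function arities differ from the Endpoint functions described above.

```elixir
defmodule SketchBus do
  # Hypothetical stand-in for a PubSub service, built on Elixir's Registry.
  def start, do: Registry.start_link(keys: :duplicate, name: __MODULE__)

  # Register the calling process under the topic.
  def subscribe(topic), do: Registry.register(__MODULE__, topic, [])

  # Deliver the message to every subscriber of the topic.
  def broadcast(topic, message) do
    Registry.dispatch(__MODULE__, topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, message)
    end)
  end

  # Deliver the message to every subscriber except `from`.
  def broadcast_from(from, topic, message) do
    Registry.dispatch(__MODULE__, topic, fn entries ->
      for {pid, _value} <- entries, pid != from, do: send(pid, message)
    end)
  end
end

{:ok, _} = SketchBus.start()
{:ok, _} = SketchBus.subscribe("topic_name")

# The current process is subscribed, so a plain broadcast reaches it.
SketchBus.broadcast("topic_name", {:event_name, "payload"})

got_broadcast =
  receive do
    {:event_name, payload} -> payload
  after
    100 -> :nothing
  end

# broadcast_from skips the sender, so the current process receives nothing.
SketchBus.broadcast_from(self(), "topic_name", {:event_name, "payload"})

got_broadcast_from =
  receive do
    {:event_name, payload} -> payload
  after
    100 -> :nothing
  end
```

In a LiveView, this is the reason to reach for broadcast_from/4 when the broadcasting process has already updated its own state and only the other subscribers need to react.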
Follow Along: PicChat PubSub
To learn more about PubSub, we’re going to add real-time features to our existing PicChat application. Whenever a user creates, updates, or deletes a message, we’ll update every LiveView with the new list of messages.
flowchart
P[Publisher]
PS[(PubSub)]
S1[LiveView Subscriber]
S2[LiveView Subscriber]
S3[LiveView Subscriber]
P --broadcast event to ''messages'' topic--> PS
PS --broadcast event--> S1
PS --broadcast event--> S2
PS --broadcast event--> S3
You can see the completed PicChat application on the pubsub branch for the sake of reference.
Observer
Erlang/OTP ships with the :observer tool, which we can use from Elixir to observe the processes in our application.
Start your PicChat Phoenix server in the IEx shell.
$ iex -S mix phx.server
Then start the :observer.
iex> :observer.start()
You should see a Graphical User Interface (GUI) appear. Open up the Applications panel to view the supervision tree of your application processes.
For the sake of learning how to use the observer, let’s find a MessageLive.Index process in our supervision tree.
First, inspect the PID of the current MessageLive.Index process in the MessageLive.Index.mount/3 callback.
def mount(_params, _session, socket) do
  IO.inspect(self(), label: "LiveView Process ID")
  # ...
end
Visit http://localhost:4000 to mount a MessageLive.Index process. Observe the PID in your terminal. The PID will be unique.
LiveView Process ID: #PID<0.111.0>
Each LiveView runs in a separate process. Open multiple tabs on http://localhost:4000 and you’ll notice each has a different PID.
LiveView Process ID: #PID<0.9424.0>
LiveView Process ID: #PID<0.9479.0>
LiveView Process ID: #PID<0.9530.0>
We can use the Observer to manually kill processes. Find the PID of a LiveView process in your Observer. Click on File then Kill process. Observe the LiveView as it crashes.
We can also use the Observer to manually send processes messages. Define a handle_info/2 callback in MessageLive.Index.
def handle_info(message, socket) do
  IO.inspect(message, label: "Received Message")
  {:noreply, socket}
end
Select the process in the Observer, then click File -> Send Msg. Enter a term such as "hello" to send the LiveView process a message. The Observer reads the input as an Erlang term, so "hello" arrives as a charlist.
Observe the message printed to the terminal.
Received Message: 'hello'
Remove the handle_info/2 callback and any IO.inspect calls as we are finished with the demonstration.
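What the Observer's Send Msg does can be reproduced with send/2. The sketch below uses a plain GenServer in place of a LiveView, since both receive ordinary process messages in handle_info/2; `SketchServer` is a hypothetical module for illustration, and it forwards the message to the caller instead of printing it so the result can be checked.

```elixir
defmodule SketchServer do
  use GenServer

  # The state is the pid of the process that should be told about each message.
  def start_link(parent), do: GenServer.start_link(__MODULE__, parent)

  @impl true
  def init(parent), do: {:ok, parent}

  # handle_info/2 receives any message that is not a GenServer call or cast.
  @impl true
  def handle_info(message, parent) do
    send(parent, {:handled, message})
    {:noreply, parent}
  end
end

{:ok, pid} = SketchServer.start_link(self())

# Equivalent in spirit to File -> Send Msg in the Observer.
send(pid, "hello")

handled =
  receive do
    {:handled, message} -> message
  after
    1_000 -> :timeout
  end
```

Any process that knows the PID can do this, which is what a PubSub service does on our behalf for every subscriber of a topic.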
Subscribe
We display the list of messages on the MessageLive.Index LiveView. We’re going to subscribe every mounted MessageLive.Index LiveView to a "messages" topic.
def mount(_params, _session, socket) do
  if connected?(socket) do
    PicChatWeb.Endpoint.subscribe("messages")
  end

  {:ok,
   socket
   |> assign(:messages, list_messages())
   |> allow_upload(:picture, accept: ~w(.jpg .jpeg .png), max_entries: 1)}
end
That’s it. Our subscribed MessageLive.Index LiveView processes will receive any messages broadcast to the "messages" topic.
Publish
We want to broadcast an event on the "messages" topic that will tell our MessageLive.Index processes to update their list of messages.
Call broadcast/3 in the save_message/3 function in form_component.ex to broadcast a message to all subscribed processes.
defp save_message(socket, :new, message_params) do
  case Chat.create_message(message_params) do
    {:ok, message} ->
      # broadcast the "create_message" event to the "messages" topic
      PicChatWeb.Endpoint.broadcast("messages", "create_message", message)

      {:noreply,
       socket
       |> put_flash(:info, "Message created successfully")
       |> push_redirect(to: socket.assigns.return_to)}

    {:error, %Ecto.Changeset{} = changeset} ->
      {:noreply, assign(socket, changeset: changeset)}
  end
end
Handle Received Event
Processes can handle the received message using the handle_info/2 callback. Let’s start by defining a generic handler to inspect the message sent to the MessageLive.Index LiveView.
@impl true
def handle_info(message, socket) do
  IO.inspect(message, label: "Received Message")
  {:noreply, socket}
end
Visit http://localhost:4000/new and create a new message in the chat application. See the printed message in the terminal.
%Phoenix.Socket.Broadcast{
  topic: "messages",
  event: "create_message",
  payload: %PicChat.Chat.Message{
    __meta__: #Ecto.Schema.Metadata<:loaded, "messages">,
    id: 1,
    content: "Example Content",
    from: "Example From",
    picture: nil,
    inserted_at: ~N[2023-01-08 08:14:52],
    updated_at: ~N[2023-01-08 08:14:52]
  }
}
We’re given a Phoenix.Socket.Broadcast struct. This includes the topic, the event, and the payload. Topics often have multiple events. This allows us to handle different events separately without having to subscribe to every different type of event.
For example, we might have "create_message", "delete_message", and "update_message" events all under the "messages" topic.
To handle this event, modify the handle_info/2 function in MessageLive.Index to get the full list of messages and assign it to the socket to render an updated list.
def handle_info(%{topic: "messages"}, socket) do
  {:noreply, assign(socket, :messages, list_messages())}
end
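The handler above reloads the whole list for any event on the topic. As an alternative sketch, not the approach this lesson takes, each event can be matched separately and applied to the list already held in memory. To keep the example runnable without Phoenix, `SketchHandler` is a hypothetical module, a plain map stands in for the socket assigns, plain maps stand in for the Phoenix.Socket.Broadcast struct and the Message schema, and the delete payload is an integer id (in the lesson the id comes from the event params as a string).

```elixir
defmodule SketchHandler do
  # `assigns` is a plain map standing in for socket assigns (assumption).

  # A new message is added to the front of the list.
  def handle_info(%{topic: "messages", event: "create_message", payload: message}, assigns) do
    {:noreply, %{assigns | messages: [message | assigns.messages]}}
  end

  # An updated message replaces the entry with the same id.
  def handle_info(%{topic: "messages", event: "update_message", payload: message}, assigns) do
    messages =
      Enum.map(assigns.messages, fn existing ->
        if existing.id == message.id, do: message, else: existing
      end)

    {:noreply, %{assigns | messages: messages}}
  end

  # A deleted message is removed by id.
  def handle_info(%{topic: "messages", event: "delete_message", payload: id}, assigns) do
    {:noreply, %{assigns | messages: Enum.reject(assigns.messages, &(&1.id == id))}}
  end
end

assigns = %{messages: []}

{:noreply, assigns} =
  SketchHandler.handle_info(
    %{topic: "messages", event: "create_message", payload: %{id: 1, content: "hi"}},
    assigns
  )

{:noreply, assigns} =
  SketchHandler.handle_info(
    %{topic: "messages", event: "update_message", payload: %{id: 1, content: "edited"}},
    assigns
  )

after_update = assigns.messages

{:noreply, assigns} =
  SketchHandler.handle_info(
    %{topic: "messages", event: "delete_message", payload: 1},
    assigns
  )

after_delete = assigns.messages
```

Reloading the list from the database, as the lesson does, is simpler and always consistent; matching on the event avoids a query per subscriber at the cost of more handler code.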
Open http://localhost:4000 in two different browser tabs. Create a message and notice that it appears in real-time on both tabs.
Your Turn: Broadcast “update_message” And “delete_message” Events
You’re going to broadcast a message every time you update or delete a chat message. Your list of messages displayed on http://localhost:4000 should update in real time.
See PicChat on the pubsub branch for reference.
Example Solution
Broadcast after successfully editing a message in form_component.ex.
defp save_message(socket, :edit, message_params) do
  case Chat.update_message(socket.assigns.message, message_params) do
    {:ok, message} ->
      PicChatWeb.Endpoint.broadcast("messages", "update_message", message)

      {:noreply,
       socket
       |> put_flash(:info, "Message updated successfully")
       |> push_redirect(to: socket.assigns.return_to)}

    {:error, %Ecto.Changeset{} = changeset} ->
      {:noreply, assign(socket, :changeset, changeset)}
  end
end
Broadcast when deleting a message in the "delete" event handler in index.ex.
You can omit assigning :messages to the socket. This is our preferred solution, as it avoids issues in the next infinite-scroll lesson.
@impl true
def handle_event("delete", %{"id" => id}, socket) do
  message = Chat.get_message!(id)
  {:ok, _} = Chat.delete_message(message)
  PicChatWeb.Endpoint.broadcast("messages", "delete_message", id)
  # assigning :messages is no longer necessary, as the handle_info/2 handler
  # for the "messages" topic retrieves the new list of messages.
  {:noreply, socket}
end
Alternatively, you could use broadcast_from/4, then handle the current LiveView individually.
def handle_event("delete", %{"id" => id}, socket) do
  message = Chat.get_message!(id)
  {:ok, _} = Chat.delete_message(message)
  PicChatWeb.Endpoint.broadcast_from(self(), "messages", "delete_message", id)
  {:noreply, assign(socket, :messages, list_messages())}
end
Further Reading
Consider the following resource(s) to deepen your understanding of the topic.
- Elixir School: LiveView with PubSub
- Elixir School: LiveView with Channels
- HexDocs: Phoenix Channels
- HexDocs: Phoenix PubSub
- HexDocs: Phoenix Endpoint
Commit Your Progress
DockYard Academy now recommends you use the latest Release rather than forking or cloning our repository.
Run git status to ensure there are no undesirable changes.
Then run the following in your command line from the curriculum folder to commit your progress.
$ git add .
$ git commit -m "finish PicChat: PubSub reading"
$ git push