Livebook and Elixir: concurrency, web, and AI (José Valim @ YOW! 2023)
Mix.install(
  [
    {:kino, "~> 0.14.1"},
    {:kino_vega_lite, "~> 0.1.11"},
    {:bandit, "~> 1.5.7"},
    {:req, "~> 0.5.6"},
    {:kino_bumblebee, "~> 0.5.0"},
    {:exla, ">= 0.0.0"},
    {:kino_db, "~> 0.2.3"},
    {:exqlite, "~> 0.11.0"}
  ],
  config: [nx: [default_backend: EXLA.Backend]]
)
Dynamic
Elixir is a dynamic programming language that runs on the Erlang VM:
list = ["hello", 123, :banana]
Enum.fetch!(list, 0)
Functional
What does it mean to be functional?
Elixir data structures are immutable:
list = [1, 2, 3]
List.delete_at(list, -1)
List.delete_at(list, -1)
List.pop_at(list, -1)
You can check that the list has not changed:
list
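To make the point concrete, here is a minimal sketch that contrasts returning a new list with rebinding a name. The variable names (`original`, `shorter`, `captured`) are illustrative and not part of the talk.

```elixir
original = [1, 2, 3]

# List.delete_at/2 returns a new list; `original` is left untouched.
shorter = List.delete_at(original, -1)

# List.pop_at/2 returns the removed element together with the new list.
{last, rest} = List.pop_at(original, -1)

# A closure captures the value, not the name.
captured = fn -> original end

# Rebinding points the name at a new value; the old list is not mutated.
original = [0 | original]

IO.inspect(shorter, label: "shorter")       # shorter: [1, 2]
IO.inspect({last, rest}, label: "pop_at")   # pop_at: {3, [1, 2]}
IO.inspect(captured.(), label: "captured")  # captured: [1, 2, 3]
IO.inspect(original, label: "rebound")      # rebound: [0, 1, 2, 3]
```

The closure still sees `[1, 2, 3]` after the rebinding, which is what makes it safe to share data between processes without copying concerns at the language level.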
This style of programming is made clear with the |> (pipe) operator:
"Elixir is cool!"
|> String.split(" ")
|> List.last()
|> String.replace_suffix("!", "")
|> String.upcase()
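For comparison, the same transformation can be written as nested calls. The pipe operator passes the value on its left as the first argument of the call on its right, so the two forms below should be equivalent. The names `piped` and `nested` are only for illustration.

```elixir
piped =
  "Elixir is cool!"
  |> String.split(" ")
  |> List.last()
  |> String.replace_suffix("!", "")
  |> String.upcase()

# The equivalent nested form has to be read inside-out.
nested =
  String.upcase(
    String.replace_suffix(List.last(String.split("Elixir is cool!", " ")), "!", "")
  )

IO.inspect(piped)            # "COOL"
IO.inspect(piped == nested)  # true
```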
Concurrency
Elixir supports pattern-matching, polymorphism via protocols, meta-programming, and more. But today, we will focus on its concurrency features. In the Erlang VM, all code runs inside lightweight threads called processes. We can literally create millions of them:
for _ <- 1..1_000_000 do
  spawn(fn -> :ok end)
end
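To get a rough feel for how lightweight processes are, the sketch below times the spawning of a smaller batch and asks the VM how much memory one idle process uses. The batch size of 100_000 and the `:stop` message are arbitrary choices for this illustration, and the numbers vary by machine and Erlang/OTP version.

```elixir
# Time how long it takes to spawn 100_000 short-lived processes.
{micros, _pids} =
  :timer.tc(fn ->
    for _ <- 1..100_000, do: spawn(fn -> :ok end)
  end)

# Spawn one process that stays alive by waiting for a message.
idle =
  spawn(fn ->
    receive do
      :stop -> :ok
    end
  end)

# Memory is reported in bytes and includes the stack and heap.
{:memory, bytes} = Process.info(idle, :memory)
send(idle, :stop)

IO.puts("spawned 100_000 processes in #{div(micros, 1000)} ms")
IO.puts("one idle process uses #{bytes} bytes")
```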
Processes communicate by sending messages to each other:
parent = self()

child =
  spawn(fn ->
    receive do
      :ping -> send(parent, :pong)
    end
  end)

send(child, :ping)

receive do
  :pong -> :it_worked!
end
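A `receive` without a timeout blocks until a matching message arrives. As a minimal sketch, assuming a hypothetical child that never replies (called `silent` here), an `after` clause bounds the wait:

```elixir
# A child that consumes one message and never answers.
silent =
  spawn(fn ->
    receive do
      _msg -> :ignored
    end
  end)

send(silent, :ping)

result =
  receive do
    :pong -> :it_worked!
  after
    # Give up after 100 ms instead of blocking forever.
    100 -> :timeout
  end

IO.inspect(result)
```

With an empty mailbox, as in a fresh script or cell, `result` is `:timeout` because no `:pong` is ever sent.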
And Livebook can help us see how processes communicate with each other:
Kino.Process.render_seq_trace(fn ->
  parent = self()

  child =
    spawn(fn ->
      receive do
        :ping -> send(parent, :pong)
      end
    end)

  send(child, :ping)

  receive do
    :pong -> :it_worked!
  end
end)
Maybe you want to see how Elixir can perform multiple tasks at once, scaling on both CPU and IO?
Kino.Process.render_seq_trace(fn ->
  ["/foo", "/bar", "/baz", "/bat"]
  |> Task.async_stream(
    fn _ -> Process.sleep(Enum.random(100..300)) end,
    max_concurrency: 4
  )
  |> Enum.to_list()
end)
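To see the effect on wall-clock time outside the trace view, the sketch below runs the same simulated work sequentially and through Task.async_stream and times both. The fixed 100 ms sleep and the `work` function are stand-ins for real IO and are assumptions of this example, not part of the talk.

```elixir
paths = ["/foo", "/bar", "/baz", "/bat"]

# Simulated IO-bound work: each "request" sleeps for 100 ms.
work = fn path ->
  Process.sleep(100)
  {path, :done}
end

# One after another: roughly 4 x 100 ms.
{sequential_us, _} = :timer.tc(fn -> Enum.map(paths, work) end)

# Up to four at once: roughly 100 ms in total.
{concurrent_us, results} =
  :timer.tc(fn ->
    paths
    |> Task.async_stream(work, max_concurrency: 4)
    |> Enum.to_list()
  end)

IO.puts("sequential: #{div(sequential_us, 1000)} ms")
IO.puts("concurrent: #{div(concurrent_us, 1000)} ms")

# Each result is wrapped in {:ok, value} and, by default, keeps input order.
IO.inspect(results)
```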
Plotting live data
The Erlang VM provides a great set of tools for observability. Let’s gather information about all processes:
processes =
  for pid <- Process.list() do
    info = Process.info(pid, [:reductions, :memory, :status])

    %{
      pid: inspect(pid),
      reductions: info[:reductions],
      memory: info[:memory],
      status: info[:status]
    }
  end
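Before charting, the same data can be inspected as plain values. The sketch below lists the five processes using the most memory. It also skips processes that exit between `Process.list/0` and `Process.info/2`, for which `Process.info/2` returns `nil`. The names `snapshot` and `top5` are illustrative.

```elixir
snapshot =
  for pid <- Process.list(),
      # A nil result (process already gone) acts as a filter and is skipped.
      info = Process.info(pid, [:memory, :reductions, :status]) do
    Map.new([{:pid, inspect(pid)} | info])
  end

top5 =
  snapshot
  |> Enum.sort_by(& &1.memory, :desc)
  |> Enum.take(5)

Enum.each(top5, &IO.inspect/1)
```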
Let’s plot this data in a chart and refresh it every five seconds:
chart =
  VegaLite.new(width: 500, title: "Processes")
  |> VegaLite.data_from_values(processes, only: ["memory", "reductions", "status"])
  |> VegaLite.mark(:point)
  |> VegaLite.encode_field(:x, "memory", type: :quantitative, scale: [type: :log])
  |> VegaLite.encode_field(:y, "reductions", type: :quantitative)
  |> VegaLite.encode_field(:color, "status", type: :nominal)
  |> Kino.VegaLite.render()
Kino.listen(5000, fn _ ->
  processes =
    for pid <- Process.list() do
      info = Process.info(pid, [:reductions, :memory, :status])

      %{
        pid: inspect(pid),
        reductions: info[:reductions],
        memory: info[:memory],
        status: info[:status]
      }
    end

  Kino.VegaLite.clear(chart)
  Kino.VegaLite.push_many(chart, processes)
end)
WebAI
defmodule WebAI do
  use Plug.Builder

  plug :fetch_query_params
  plug :render

  def render(conn, _opts) do
    Plug.Conn.send_resp(conn, 200, "hello #{conn.params["name"]}!")
  end
end
Kino.start_child!({Bandit, plug: WebAI, port: 6789})
Req.get!("http://localhost:6789", params: [name: "Maykell"])
{:ok, model_info} =
  Bumblebee.load_model({:hf, "finiteautomata/bertweet-base-emotion-analysis"})

{:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "vinai/bertweet-base"})

serving =
  Bumblebee.Text.text_classification(model_info, tokenizer,
    top_k: 100,
    compile: [batch_size: 1, sequence_length: 100],
    defn_options: [compiler: EXLA]
  )
text_input = Kino.Input.textarea("Text", default: "Oh wow, I didn't know that!")
form = Kino.Control.form([text: text_input], submit: "Run")
frame = Kino.Frame.new()
Kino.listen(form, fn %{data: %{text: text}} ->
  Kino.Frame.render(frame, Kino.Text.new("Running..."))
  output = Nx.Serving.run(serving, text)

  output.predictions
  |> Enum.map(&{&1.label, &1.score})
  |> Kino.Bumblebee.ScoredList.new()
  |> then(&Kino.Frame.render(frame, &1))
end)
Kino.Layout.grid([form, frame], boxed: true, gap: 16)