Elixir, Concurrency and AI
Mix.install(
  [
    {:req, "~> 0.5.4"},
    {:bandit, "~> 1.5"},
    {:kino, "~> 0.13.2"},
    {:kino_vega_lite, "~> 0.1.13"},
    {:kino_db, "~> 0.2.8"},
    {:exqlite, "~> 0.23.0"},
    {:nx, "~> 0.7.3"},
    {:kino_bumblebee, "~> 0.5.0"},
    {:exla, ">= 0.0.0"}
  ],
  config: [nx: [default_backend: EXLA.Backend]]
)
Welcome
Shamelessly copied from:
George Guimarães
- formerly co-founder at Plataformatec, company behind Elixir
- principal engineer at CloudWalk
- https://twitter.com/georgeguimaraes
- https://www.linkedin.com/in/georgeguimaraes/
Elixir is a dynamic and functional programming language that runs on the Erlang VM:
list = ["hello", 123, :banana]
Enum.fetch!(list, 0)
Functional
What does it mean to be functional?
Let’s see some Object-Oriented code:
>> list = [1, 2, 3]
>> list.pop()
3
>> list.pop()
2
>> list.pop()
1
The value of the list changes. Let’s compare it with Elixir:
list = [1, 2, 3]
List.delete_at(list, -1)
List.delete_at(list, -1)
Or when actually popping from a list:
List.pop_at(list, -1)
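A minimal sketch of what the calls above return. The variable names `without_last`, `popped` and `rest` are only for illustration; the point is that `list` itself never changes:

```elixir
list = [1, 2, 3]

# List.delete_at/2 returns a new list; `list` is left untouched.
without_last = List.delete_at(list, -1)

# List.pop_at/2 returns the removed element together with the remaining list.
{popped, rest} = List.pop_at(list, -1)

IO.inspect(without_last, label: "without_last")
IO.inspect({popped, rest}, label: "pop_at")
IO.inspect(list, label: "original list")
```

To keep popping, you would rebind a variable to `rest` and call `List.pop_at/2` on it again.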
Elixir data structures are immutable. This style of programming is made clear with the |>
(pipe) operator:
1..60
|> Enum.to_list()
|> Enum.take_random(6)
|> Enum.sort()
|> dbg()
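As a small illustration of what the pipe does, here is the same computation written twice, once with nested calls and once with `|>`. The squaring step is an arbitrary example chosen so the result is deterministic:

```elixir
# Nested calls read inside-out.
nested = Enum.sort(Enum.map(Enum.to_list(1..5), fn n -> n * n end), :desc)

# The pipe passes each result as the first argument of the next call,
# so the same steps read top to bottom.
piped =
  1..5
  |> Enum.to_list()
  |> Enum.map(fn n -> n * n end)
  |> Enum.sort(:desc)

IO.inspect(nested == piped, label: "same result?")
```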
Concurrency
Elixir supports pattern-matching, polymorphism via protocols, meta-programming, and more. But today, we will focus on its concurrency features. In the Erlang VM, all code runs inside lightweight threads called processes. We can literally create millions of them:
for _ <- 1..1_000_000 do
  spawn(fn -> :ok end)
end
Processes communicate by sending messages to each other:
parent = self()

child =
  spawn(fn ->
    receive do
      :ping -> send(parent, :pong)
    end
  end)

send(child, :ping)

receive do
  :pong -> :it_worked!
end
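One caveat: a bare `receive` waits forever if the expected message never arrives. The sketch below is one way to guard against that, assuming a hypothetical `PingPong` module that is not part of the original notebook. It tags each message with a unique reference and gives up after a timeout:

```elixir
defmodule PingPong do
  # Hypothetical helper: spawns a child, pings it, and waits for the reply.
  def ping(timeout \\ 1_000) do
    parent = self()
    # A unique reference lets us match only the reply to this request.
    ref = make_ref()

    child =
      spawn(fn ->
        receive do
          {:ping, ^ref} -> send(parent, {:pong, ref})
        end
      end)

    send(child, {:ping, ref})

    receive do
      {:pong, ^ref} -> :it_worked!
    after
      # Stop waiting if no reply arrives within `timeout` milliseconds.
      timeout -> :timeout
    end
  end
end

IO.inspect(PingPong.ping())
```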
And Livebook can help us see how processes communicate with each other:
Kino.Process.render_seq_trace(fn ->
  parent = self()

  child =
    spawn(fn ->
      receive do
        :ping -> send(parent, :pong)
      end
    end)

  send(child, :ping)

  receive do
    :pong -> :it_worked!
  end
end)
Maybe you want to see how Elixir can perform multiple tasks at once, scaling on both CPU and IO?
Kino.Process.render_seq_trace(fn ->
  ["/foo", "/bar", "/baz", "/bat"]
  |> Task.async_stream(
    fn _ -> Process.sleep(Enum.random(100..300)) end,
    max_concurrency: 4
  )
  |> Enum.to_list()
end)
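The trace shows the processes, but not what comes back. As a rough sketch without the tracing, assuming a made-up workload that sleeps and then upcases each path: `Task.async_stream/3` wraps each result in an `{:ok, value}` tuple and, by default, returns results in input order even when the tasks finish in a different order.

```elixir
paths = ["/foo", "/bar", "/baz", "/bat"]

results =
  paths
  |> Task.async_stream(
    fn path ->
      # Simulated IO: sleep for a random time, then return a value for this path.
      Process.sleep(Enum.random(10..30))
      String.upcase(path)
    end,
    max_concurrency: 4
  )
  # Each element arrives as {:ok, value}; unwrap it.
  |> Enum.map(fn {:ok, value} -> value end)

IO.inspect(results)
```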
Let’s take visualizations even further!
Plotting live data
The Erlang VM provides a great set of tools for observability. Let’s gather information about all processes:
processes =
  for pid <- Process.list() do
    info = Process.info(pid, [:reductions, :memory, :status])

    %{
      pid: inspect(pid),
      reductions: info[:reductions],
      memory: info[:memory],
      status: info[:status]
    }
  end
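One detail worth knowing: a process can exit between `Process.list/0` and `Process.info/2`, in which case `Process.info/2` returns `nil` and the map above ends up with `nil` fields. A minimal sketch that skips such processes, assuming a hypothetical `ProcessSnapshot` module that is not part of the original notebook:

```elixir
defmodule ProcessSnapshot do
  # Hypothetical helper: collects stats for the given pids, skipping dead ones.
  def collect(pids \\ Process.list()) do
    for pid <- pids,
        # Process.info/2 returns nil when the process has already exited.
        info = Process.info(pid, [:reductions, :memory, :status]),
        is_list(info) do
      %{
        pid: inspect(pid),
        reductions: info[:reductions],
        memory: info[:memory],
        status: info[:status]
      }
    end
  end
end

processes = ProcessSnapshot.collect()
IO.inspect(length(processes), label: "live processes")
```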
But how do we plot it?
Smart cell here!
VegaLite.new(width: 500, height: 500)
|> VegaLite.data_from_values(processes, only: ["memory", "reductions", "status"])
|> VegaLite.mark(:point)
|> VegaLite.encode_field(:x, "memory", type: :quantitative, scale: [type: :log])
|> VegaLite.encode_field(:y, "reductions", type: :quantitative, scale: [type: :log])
|> VegaLite.encode_field(:color, "status", type: :nominal)
VegaLite.new(width: 700, height: 700)
|> VegaLite.data_from_values(processes, only: ["memory", "reductions", "status"])
|> VegaLite.mark(:point)
|> VegaLite.encode_field(:x, "memory", type: :quantitative, scale: [type: :log])
|> VegaLite.encode_field(:y, "reductions", type: :quantitative, scale: [type: :log])
|> VegaLite.encode_field(:color, "status", type: :nominal)
Web + AI
defmodule Web do
  use Plug.Builder

  plug :fetch_query_params
  plug :render

  def render(conn, _opts) do
    name = conn.params["name"]
    Plug.Conn.send_resp(conn, 200, "hello world #{name}!")
  end
end
Kino.start_child!({Bandit, plug: Web, port: 9010})
Req.get!("http://localhost:9010", params: [name: "George"])
Neural Network Smart Cell
{:ok, model_info} =
  Bumblebee.load_model({:hf, "finiteautomata/bertweet-base-emotion-analysis"})

{:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "vinai/bertweet-base"})

serving =
  Bumblebee.Text.text_classification(model_info, tokenizer,
    compile: [batch_size: 1, sequence_length: 100],
    defn_options: [compiler: EXLA]
  )

text_input = Kino.Input.textarea("Text", default: "Oh wow, I didn't know that!")
form = Kino.Control.form([text: text_input], submit: "Run")
frame = Kino.Frame.new()

Kino.listen(form, fn %{data: %{text: text}} ->
  Kino.Frame.render(frame, Kino.Text.new("Running..."))
  output = Nx.Serving.run(serving, text)

  output.predictions
  |> Enum.map(&{&1.label, &1.score})
  |> Kino.Bumblebee.ScoredList.new()
  |> then(&Kino.Frame.render(frame, &1))
end)

Kino.Layout.grid([form, frame], boxed: true, gap: 16)
Nx.Serving with Batching
The cell below calls Nx.Serving.batched_run/2, which expects a serving process to already be running under the name :web_ai_serving, either on this node or on a connected one.
text_input = Kino.Input.textarea("Text", default: "Oh wow, I didn't know that!")
form = Kino.Control.form([text: text_input], submit: "Run")
frame = Kino.Frame.new()

Kino.listen(form, fn %{data: %{text: text}} ->
  Kino.Frame.render(frame, Kino.Text.new("Running..."))
  output = Nx.Serving.batched_run(:web_ai_serving, text)

  output.predictions
  |> Enum.map(&{&1.label, &1.score})
  |> Kino.Bumblebee.ScoredList.new()
  |> then(&Kino.Frame.render(frame, &1))
end)

Kino.Layout.grid([form, frame], boxed: true, gap: 16)
A New Web App with AI
defmodule WebAI do
  use Plug.Builder

  plug :fetch_query_params
  plug :render

  def render(conn, _opts) do
    text = conn.params["text"]
    output = Nx.Serving.batched_run(:web_ai_serving, text)
    [%{label: label, score: _} | _] = output.predictions
    Plug.Conn.send_resp(conn, 200, "this was #{label}!")
  end
end
Kino.start_child!({Bandit, plug: WebAI, port: 9003})
node = :"livebook_ggnn34o7--5fguoy33@127.0.0.1"
cookie = :"c_WcTIb_fj3crlqhYI0unp0ymPoakEb6f7Ub_HH4vqJJ7pwwiO5fG6"
Node.set_cookie(node, cookie)
Node.connect(node)
Req.get!("http://localhost:9003", params: [text: "I'm listening to The Cure"])