Elixir, Concurrency and AI

Mix.install(
  [
    {:req, "~> 0.5.4"},
    {:bandit, "~> 1.5"},
    {:kino, "~> 0.13.2"},
    {:kino_vega_lite, "~> 0.1.13"},
    {:kino_db, "~> 0.2.8"},
    {:exqlite, "~> 0.23.0"},
    {:nx, "~> 0.7.3"},
    {:kino_bumblebee, "~> 0.5.0"},
    {:exla, ">= 0.0.0"}
  ],
  config: [nx: [default_backend: EXLA.Backend]]
)

Welcome

Shamelessly copied from George Guimarães.

Elixir is a dynamic and functional programming language that runs on the Erlang VM:

list = ["hello", 123, :banana]
Enum.fetch!(list, 0)

Functional

What does it mean to be functional?

Let’s look at some object-oriented code first (Python, in this case):

>>> list = [1, 2, 3]
>>> list.pop()
3
>>> list.pop()
2
>>> list.pop()
1

Each call mutates the list in place. Let’s compare that with Elixir:

list = [1, 2, 3]
List.delete_at(list, -1)
# => [1, 2]
List.delete_at(list, -1)
# => [1, 2] again: each call returns a new list and `list` itself never changes

Or when actually popping from a list:

List.pop_at(list, -1)
# => {3, [1, 2]}: the popped element plus a new list, with the original untouched

Elixir data structures are immutable. This style of programming is made clear with the |> (pipe) operator:

1..60
|> Enum.to_list()
|> Enum.take_random(6)
|> Enum.sort()
|> dbg()

Concurrency

Elixir supports pattern-matching, polymorphism via protocols, meta-programming, and more. But today, we will focus on its concurrency features. In the Erlang VM, all code runs inside lightweight threads called processes. We can literally create millions of them:

for _ <- 1..1_000_000 do
  spawn(fn -> :ok end)
end
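
How cheap is that? Here is a minimal timing sketch using Erlang's :timer.tc; the numbers are machine-dependent, so treat the output as illustrative:

# Measure how long spawning a million no-op processes takes.
{micros, _pids} =
  :timer.tc(fn ->
    for _ <- 1..1_000_000 do
      spawn(fn -> :ok end)
    end
  end)

IO.puts("Spawned 1,000,000 processes in #{div(micros, 1_000)}ms")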

Processes communicate by sending messages to one another:

parent = self()

# Spawn a child process that waits for :ping and answers with :pong
child =
  spawn(fn ->
    receive do
      :ping -> send(parent, :pong)
    end
  end)

send(child, :ping)

# Block until the child's reply arrives
receive do
  :pong -> :it_worked!
end

And Livebook can help us see how processes talk to each other:

Kino.Process.render_seq_trace(fn ->
  parent = self()

  child =
    spawn(fn ->
      receive do
        :ping -> send(parent, :pong)
      end
    end)

  send(child, :ping)

  receive do
    :pong -> :it_worked!
  end
end)

Maybe you’d like to see how Elixir performs multiple tasks at once, scaling across both CPU and IO?

Kino.Process.render_seq_trace(fn ->
  ["/foo", "/bar", "/baz", "/bat"]
  |> Task.async_stream(
    # Simulate an IO-bound request for each path
    fn _path -> Process.sleep(Enum.random(100..300)) end,
    max_concurrency: 4
  )
  |> Enum.to_list()
end)

Let’s take visualizations even further!

Plotting live data

The Erlang VM provides a great set of tools for observability. Let’s gather information about all processes:

processes =
  for pid <- Process.list() do
    # :reductions approximates how much work a process has done
    info = Process.info(pid, [:reductions, :memory, :status])

    %{
      pid: inspect(pid),
      reductions: info[:reductions],
      memory: info[:memory],
      status: info[:status]
    }
  end
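
To eyeball the raw data first, Kino can render a list of maps as an interactive table (a small aside; Kino.DataTable ships with Kino):

Kino.DataTable.new(processes)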

But how do we plot it?

A Chart smart cell does the trick; it generates the VegaLite code below:

VegaLite.new(width: 500, height: 500)
|> VegaLite.data_from_values(processes, only: ["memory", "reductions", "status"])
|> VegaLite.mark(:point)
|> VegaLite.encode_field(:x, "memory", type: :quantitative, scale: [type: :log])
|> VegaLite.encode_field(:y, "reductions", type: :quantitative, scale: [type: :log])
|> VegaLite.encode_field(:color, "status", type: :nominal)

Web + AI

Elixir also speaks HTTP from right inside the notebook. Let’s define a tiny Plug and serve it with Bandit:

defmodule Web do
  use Plug.Builder

  plug :fetch_query_params
  plug :render

  def render(conn, _opts) do
    # Greet whoever shows up in the ?name= query parameter
    name = conn.params["name"]
    Plug.Conn.send_resp(conn, 200, "hello world #{name}!")
  end
end

Kino.start_child!({Bandit, plug: Web, port: 9010})

Req.get!("http://localhost:9010", params: [name: "George"])
# => a %Req.Response{} with status 200 and body "hello world George!"

Neural Network Smart Cell

The Neural Network smart cell generates everything we need to run an emotion classifier from Bumblebee:

{:ok, model_info} =
  Bumblebee.load_model({:hf, "finiteautomata/bertweet-base-emotion-analysis"})

{:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "vinai/bertweet-base"})

serving =
  Bumblebee.Text.text_classification(model_info, tokenizer,
    compile: [batch_size: 1, sequence_length: 100],
    defn_options: [compiler: EXLA]
  )

text_input = Kino.Input.textarea("Text", default: "Oh wow, I didn't know that!")
form = Kino.Control.form([text: text_input], submit: "Run")
frame = Kino.Frame.new()

Kino.listen(form, fn %{data: %{text: text}} ->
  Kino.Frame.render(frame, Kino.Text.new("Running..."))
  output = Nx.Serving.run(serving, text)

  output.predictions
  |> Enum.map(&{&1.label, &1.score})
  |> Kino.Bumblebee.ScoredList.new()
  |> then(&Kino.Frame.render(frame, &1))
end)

Kino.Layout.grid([form, frame], boxed: true, gap: 16)

Nx.Serving with Batching
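
The form below calls Nx.Serving.batched_run/2, which looks up a serving process registered under a name. If no such process is running yet, here is a minimal sketch that supervises the serving from above under the :web_ai_serving name used below (the batch size and timeout are illustrative assumptions):

Kino.start_child!(
  {Nx.Serving,
   serving: serving,
   name: :web_ai_serving,
   # illustrative values: tune for your workload
   batch_size: 4,
   batch_timeout: 100}
)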

text_input = Kino.Input.textarea("Text", default: "Oh wow, I didn't know that!")
form = Kino.Control.form([text: text_input], submit: "Run")
frame = Kino.Frame.new()

Kino.listen(form, fn %{data: %{text: text}} ->
  Kino.Frame.render(frame, Kino.Text.new("Running..."))
  output = Nx.Serving.batched_run(:web_ai_serving, text)

  output.predictions
  |> Enum.map(&{&1.label, &1.score})
  |> Kino.Bumblebee.ScoredList.new()
  |> then(&Kino.Frame.render(frame, &1))
end)

Kino.Layout.grid([form, frame], boxed: true, gap: 16)

A New Web App with AI

Finally, let’s put the model behind an HTTP endpoint. The plug forwards the request text to the batched serving and responds with the top label:

defmodule WebAI do
  use Plug.Builder

  plug :fetch_query_params
  plug :render

  def render(conn, _opts) do
    text = conn.params["text"]
    output = Nx.Serving.batched_run(:web_ai_serving, text)

    # Take the first prediction (assumed here to be the top-scoring one)
    [%{label: label, score: _} | _] = output.predictions

    Plug.Conn.send_resp(conn, 200, "this was #{label}!")
  end
end

Kino.start_child!({Bandit, plug: WebAI, port: 9003})

The node name and cookie below come from the live demo and identify a specific Livebook runtime; your values will differ. Connecting joins the nodes into a cluster, where Nx.Serving.batched_run/2 can find servings running on other members:

node = :"livebook_ggnn34o7--5fguoy33@127.0.0.1"
cookie = :"c_WcTIb_fj3crlqhYI0unp0ymPoakEb6f7Ub_HH4vqJJ7pwwiO5fG6"

Node.set_cookie(node, cookie)
Node.connect(node)

Req.get!("http://localhost:9003", params: [text: "I'm listening to The Cure"])