Livebook and Elixir: concurrency, web, and AI (José Valim @ YOW! 2023)


Mix.install(
  [
    {:kino, "~> 0.14.1"},
    {:kino_vega_lite, "~> 0.1.11"},
    {:bandit, "~> 1.5.7"},
    {:req, "~> 0.5.6"},
    {:kino_bumblebee, "~> 0.5.0"},
    {:exla, ">= 0.0.0"},
    {:kino_db, "~> 0.2.3"},
    {:exqlite, "~> 0.11.0"}
  ],
  config: [nx: [default_backend: EXLA.Backend]]
)

Dynamic

Elixir is a dynamic programming language that runs on the Erlang VM:

list = ["hello", 123, :banana]
Enum.fetch!(list, 0)

Functional

What does it mean to be functional?

Elixir data structures are immutable:

list = [1, 2, 3]
List.delete_at(list, -1)
List.delete_at(list, -1)
List.pop_at(list, -1)

You can check that the list does not change:

list
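
To make the immutability point concrete, here is a minimal sketch. The variable names `original`, `shorter`, `popped`, and `rest` are illustrative and not from the talk. Functions such as `List.delete_at/2` and `List.pop_at/2` return new values, so the only way to "change" a variable is to bind a name to the result.

```elixir
original = [1, 2, 3]

# List.delete_at/2 returns a new list and leaves `original` untouched.
shorter = List.delete_at(original, -1)

# List.pop_at/2 returns the removed element together with the remaining list.
{popped, rest} = List.pop_at(original, -1)

IO.inspect(original, label: "original")
IO.inspect(shorter, label: "shorter")
IO.inspect({popped, rest}, label: "pop_at")
```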

This style of programming is made clear with the |> (pipe) operator:

"Elixir is cool!"
|> String.split(" ")
|> List.last()
|> String.replace_suffix("!", "")
|> String.upcase()
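
As a small comparison, the sketch below writes the same pipeline as nested function calls. The names `nested` and `piped` are only for illustration. Both forms compute the same value; the pipe version simply reads top to bottom in the order the data flows.

```elixir
# The pipeline from the text, written as nested calls (read inside out).
nested =
  String.upcase(
    String.replace_suffix(List.last(String.split("Elixir is cool!", " ")), "!", "")
  )

# The same computation with the pipe operator (read top to bottom).
piped =
  "Elixir is cool!"
  |> String.split(" ")
  |> List.last()
  |> String.replace_suffix("!", "")
  |> String.upcase()

IO.inspect({nested, piped})
```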

Concurrency

Elixir supports pattern-matching, polymorphism via protocols, meta-programming, and more. But today, we will focus on its concurrency features. In the Erlang VM, all code runs inside lightweight threads called processes. We can literally create millions of them:

for _ <- 1..1_000_000 do
  spawn(fn -> :ok end)
end
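
To get a rough feel for how cheap processes are, the sketch below times spawning a batch of processes and waiting for each to report back. The count of `100_000` and the `{:done, i}` message shape are arbitrary choices for illustration, and the measured time will vary by machine.

```elixir
parent = self()
count = 100_000

{micros, :ok} =
  :timer.tc(fn ->
    # Each process sends a single message back to the parent and then exits.
    for i <- 1..count, do: spawn(fn -> send(parent, {:done, i}) end)

    # Wait until every process has reported back.
    for _ <- 1..count do
      receive do
        {:done, _i} -> :ok
      end
    end

    :ok
  end)

IO.puts("spawned and collected #{count} processes in #{div(micros, 1000)} ms")
```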

Processes communicate by sending messages to each other:

parent = self()

child = 
  spawn(fn ->
    receive do
      :ping -> send(parent, :pong)
    end
  end)

send(child, :ping)

receive do
  :pong -> :it_worked!
end
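
A slightly more defensive variant of the ping/pong exchange is sketched below. It is not from the talk: it assumes you want to tag messages with a unique reference (`make_ref/0`) so unrelated messages are ignored, and it adds an `after` clause so the caller does not block forever if no reply arrives. The one-second timeout is an arbitrary choice.

```elixir
parent = self()
ref = make_ref()

child =
  spawn(fn ->
    receive do
      # Only react to a ping that carries our reference.
      {:ping, ^ref} -> send(parent, {:pong, ref})
    end
  end)

send(child, {:ping, ref})

result =
  receive do
    {:pong, ^ref} -> :it_worked!
  after
    # Give up if no matching reply arrives within one second.
    1_000 -> :timeout
  end

IO.inspect(result)
```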

And Livebook can help us see how processes communicate with each other:

Kino.Process.render_seq_trace(fn -> 
  parent = self()

  child = 
    spawn(fn -> 
      receive do
        :ping -> send(parent, :pong)
      end
    end)

  send(child, :ping)

  receive do
    :pong -> :it_worked!
  end
end)

Maybe you want to see how Elixir can perform multiple tasks at once, scaling across both CPU and IO?

Kino.Process.render_seq_trace(fn -> 
  ["/foo", "/bar", "/baz", "/bat"]
  |> Task.async_stream(
    fn _ -> Process.sleep(Enum.random(100..300)) end,
    max_concurrency: 4
  )
  |> Enum.to_list()
end)
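
Outside of the sequence trace, the effect of `Task.async_stream/3` can also be observed by timing it. In the sketch below, `fetch` is a hypothetical stand-in for an IO-bound call (it only sleeps for 100 ms). With `max_concurrency: 4`, the four calls overlap, so the total should be close to the duration of one call rather than the sum of all four.

```elixir
paths = ["/foo", "/bar", "/baz", "/bat"]

# Hypothetical stand-in for an IO-bound call such as an HTTP request.
fetch = fn path ->
  Process.sleep(100)
  {path, :done}
end

{micros, results} =
  :timer.tc(fn ->
    paths
    |> Task.async_stream(fetch, max_concurrency: 4)
    |> Enum.to_list()
  end)

# Results arrive as {:ok, value} tuples, in the same order as the input.
IO.inspect(results)
IO.puts("elapsed: #{div(micros, 1000)} ms")
```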

Plotting live data

The Erlang VM provides a great set of tools for observability. Let’s gather information about all processes:

processes = 
  for pid <- Process.list() do
    info = Process.info(pid, [:reductions, :memory, :status])

    %{
      pid: inspect(pid),
      reductions: info[:reductions],
      memory: info[:memory],
      status: info[:status]
    }
  end
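
The same data can be explored without a chart. The sketch below lists the five processes using the most memory; the name `top_by_memory` and the cutoff of five are illustrative. It also skips processes that exit between `Process.list/0` and `Process.info/2`, since `Process.info/2` returns `nil` for a dead process.

```elixir
top_by_memory =
  Process.list()
  |> Enum.map(fn pid -> {pid, Process.info(pid, [:memory, :reductions])} end)
  # Process.info/2 returns nil if the process exited in the meantime.
  |> Enum.reject(fn {_pid, info} -> is_nil(info) end)
  |> Enum.sort_by(fn {_pid, info} -> info[:memory] end, :desc)
  |> Enum.take(5)

for {pid, info} <- top_by_memory do
  IO.puts("#{inspect(pid)} memory=#{info[:memory]} reductions=#{info[:reductions]}")
end
```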

Let’s plot it instead, and refresh the chart every five seconds:

chart = 
  VegaLite.new(width: 500, title: "Processes")
  |> VegaLite.data_from_values(processes, only: ["memory", "reductions", "status"])
  |> VegaLite.mark(:point)
  |> VegaLite.encode_field(:x, "memory", type: :quantitative, scale: [type: :log])
  |> VegaLite.encode_field(:y, "reductions", type: :quantitative)
  |> VegaLite.encode_field(:color, "status", type: :nominal)
  |> Kino.VegaLite.render()

Kino.listen(5000, fn _ ->  
  processes = 
    for pid <- Process.list() do
      info = Process.info(pid, [:reductions, :memory, :status])
  
      %{
        pid: inspect(pid),
        reductions: info[:reductions],
        memory: info[:memory],
        status: info[:status]
      }
    end

  Kino.VegaLite.clear(chart)
  Kino.VegaLite.push_many(chart, processes)
end)

WebAI

defmodule WebAI do
  use Plug.Builder

  plug :fetch_query_params
  plug :render

  def render(conn, _opts) do
    Plug.Conn.send_resp(conn, 200, "hello #{conn.params["name"]}!")
  end
end

Kino.start_child!({Bandit, plug: WebAI, port: 6789})

Req.get!("http://localhost:6789", params: [name: "Maykell"])

{:ok, model_info} =
  Bumblebee.load_model({:hf, "finiteautomata/bertweet-base-emotion-analysis"})

{:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "vinai/bertweet-base"})

serving =
  Bumblebee.Text.text_classification(model_info, tokenizer,
    top_k: 100,
    compile: [batch_size: 1, sequence_length: 100],
    defn_options: [compiler: EXLA]
  )

text_input = Kino.Input.textarea("Text", default: "Oh wow, I didn't know that!")
form = Kino.Control.form([text: text_input], submit: "Run")
frame = Kino.Frame.new()

Kino.listen(form, fn %{data: %{text: text}} ->
  Kino.Frame.render(frame, Kino.Text.new("Running..."))
  output = Nx.Serving.run(serving, text)

  output.predictions
  |> Enum.map(&{&1.label, &1.score})
  |> Kino.Bumblebee.ScoredList.new()
  |> then(&Kino.Frame.render(frame, &1))
end)

Kino.Layout.grid([form, frame], boxed: true, gap: 16)