Elixir playground
Mix.install([
:req,
{:kino, "~> 0.14.0"},
{:vega_lite, "~> 0.1.10"},
{:kino_vega_lite, "~> 0.1.9"},
:jason
])
1. Elixir para datos en vivo
Este notebook construye paso a paso un flujo de datos en tiempo real usando Elixir, haciendo uso de visualizaciones con VegaLite.
Objetivo:
-
Construir un pipeline de datos en tiempo (casi) real en Elixir usando Livebook, que:
-
Consuma datos de una API (por ejemplo, criptomonedas o clima).
-
Los procese (transforme, filtre, calcule).
-
Los visualice de forma dinámica (usando gráficos con Kino.VegaLite).
Estructura del proyecto en Livebook
-
Setup del entorno
-
Lectura de datos en vivo desde una API
-
Transformación del stream de datos
-
Visualización en tiempo real
-
Análisis y estadísticas
-
(opcional) Comparación de rendimiento entre ejecución secuencial y concurrente
Lectura de datos desde una API
Elegimos una API simple de usar y con datos en vivo. Por ejemplo: CoinGecko, que no requiere API key.
defmodule PrecioCripto do
@api_url "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd"
def obtener_precio do
case Req.get(@api_url) do
{:ok, %Req.Response{body: %{"bitcoin" => %{"usd" => precio}}}} ->
{:ok, precio}
_ ->
{:error, :no_se_pudo_obtener}
end
end
end
# Probamos una llamada:
PrecioCripto.obtener_precio()
Loop de lectura periódica de datos
Leer el precio de Bitcoin cada N segundos y almacenarlo para graficar su evolución.
- Estructura de datos
Usamos un Agent para guardar el historial de precios con timestamps:
{:ok, _} = Agent.start_link(fn -> [] end, name: :data)
Función para agregar el nuevo dato
defmodule Pipeline do
def agregar_dato(precio) do
timestamp = DateTime.utc_now() |> DateTime.to_string()
Agent.update(:data, fn historial ->
historial ++ [%{timestamp: timestamp, precio: precio}]
end)
end
end
Loop recurrente de recolección de datos
Esto lanza un proceso concurrente que irá recolectando datos.
spawn(fn ->
Stream.repeatedly(fn ->
case PrecioCripto.obtener_precio() do
{:ok, precio} ->
Pipeline.agregar_dato(precio)
IO.puts("Precio actualizado: #{precio}")
{:error, _} ->
IO.puts("Error al obtener precio")
end
Process.sleep(500) # cada 5 segundos
end)
|> Stream.run()
end)
Visualización en tiempo real
Usaremos Kino.VegaLite
para mostrar un gráfico que se actualice automáticamente.
Visualización con gráfico de líneas
Primero, función para obtener los datos actuales:
obtener_datos_para_grafico = fn ->
Agent.get(:data, fn historial -> historial end)
end
Ahora, renderizamos el gráfico y lo actualizamos en vivo cada X segundos:
datos_iniciales = obtener_datos_para_grafico.()
ahora = DateTime.utc_now()
hace_2_min = DateTime.add(ahora, -120, :second)
vl =
VegaLite.new()
#|> VegaLite.data_from_values(datos_iniciales, name: "btc")
|> VegaLite.mark(:line)
|> VegaLite.encode_field(:x, "timestamp", type: :temporal, scale: %{
zero: false
})
|> VegaLite.encode_field(:y, "precio", type: :quantitative,
scale: %{zero: false}, # No forzar a empezar en 0
axis: %{format: ".0f"} # Quitar decimales del eje Y
)
|> then(fn vl -> # Aumentar tamaño del gráfico
%{vl | spec: Map.merge(vl.spec, %{"width" => 1000, "height" => 500})}
end)
grafico = Kino.VegaLite.new(vl)
# Escuchar y actualizar usando push
Kino.listen(100, fn _ ->
nuevos_datos = obtener_datos_para_grafico.() |> Enum.map(fn %{timestamp: t, precio: p} -> %{"timestamp" => t, "precio" => p} end)
Kino.VegaLite.push(grafico, List.last(nuevos_datos), window: 1000)
end)
LiveBook como playground para sistemas concurrentes
🎯 Objetivo de esta sección
Demostrar cómo Livebook permite visualizar, experimentar y entender conceptos de concurrencia en Elixir de forma inmediata, interactiva y didáctica.
🧠 Introducción a la concurrencia en Elixir
La concurrencia es la capacidad de ejecutar múltiples tareas aparentemente al mismo tiempo. En Elixir, esto se logra usando procesos ligeros que se comunican por mensajes.
flowchart LR
A[Proceso 1] -- mensaje --> B[Proceso 2]
B -- respuesta --> A
C[Proceso 3] -- mensaje --> B
Cada proceso tiene su propio estado, no comparten memoria, y se comunican enviando y recibiendo mensajes.
🧪 Crear un proceso simple
Este código crea un proceso nuevo que imprime un mensaje. El proceso se ejecuta de forma concurrente, separado del proceso principal.
spawn(fn -> IO.puts("¡Hola desde otro proceso!") end)
📬 Envío y recepción de mensajes
# Crear un proceso que espera un mensaje
pid = spawn(fn ->
receive do
{:saludo, nombre} -> IO.puts("Hola, #{nombre}!")
end
end)
# Enviar el mensaje
send(pid, {:saludo, "Livebook"})
receive
bloquea el proceso hasta que recibe un mensaje que coincida con algún patrón.
💥 Proceso que falla
defmodule Worker do
def start do
spawn(fn -> trabajo() end)
end
defp trabajo do
raise "¡Ups! Fallé"
end
end
pid = Worker.start()
Este proceso lanza un error, pero como está aislado, no afecta al resto del sistema. Esta es una propiedad clave de la concurrencia en Elixir.
🔁 Múltiples procesos haciendo tareas concurrentes
# Simular 5 procesos con diferentes tiempos de ejecución
for i <- 1..5 do
spawn(fn ->
:timer.sleep(:rand.uniform(1000))
IO.puts("Proceso #{i} completado")
end)
end
Cada proceso duerme un tiempo aleatorio y luego imprime su número. Verás que el orden de ejecución no es secuencial.
🚀 Laboratorio interactivo: lanzar muchos procesos
# Lanzar 1000 procesos concurrentes
Enum.each(1..1000, fn i ->
spawn(fn ->
if rem(i, 100) == 0, do: IO.puts("Proceso #{i} lanzado")
end)
end)
Esto demuestra que lanzar miles de procesos es barato en Elixir. La BEAM (máquina virtual de Erlang) está optimizada para esto.
✅ Conclusión
- Los procesos en Elixir son ligeros y aislados.
- Se comunican por paso de mensajes.
- Livebook permite experimentar e interactuar con ellos de forma inmediata.
graph TD
A[Livebook + Elixir]
subgraph Datos en Vivo
A1[📈 BTC Gráfico Tiempo Real]
end
subgraph Concurrencia y Simulación
A2[🐝 Abejas Concurrentes]
end
subgraph APIs Interactivas
A3[🔌 Test de APIs con Historial]
end
subgraph IA/ML desde 0
A4[🧠 Clasificador de Vino con Axon]
end
subgraph Lógica y Reglas
A5[⚖️ Reglas Funcionales y DSLs]
end
A --> A1
A --> A2
A --> A3
A --> A4
A --> A5