Powered by AppSignal & Oban Pro

Logflare Endpoint Demo

books/logflare-endpoint-demo.livemd

Logflare Endpoint Demo

Section

Queries the Logflare HTTP endpoint logs.all.ch using Req. Fill in the form fields to set query parameters, then run the request cell to fetch results.

Mix.install([
  {:req, "~> 0.5"},
  {:kino, "~> 0.19"},
  {:jason, "~> 1.4"}
])

Client

defmodule LogflareEndpoints do
  @base_url "https://query.logflare.app/api/endpoints/query"

  def query(endpoint, api_key, params) do
    params = Enum.reject(params, fn {_k, v} -> v == "" end)

    Req.get!("#{@base_url}/#{endpoint}",
      headers: [{"x-api-key", api_key}],
      params: params
    )
    |> parse_response()
  end

  defp parse_response(%{status: 200, body: %{"result" => rows}}) when is_list(rows), do: {:ok, rows}
  defp parse_response(%{status: status, body: body}), do: {:error, status, body}
end

Query Form

now = DateTime.utc_now() |> DateTime.truncate(:second) |> DateTime.to_iso8601()
one_hour_ago = DateTime.add(DateTime.utc_now(), -3600, :second) |> DateTime.to_iso8601()

form =
  Kino.Control.form(
    [
      endpoint: Kino.Input.text("Endpoint name"),
      api_key: Kino.Input.password("API Key"),
      project_tier: Kino.Input.text("Project Tier", default: "FREE"),
      project: Kino.Input.text("Project"),
      iso_timestamp_start: Kino.Input.text("Start Timestamp (ISO 8601)", default: one_hour_ago),
      iso_timestamp_end: Kino.Input.text("End Timestamp (ISO 8601)", default: now),
      sql: Kino.Input.textarea("SQL", default: "SELECT timestamp, event_message FROM logs LIMIT 100")
    ],
    submit: "Run Query"
  )

Request

Kino.listen(form, fn %{data: %{api_key: api_key} = data} ->
  params = Map.drop(data, [:api_key]) |> Map.to_list()

  case LogflareEndpoints.query(data.endpoint, api_key, params) do
    {:ok, rows} -> Kino.DataTable.new(rows) |> Kino.render()
    {:error, status, body} -> Kino.Markdown.new("**Error #{status}**\n\n```\n#{inspect(body, pretty: true)}\n```") |> Kino.render()
  end
end)

Log Attributes Explorer

Fetches all keys from log_attributes (a Map(String, String) column), then for each key retrieves the top values by occurrence count. Results are displayed as tabs — one per key.

now = DateTime.utc_now() |> DateTime.truncate(:second) |> DateTime.to_iso8601()
one_hour_ago = DateTime.add(DateTime.utc_now(), -3600, :second) |> DateTime.to_iso8601()

attr_form =
  Kino.Control.form(
    [
      endpoint: Kino.Input.text("Endpoint name"),
      api_key: Kino.Input.password("API Key"),
      project_tier: Kino.Input.text("Project Tier", default: "FREE"),
      project: Kino.Input.text("Project"),
      iso_timestamp_start: Kino.Input.text("Start Timestamp (ISO 8601)", default: one_hour_ago),
      iso_timestamp_end: Kino.Input.text("End Timestamp (ISO 8601)", default: now)
    ],
    submit: "Explore"
  )
Kino.listen(attr_form, fn %{data: %{api_key: api_key} = data} ->
  base_params = fn sql ->
    [
      project_tier: data.project_tier,
      project: data.project,
      iso_timestamp_start: data.iso_timestamp_start,
      iso_timestamp_end: data.iso_timestamp_end,
      sql: sql
    ]
    |> Enum.reject(fn {_k, v} -> v == "" end)
  end

  # keys_sql = """
  # SELECT source_name, arrayJoin(mapKeys(log_attributes)) AS key, count() AS count
  # FROM logs
  # WHERE 
  # timestamp BETWEEN parseDateTimeBestEffortOrNull('#{data.iso_timestamp_start}') AND parseDateTimeBestEffortOrNull('#{data.iso_timestamp_end}')
  # GROUP BY all
  # ORDER BY count DESC
  # limit 20
  # """

  cloudflare_keys = ["response.status_code", "response.origin_time", "request.headers.user_agent", "request.method"]
  postgres_keys = ["parsed.user_name", "parsed.application_name", "parsed.backend_type"]

  with keys <- cloudflare_keys ++ postgres_keys do
    # {:ok, key_rows} <- LogflareEndpoints.query(data.endpoint, api_key, base_params.(keys_sql)) do    
    # keys_from_sql = Enum.map(key_rows, & &1["key"])


    tabs =
      Enum.map(keys, fn key ->
        values_sql = """
        SELECT log_attributes['#{key}'] AS value, count() AS count
        FROM logs
        WHERE 
          timestamp BETWEEN parseDateTimeBestEffortOrNull('#{data.iso_timestamp_start}') AND parseDateTimeBestEffortOrNull('#{data.iso_timestamp_end}')
          AND mapContains(log_attributes, '#{key}')
        GROUP BY all
        ORDER BY count DESC
        LIMIT 20
        """

        content =
          case LogflareEndpoints.query(data.endpoint, api_key, base_params.(values_sql)) do
            {:ok, rows} -> Kino.DataTable.new(rows, name: key)
            {:error, status, body} -> Kino.Markdown.new("**Error #{status}**: #{inspect(body)}")
          end

        {key, content}
      end)

    Kino.Layout.tabs(tabs) |> Kino.render()
  else
    error ->
      Kino.Markdown.new("**Error processing keys **\n\n```\n#{inspect(error, pretty: true)}\n```")
      |> Kino.render()
  end
end)