Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Telemetry

livebooks/telemetry.livemd

Telemetry

# The notebook lives one directory below the project root, so the parent of
# this file's directory is the root of the application under test.
my_app_root = Path.join([__DIR__, ".."])

# :grpc is loaded from the local checkout (not from Hex); the remaining
# packages provide the metrics definitions, the Prometheus reporter and an
# HTTP client used to read the metrics endpoint.
notebook_deps = [
  {:grpc, path: my_app_root, env: :dev},
  {:telemetry_metrics, "~> 0.7"},
  {:telemetry_metrics_prometheus, "~> 1.1"},
  {:req, "~> 0.3"}
]

# Reuse the project's own configuration and lockfile for the install.
install_opts = [
  config_path: Path.join(my_app_root, "config/config.exs"),
  lockfile: Path.join(my_app_root, "mix.lock")
]

Mix.install(notebook_deps, install_opts)

Telemetry Events

From the GRPC.Telemetry documentation, we know that both server-side and client-side events are published. We can use those events to build a Prometheus metrics exporter, through Telemetry.Metrics and TelemetryMetricsPrometheus, that is backward-compatible with the deprecated :grpc_prometheus library.

First, let’s create a mock server and client that we can use.

GRPC Server and Client

# This whole code block is taken from test/support/proto/helloworld.pb.ex
# Normally, this would be autogenerated from a valid protobuf file,
# but we're using this hardcoded version so that this Livebook is self-contained.

# Request message of the `SayHello` RPC declared in `Helloworld.Greeter.Service`.
defmodule Helloworld.HelloRequest do
  @moduledoc false
  use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3

  # `HelloServer.say_hello/2` pattern-matches on this value ("ok", "raise",
  # "not_found") to decide how the call ends.
  field(:name, 1, type: :string)
  # `HelloServer.say_hello/2` passes this value to `Process.sleep/1` before
  # answering, so it acts as an artificial handling delay.
  field(:duration, 2, proto3_optional: true, type: :int32)
end

# Reply message of the `SayHello` RPC declared in `Helloworld.Greeter.Service`.
defmodule Helloworld.HelloReply do
  @moduledoc false
  use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3

  # The only payload of the reply; `HelloServer` fills it with "Hello".
  field(:message, 1, type: :string)
end

# Request message of the `CheckHeaders` RPC declared in
# `Helloworld.Greeter.Service`. It declares no fields: `HelloServer` ignores
# the request body and reads the call's headers instead.
defmodule Helloworld.HeaderRequest do
  @moduledoc false
  use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3
end

# Reply message of the `CheckHeaders` RPC declared in `Helloworld.Greeter.Service`.
defmodule Helloworld.HeaderReply do
  @moduledoc false
  use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3

  # `HelloServer.check_headers/2` sets this to the value of the
  # "authorization" header it read from the stream.
  field(:authorization, 1, type: :string)
end

# Service definition for "helloworld.Greeter". It is consumed by both the
# client stub (`Helloworld.Greeter.Stub`) and the server (`HelloServer`).
defmodule Helloworld.Greeter.Service do
  @moduledoc false
  use GRPC.Service, name: "helloworld.Greeter", protoc_gen_elixir_version: "0.11.0"

  # SayHello: HelloRequest in, HelloReply out.
  rpc(:SayHello, Helloworld.HelloRequest, Helloworld.HelloReply)

  # CheckHeaders: HeaderRequest in, HeaderReply out.
  rpc(:CheckHeaders, Helloworld.HeaderRequest, Helloworld.HeaderReply)
end

# Client-side stub for `Helloworld.Greeter.Service`. The notebook calls it as
# `Helloworld.Greeter.Stub.say_hello(channel, request)`.
defmodule Helloworld.Greeter.Stub do
  @moduledoc false
  use GRPC.Stub, service: Helloworld.Greeter.Service
end
defmodule HelloServer do
  @moduledoc """
  Mock implementation of `Helloworld.Greeter.Service` used to generate
  telemetry events with predictable outcomes.

  `say_hello/2` sleeps for the requested `duration` and then ends the call in
  a way selected by the request's `name`:

    * `"ok"` - replies with a `Helloworld.HelloReply` whose message is "Hello"
    * `"raise"` - raises an `ArgumentError`
    * `"not_found"` - raises a `GRPC.RPCError` with the not-found status

  Any other name matches no clause of `say_hello/2`.
  """
  use GRPC.Server, service: Helloworld.Greeter.Service

  # The only request names this mock knows how to answer.
  @outcomes ["raise", "ok", "not_found"]

  def say_hello(%{name: outcome, duration: duration}, _stream) when outcome in @outcomes do
    Process.sleep(duration)
    finish(outcome)
  end

  def check_headers(_req, stream) do
    headers = GRPC.Stream.get_headers(stream)
    Helloworld.HeaderReply.new(authorization: headers["authorization"])
  end

  # Ends the call according to the requested outcome.
  defp finish("raise"), do: raise(ArgumentError, "exception raised")
  defp finish("ok"), do: Helloworld.HelloReply.new(message: "Hello")
  defp finish("not_found"), do: raise(GRPC.RPCError, status: GRPC.Status.not_found())
end
defmodule MetricsSupervisor do
  @moduledoc """
  Supervises a `TelemetryMetricsPrometheus` reporter (listening on port 9568)
  that exposes gRPC server and client metrics under the metric names used by
  the deprecated `:grpc_prometheus` library.

  On `init/1` the module attaches two telemetry handlers to the RPC events
  published by `GRPC.Telemetry`:

    * `handle_rpc_finished/4` listens to the `:stop` and `:exception` events
      and republishes them as `custom_grpc.<side>_rpc.handled` (and, for
      message streams, `custom_grpc.<side>_rpc.sent`), tagged with the gRPC
      status code name.
    * `handle_rpc_started/4` listens to the `:start` events and, for message
      streams, republishes them as `custom_grpc.<side>_rpc.message_received`.

  `<side>` is `server` or `client`, matching the side that emitted the
  original event.
  """
  use Supervisor
  import Telemetry.Metrics

  @histogram_buckets_seconds [5.0e-3, 10.0e-3, 25.0e-3, 50.0e-3, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]

  @tags [:grpc_service, :grpc_method, :grpc_type]
  @tags_with_code [:grpc_code | @tags]

  @doc """
  Starts the supervisor, registered under the module name.
  """
  def start_link(arg) do
    Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
  end

  @impl true
  def init(_arg) do
    # We attach to these events to combine them into a single published metric
    # This can actually be done directly in Prometheus through Recording Rules,
    # but this shows how to build a drop-in replacement for :grpc_prometheus
    #
    # Handlers are passed as remote captures (&Mod.fun/4) rather than
    # anonymous functions, as recommended by the :telemetry documentation.
    :telemetry.attach_many(
      "handler-#{__MODULE__}",
      [
        GRPC.Telemetry.server_rpc_prefix() ++ [:stop],
        GRPC.Telemetry.server_rpc_prefix() ++ [:exception],
        GRPC.Telemetry.client_rpc_prefix() ++ [:stop],
        GRPC.Telemetry.client_rpc_prefix() ++ [:exception]
      ],
      &__MODULE__.handle_rpc_finished/4,
      nil
    )

    # This can also be achieved through some clever use of tags+tag_values,
    # without having to attach and publish a new event. However, that would
    # end up leaking an extraneous tag to Prometheus.
    # This is cleaner in that sense.
    :telemetry.attach_many(
      "handler-#{__MODULE__}-start",
      [
        GRPC.Telemetry.server_rpc_prefix() ++ [:start],
        GRPC.Telemetry.client_rpc_prefix() ++ [:start]
      ],
      &__MODULE__.handle_rpc_started/4,
      nil
    )

    children = [
      {TelemetryMetricsPrometheus, [metrics: metrics(), port: 9568]}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end

  # Telemetry handler for `:stop` / `:exception` RPC events.
  #
  # Public only so that it can be attached as a remote capture. Republishes
  # the event under the `custom_grpc` prefix with flat tags (service, method,
  # type, status code name) so that the metric definitions do not need
  # `tag_values` for these events.
  @doc false
  def handle_rpc_finished(
        [:grpc, side, :rpc, event_kind],
        %{duration: duration},
        %{stream: stream} = metadata,
        _config
      ) do
    tags = %{
      grpc_service: stream.service_name,
      grpc_method: stream.method_name,
      grpc_type: stream.grpc_type,
      grpc_code: status_code_name(event_kind, metadata)
    }

    prefix = custom_prefix(side)

    if message_stream?(stream) do
      :telemetry.execute(prefix ++ [:sent], %{duration: duration}, tags)
    end

    :telemetry.execute(prefix ++ [:handled], %{duration: duration}, tags)
  end

  # Telemetry handler for `:start` RPC events.
  #
  # Public only so that it can be attached as a remote capture. The event is
  # republished for the side (server or client) that emitted it, and the
  # stream is forwarded in the metadata because the `*_msg_received_total`
  # counters derive their tags from it through `extract_tags_from_stream/1`.
  @doc false
  def handle_rpc_started([:grpc, side, :rpc, :start], _measurements, %{stream: stream}, _config) do
    if message_stream?(stream) do
      :telemetry.execute(
        custom_prefix(side) ++ [:message_received],
        %{count: 1},
        %{stream: stream}
      )
    end
  end

  # Event prefix under which this module republishes events for each side.
  # Spelled out explicitly instead of building atoms through interpolation.
  defp custom_prefix(:server), do: [:custom_grpc, :server_rpc]
  defp custom_prefix(:client), do: [:custom_grpc, :client_rpc]

  # Maps the outcome of an RPC to a gRPC status code name: successful results
  # map to code 0, a `GRPC.RPCError` (returned or raised) maps to its own
  # status, and anything else maps to the "unknown" status.
  defp status_code_name(event_kind, metadata) do
    case {event_kind, metadata[:result], metadata[:reason]} do
      {:stop, {:ok, _}, _} -> GRPC.Status.code_name(0)
      {:stop, {:ok, _, _}, _} -> GRPC.Status.code_name(0)
      {:stop, {:error, %GRPC.RPCError{status: status}}, _} -> GRPC.Status.code_name(status)
      {:exception, _, %GRPC.RPCError{status: status}} -> GRPC.Status.code_name(status)
      _ -> GRPC.Status.code_name(GRPC.Status.unknown())
    end
  end

  # True for the stream types that count towards the per-message metrics.
  defp message_stream?(stream) do
    stream.grpc_type in [:client_stream, :bidirectional_stream]
  end

  # Metric definitions handed to the Prometheus reporter. The server and
  # client sections mirror each other.
  defp metrics do
    [
      # Server Metrics
      counter(
        "grpc_server_started_total",
        event_name: "grpc.server.rpc.start",
        measurement: :count,
        tags: @tags,
        tag_values: &extract_tags_from_stream/1,
        description: "Total number of RPCs started on the server"
      ),
      counter(
        "grpc_server_msg_received_total",
        event_name: "custom_grpc.server_rpc.message_received",
        measurement: :count,
        tags: @tags,
        tag_values: &extract_tags_from_stream/1,
        description: "Total number of RPC stream messages received on the server"
      ),
      counter(
        "grpc_server_msg_sent_total",
        event_name: "custom_grpc.server_rpc.sent",
        measurement: :duration,
        tags: @tags,
        description: "Total number of gRPC stream messages sent by the server."
      ),
      counter(
        "grpc_server_handled_total",
        event_name: "custom_grpc.server_rpc.handled",
        measurement: :duration,
        tags: @tags_with_code,
        description:
          "Total number of RPCs completed on the server, regardless of success or failure."
      ),
      distribution(
        "grpc_server_handled_latency_seconds",
        event_name: "custom_grpc.server_rpc.handled",
        description: "Histogram of response latency of rpcs handled by the server, in seconds.",
        measurement: :duration,
        tags: @tags_with_code,
        unit: {:native, :second},
        reporter_options: [
          buckets: @histogram_buckets_seconds
        ]
      ),

      # Client Metrics
      counter(
        "grpc_client_started_total",
        event_name: "grpc.client.rpc.start",
        measurement: :count,
        tags: @tags,
        tag_values: &extract_tags_from_stream/1,
        description: "Total number of RPCs started on the client"
      ),
      counter(
        "grpc_client_msg_received_total",
        event_name: "custom_grpc.client_rpc.message_received",
        measurement: :count,
        tags: @tags,
        tag_values: &extract_tags_from_stream/1,
        description: "Total number of RPC stream messages received on the client"
      ),
      counter(
        "grpc_client_msg_sent_total",
        event_name: "custom_grpc.client_rpc.sent",
        measurement: :duration,
        tags: @tags,
        description: "Total number of gRPC stream messages sent by the client."
      ),
      counter(
        "grpc_client_handled_total",
        event_name: "custom_grpc.client_rpc.handled",
        measurement: :duration,
        tags: @tags_with_code,
        description:
          "Total number of RPCs completed on the client, regardless of success or failure."
      ),
      distribution(
        "grpc_client_handled_latency_seconds",
        event_name: "custom_grpc.client_rpc.handled",
        description: "Histogram of response latency of rpcs handled by the client, in seconds.",
        measurement: :duration,
        tags: @tags_with_code,
        unit: {:native, :second},
        reporter_options: [
          buckets: @histogram_buckets_seconds
        ]
      )
    ]
  end

  # Builds the tag map for events whose metadata carries the raw stream.
  defp extract_tags_from_stream(%{
         stream: %{
           service_name: service_name,
           method_name: method_name,
           grpc_type: grpc_type
         }
       }) do
    %{
      grpc_service: service_name,
      grpc_method: method_name,
      grpc_type: grpc_type
    }
  end

  # Fallback for metadata without a stream: every tag is reported as nil.
  defp extract_tags_from_stream(_) do
    %{grpc_service: nil, grpc_method: nil, grpc_type: nil}
  end
end
# Start the metrics pipeline, then the mock server, then open a client
# channel to that server.
MetricsSupervisor.start_link([])
GRPC.Server.start([HelloServer], 1337, [])
{:ok, channel} = GRPC.Stub.connect("localhost:1337")

# the requests take some time internally, so we might not get _exactly_
# the bucket distribution we expect
for duration <- [0, 6, 101, 101, 501, 1001] do
  # Builds a request that makes HelloServer sleep for `duration` and then
  # finish with the outcome selected by `name`.
  request_for = fn name -> Helloworld.HelloRequest.new(name: name, duration: duration) end

  {:ok, _} = Helloworld.Greeter.Stub.say_hello(channel, request_for.("ok"))

  # A plain exception on the server is expected to reach the client as status 2.
  {:error, %GRPC.RPCError{status: 2}} =
    Helloworld.Greeter.Stub.say_hello(channel, request_for.("raise"))

  # The server raises GRPC.RPCError with the not-found status for this
  # request; the client is expected to see it as status 5.
  {:error, %GRPC.RPCError{status: 5}} =
    Helloworld.Greeter.Stub.say_hello(channel, request_for.("not_found"))
end
# Read back what the Prometheus reporter started by MetricsSupervisor exposes
# on port 9568.
# TO-DO: replace this by actual server calls
metrics_url = "http://localhost:9568/metrics"

# Anything other than a successful 200 response fails the match.
{:ok, %Req.Response{status: 200, body: body}} = Req.request(url: metrics_url)

# Print the raw response body so that it is shown line by line; the trailing
# nil becomes the value of the block instead of IO.puts/1's return value.
IO.puts(body)
nil