Telemetry
# Install grpc from the checkout one directory above this notebook, reusing that
# project's config file and lockfile as the basis for dependency resolution.
my_app_root = Path.join(__DIR__, "..")

deps = [
  {:grpc, path: my_app_root, env: :dev},
  {:telemetry_metrics, "~> 0.7"},
  {:telemetry_metrics_prometheus, "~> 1.1"},
  {:req, "~> 0.3"}
]

Mix.install(deps,
  config_path: Path.join(my_app_root, "config/config.exs"),
  lockfile: Path.join(my_app_root, "mix.lock")
)
Telemetry Events
The `GRPC.Telemetry` documentation describes the server-side and client-side events that this library publishes.
We can use those events to build a Prometheus metrics exporter with `Telemetry.Metrics`
and `TelemetryMetricsPrometheus`
that is backward-compatible with the deprecated `:grpc_prometheus`
library.
First, let’s create a mock server and client that we can use.
GRPC Server and Client
# This whole code block is taken from test/support/proto/helloworld.pb.ex
# Normally, this would be autogenerated from a valid protobuf file,
# but we're using this hardcoded version so that this Livebook is self-contained.
defmodule Helloworld.HelloRequest do
  @moduledoc false
  use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3

  # Field 1: the mock HelloServer picks its behaviour from this value.
  field :name, 1, type: :string
  # Field 2: milliseconds the mock server sleeps before answering.
  # Declared proto3-optional, so a client may leave it unset.
  field :duration, 2, proto3_optional: true, type: :int32
end
defmodule Helloworld.HelloReply do
  @moduledoc false
  use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3

  # Field 1: greeting text returned by the SayHello RPC.
  field :message, 1, type: :string
end
defmodule Helloworld.HeaderRequest do
  # Request message for the CheckHeaders RPC. It declares no fields: the
  # server handler only reads the call's headers, not the request body.
  @moduledoc false
  use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3
end
defmodule Helloworld.HeaderReply do
  @moduledoc false
  use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3

  # Field 1: the "authorization" header value echoed back by CheckHeaders.
  field :authorization, 1, type: :string
end
defmodule Helloworld.Greeter.Service do
  @moduledoc false
  use GRPC.Service, name: "helloworld.Greeter", protoc_gen_elixir_version: "0.11.0"

  # Unary RPCs of the helloworld.Greeter service: name, request type, reply type.
  rpc :SayHello, Helloworld.HelloRequest, Helloworld.HelloReply
  rpc :CheckHeaders, Helloworld.HeaderRequest, Helloworld.HeaderReply
end
defmodule Helloworld.Greeter.Stub do
  # Client stub for helloworld.Greeter. `use GRPC.Stub` derives the client
  # call functions (e.g. `say_hello/2`, invoked later with a channel) from
  # the rpc definitions in Helloworld.Greeter.Service.
  @moduledoc false
  use GRPC.Stub, service: Helloworld.Greeter.Service
end
defmodule HelloServer do
  @moduledoc """
  Mock implementation of `Helloworld.Greeter.Service` used to generate telemetry events.

  `say_hello/2` first sleeps for the request's `duration` (milliseconds), then,
  depending on `name`:

    * `"ok"` - replies with a `Helloworld.HelloReply`
    * `"raise"` - raises an `ArgumentError`
    * `"not_found"` - raises a `GRPC.RPCError` with the NOT_FOUND status

  `check_headers/2` echoes the caller's `"authorization"` header back.
  """
  use GRPC.Server, service: Helloworld.Greeter.Service

  def say_hello(%{name: "raise", duration: duration}, _stream) do
    sleep_for(duration)
    raise ArgumentError, "exception raised"
  end

  def say_hello(%{name: "ok", duration: duration}, _stream) do
    sleep_for(duration)
    Helloworld.HelloReply.new(message: "Hello")
  end

  def say_hello(%{name: "not_found", duration: duration}, _stream) do
    sleep_for(duration)
    raise GRPC.RPCError, status: GRPC.Status.not_found()
  end

  def check_headers(_req, stream) do
    token = GRPC.Stream.get_headers(stream)["authorization"]
    Helloworld.HeaderReply.new(authorization: token)
  end

  # `duration` is declared proto3-optional, so it is presumably `nil` when the client
  # omits it; treat that as "no delay" instead of crashing inside Process.sleep/1.
  defp sleep_for(nil), do: :ok
  defp sleep_for(milliseconds), do: Process.sleep(milliseconds)
end
defmodule MetricsSupervisor do
  @moduledoc """
  Supervisor that exports gRPC metrics to Prometheus through `TelemetryMetricsPrometheus`
  (served on port 9568), using the metric names of the deprecated `:grpc_prometheus`
  library.

  `init/1` attaches telemetry handlers that turn the `GRPC.Telemetry` start, stop and
  exception events into `[:custom_grpc, ...]` events, which `metrics/0` then reports.
  """
  use Supervisor
  import Telemetry.Metrics

  # Call types in which the client streams messages to the server.
  # NOTE: grpc-elixir (`GRPC.Service.grpc_type/1`) names bidirectional RPCs
  # `:bidi_stream`; `:bidirectional_stream` never matched any stream.
  @client_streaming_types [:client_stream, :bidi_stream]

  @histogram_buckets_seconds [5.0e-3, 10.0e-3, 25.0e-3, 50.0e-3, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
  @tags [:grpc_service, :grpc_method, :grpc_type]
  @tags_with_code [:grpc_code | @tags]

  def start_link(arg) do
    Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
  end

  @impl true
  def init(_arg) do
    # We attach to these events to combine them into a single published metric.
    # This can actually be done directly in Prometheus through Recording Rules,
    # but this shows how to build a drop-in replacement for :grpc_prometheus.
    # Handlers are remote captures (&__MODULE__.fun/4) because :telemetry warns about,
    # and pays a performance penalty for, anonymous/local function handlers.
    :telemetry.attach_many(
      "handler-#{__MODULE__}",
      [
        GRPC.Telemetry.server_rpc_prefix() ++ [:stop],
        GRPC.Telemetry.server_rpc_prefix() ++ [:exception],
        GRPC.Telemetry.client_rpc_prefix() ++ [:stop],
        GRPC.Telemetry.client_rpc_prefix() ++ [:exception]
      ],
      &__MODULE__.handle_completion_event/4,
      nil
    )

    # This can also be achieved through some clever use of tags+tag_values,
    # without having to attach and publish a new event. However, that would
    # end up leaking an extraneous tag to Prometheus.
    # This is cleaner in that sense.
    :telemetry.attach_many(
      "handler-#{__MODULE__}-start",
      [
        GRPC.Telemetry.server_rpc_prefix() ++ [:start],
        GRPC.Telemetry.client_rpc_prefix() ++ [:start]
      ],
      &__MODULE__.handle_start_event/4,
      nil
    )

    children = [
      {TelemetryMetricsPrometheus, [metrics: metrics(), port: 9568]}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end

  @doc false
  # Telemetry handler for `[:grpc, :server | :client, :rpc, :stop | :exception]`.
  # Re-emits the RPC duration as `[:custom_grpc, :<side>_rpc, :handled]` (plus `:sent`
  # for client-streaming calls), tagged with service, method, call type and status code.
  def handle_completion_event(
        [:grpc, side, :rpc, event_kind],
        %{duration: duration},
        %{stream: stream} = metadata,
        _config
      ) do
    tags = %{
      grpc_service: stream.service_name,
      grpc_method: stream.method_name,
      grpc_type: stream.grpc_type,
      grpc_code: status_code_name(event_kind, metadata)
    }

    prefix = custom_prefix(side)

    if client_streaming?(stream) do
      :telemetry.execute(prefix ++ [:sent], %{duration: duration}, tags)
    end

    :telemetry.execute(prefix ++ [:handled], %{duration: duration}, tags)
  end

  @doc false
  # Telemetry handler for `[:grpc, :server | :client, :rpc, :start]`.
  # Counts a received stream message for client-streaming calls on the side (server or
  # client) that emitted the event. Previously every start event was reported as a
  # server event, so the client counter never moved and the server one double-counted.
  # The stream is forwarded as metadata so `extract_tags_from_stream/1` can fill the tags.
  def handle_start_event(
        [:grpc, side, :rpc, :start],
        _measurements,
        %{stream: stream},
        _config
      ) do
    if client_streaming?(stream) do
      :telemetry.execute(
        custom_prefix(side) ++ [:message_received],
        %{count: 1},
        %{stream: stream}
      )
    end
  end

  # Maps a stop/exception event to the gRPC status code name used as the `grpc_code` tag:
  # code 0 for successful results, the RPCError's status when one is present, and
  # UNKNOWN for anything else (e.g. a non-RPCError exception).
  defp status_code_name(event_kind, metadata) do
    case {event_kind, metadata[:result], metadata[:reason]} do
      {:stop, {:ok, _}, _} -> GRPC.Status.code_name(0)
      {:stop, {:ok, _, _}, _} -> GRPC.Status.code_name(0)
      {:stop, {:error, %GRPC.RPCError{status: status}}, _} -> GRPC.Status.code_name(status)
      {:exception, _, %GRPC.RPCError{status: status}} -> GRPC.Status.code_name(status)
      _ -> GRPC.Status.code_name(GRPC.Status.unknown())
    end
  end

  # Event-name prefix of the custom events for each side; explicit clauses avoid
  # building atoms at runtime.
  defp custom_prefix(:server), do: [:custom_grpc, :server_rpc]
  defp custom_prefix(:client), do: [:custom_grpc, :client_rpc]

  defp client_streaming?(stream), do: stream.grpc_type in @client_streaming_types

  defp metrics do
    [
      # Server Metrics
      counter(
        "grpc_server_started_total",
        event_name: "grpc.server.rpc.start",
        measurement: :count,
        tags: @tags,
        tag_values: &extract_tags_from_stream/1,
        description: "Total number of RPCs started on the server"
      ),
      counter(
        "grpc_server_msg_received_total",
        event_name: "custom_grpc.server_rpc.message_received",
        measurement: :count,
        tags: @tags,
        tag_values: &extract_tags_from_stream/1,
        description: "Total number of RPC stream messages received on the server"
      ),
      counter(
        "grpc_server_msg_sent_total",
        event_name: "custom_grpc.server_rpc.sent",
        measurement: :duration,
        tags: @tags,
        description: "Total number of gRPC stream messages sent by the server."
      ),
      counter(
        "grpc_server_handled_total",
        event_name: "custom_grpc.server_rpc.handled",
        measurement: :duration,
        tags: @tags_with_code,
        description:
          "Total number of RPCs completed on the server, regardless of success or failure."
      ),
      distribution(
        "grpc_server_handled_latency_seconds",
        event_name: "custom_grpc.server_rpc.handled",
        description: "Histogram of response latency of rpcs handled by the server, in seconds.",
        measurement: :duration,
        tags: @tags_with_code,
        unit: {:native, :second},
        reporter_options: [
          buckets: @histogram_buckets_seconds
        ]
      ),
      # Client Metrics
      counter(
        "grpc_client_started_total",
        event_name: "grpc.client.rpc.start",
        measurement: :count,
        tags: @tags,
        tag_values: &extract_tags_from_stream/1,
        description: "Total number of RPCs started on the client"
      ),
      counter(
        "grpc_client_msg_received_total",
        event_name: "custom_grpc.client_rpc.message_received",
        measurement: :count,
        tags: @tags,
        tag_values: &extract_tags_from_stream/1,
        description: "Total number of RPC stream messages received on the client"
      ),
      counter(
        "grpc_client_msg_sent_total",
        event_name: "custom_grpc.client_rpc.sent",
        measurement: :duration,
        tags: @tags,
        description: "Total number of gRPC stream messages sent by the client."
      ),
      counter(
        "grpc_client_handled_total",
        event_name: "custom_grpc.client_rpc.handled",
        measurement: :duration,
        tags: @tags_with_code,
        description:
          "Total number of RPCs completed on the client, regardless of success or failure."
      ),
      distribution(
        "grpc_client_handled_latency_seconds",
        event_name: "custom_grpc.client_rpc.handled",
        description: "Histogram of response latency of rpcs handled by the client, in seconds.",
        measurement: :duration,
        tags: @tags_with_code,
        unit: {:native, :second},
        reporter_options: [
          buckets: @histogram_buckets_seconds
        ]
      )
    ]
  end

  # Builds the service/method/type tags from the `:stream` in the event metadata;
  # falls back to nil tags when the metadata carries no stream.
  defp extract_tags_from_stream(%{
         stream: %{
           service_name: service_name,
           method_name: method_name,
           grpc_type: grpc_type
         }
       }) do
    %{
      grpc_service: service_name,
      grpc_method: method_name,
      grpc_type: grpc_type
    }
  end

  defp extract_tags_from_stream(_) do
    %{grpc_service: nil, grpc_method: nil, grpc_type: nil}
  end
end
# Start the metrics supervisor first so its telemetry handlers are attached
# before any RPC is made.
MetricsSupervisor.start_link([])

# Serve HelloServer on port 1337 and open a client channel to it.
GRPC.Server.start([HelloServer], 1337, [])
{:ok, channel} = GRPC.Stub.connect("localhost:1337")

# Each duration (ms) aims at a different latency bucket. The requests take some
# time internally, so we might not get _exactly_ the bucket distribution we expect.
Enum.each([0, 6, 101, 101, 501, 1001], fn duration ->
  request_for = fn name -> Helloworld.HelloRequest.new(name: name, duration: duration) end

  # "ok" succeeds.
  {:ok, _} = Helloworld.Greeter.Stub.say_hello(channel, request_for.("ok"))

  # The server raises an ArgumentError; the client sees status 2.
  {:error, %GRPC.RPCError{status: 2}} =
    Helloworld.Greeter.Stub.say_hello(channel, request_for.("raise"))

  # The server raises a GRPC.RPCError with NOT_FOUND; the client sees status 5.
  {:error, %GRPC.RPCError{status: 5}} =
    Helloworld.Greeter.Stub.say_hello(channel, request_for.("not_found"))
end)

# Scrape the Prometheus endpoint served by MetricsSupervisor to see which metrics it returns.
# TODO: replace this by actual server calls
{:ok, %Req.Response{status: 200, body: body}} =
  Req.request(url: "http://localhost:9568/metrics")

# Print the raw metrics text so it is readable.
IO.puts(body)
nil