Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

FastAvro Samples

fastavro_samples.livemd

FastAvro Samples

Mix.install(
  [
    {:fastavro, path: "/media/ramon/f9abf31d-9e05-4c6f-9dec-9bb9c0612a74/development/fastavro"},
    {:vega_lite, "~> 0.1.6"},
    {:kino, "~> 0.8.1"},
    {:benchee, "~> 1.1"},
    {:benchee_html, "~> 1.0"}
  ],
  config: [nx: [default_backend: EXLA.Backend]]
)

Load Data Files

We start by reading some files from filesystem to get the data needed for the following test runs.

We will need the avro schema for our test message and the avro raw data for a sample message.

{:ok, schema} = File.read!("bench/lte_202210.avsc") |> FastAvro.read_schema()
avro_data = File.read!("bench/lte_202210.avro")

Decode Message

First thing we wiil want to try is to decode the avro_data into an intermediate representation by using the schema as template for data interpretation.

{:ok, msg} = FastAvro.decode_avro_datum(avro_data, schema)

We can convert from this intermediate representation into an Elixir map.

FastAvro.to_map(msg)

Get field from message

We can read just one field from the intermediate representation and get its value.

FastAvro.get_avro_value(msg, "Dest_TAC")

Is also possible to read the field from the raw representation without generate the intermediate representation.

FastAvro.get_raw_value(avro_data, schema, "Dest_TAC")

It’s even possible to read multiple values from the given avro_data using the schema.

FastAvro.get_raw_values(avro_data, schema, [
  "Dest_TAC",
  "Event_Start",
  "Event_Stop"
])

We can combine this with encode function to extract data and reencode message.

{:ok, new_schema} =
  FastAvro.read_schema("""
  {
    "type": "record",
    "name": "small",
    "fields": [
      {"name": "tac", "type": "int"},
      {"name": "from", "type": "string"},
      {"name": "to", "type": "string"}
    ]
  }
  """)
avro_data
|> FastAvro.get_raw_values(schema, [
  "Dest_TAC",
  "Event_Start",
  "Event_Stop"
])
|> elem(1)
|> Map.new(fn
  {"Dest_TAC", "TAC: " <> v} -> {"tac", String.to_integer(v)}
  {"Event_Start", <<_::binary-size(9), v::binary>>} -> {"from", v}
  {"Event_Stop", <<_::binary-size(9), v::binary>>} -> {"to", v}
end)
|> FastAvro.encode_avro_datum(new_schema)
|> elem(1)
|> FastAvro.decode_avro_datum(new_schema)
|> elem(1)
|> FastAvro.to_map()
with {:ok, map} <-
       FastAvro.get_raw_values(avro_data, schema, ["Dest_TAC", "Event_Start", "Event_Stop"]),
     new_map <-
       Map.new(map, fn
         {"Dest_TAC", "TAC: " <> v} -> {"tac", String.to_integer(v)}
         {"Event_Start", <<_::binary-size(9), v::binary>>} -> {"from", v}
         {"Event_Stop", <<_::binary-size(9), v::binary>>} -> {"to", v}
       end),
     {:ok, encoded} <- FastAvro.encode_avro_datum(new_map, new_schema),
     {:ok, decoded} <- FastAvro.decode_avro_datum(encoded, new_schema) do
  FastAvro.to_map(decoded)
end
defmodule Tests do
  def recode(avro_data, schema, new_schema) do
    avro_data
    |> FastAvro.get_raw_values(schema, ["Dest_TAC", "Event_Start", "Event_Stop"])
    |> elem(1)
    |> Map.new(fn
      {"Dest_TAC", "TAC: " <> v} -> {"tac", String.to_integer(v)}
      {"Event_Start", <<_::binary-size(9), v::binary>>} -> {"from", v}
      {"Event_Stop", <<_::binary-size(9), v::binary>>} -> {"to", v}
    end)
    |> FastAvro.encode_avro_datum(new_schema)
  end
end
Benchee.run(
  %{
    "sample" => fn -> Tests.recode(avro_data, schema, new_schema) end
  },
  warmup: 2,
  time: 20,
  parallel: 1
)

nil

Incompatible schema and return errors

schema = """
{
    "namespace": "test.avro",
    "type": "record",
    "name": "test_record",
    "fields": [
        
        {
            "name": "string_field",
            "type": "string"
        },
        {
            "name": "double_field",
            "type": "double"
        },
        {
            "name": "second_string",
            "type": "string"
        }
    ]
}
"""
schema_wrong = """
{
    "namespace": "test.avro",
    "type": "record",
    "name": "test_record",
    "fields": [
        {
            "name": "int_field",
            "type": "int"
        },
        {
            "name": "string_field",
            "type": "string"
        }
    ]
}
"""
{:ok, avro_schema} = FastAvro.read_schema(schema)

msg = %{
  "string_field" => "prueba",
  "double_field" => 22.5,
  "second_string" => "otra"
}

{:ok, avro_msg} = FastAvro.encode_avro_datum(msg, avro_schema)
{:ok, wrong_avro_schema} = FastAvro.read_schema(schema_wrong)

FastAvro.decode_avro_datum(avro_msg, wrong_avro_schema)
FastAvro.decode_avro_datum(avro_msg, avro_schema)
IO.inspect(FastAvro.get_raw_value(avro_msg, avro_schema, "string_field"))
IO.inspect(FastAvro.get_raw_value(avro_msg, avro_schema, "wrong_field"))
IO.inspect(FastAvro.get_raw_value(avro_msg, wrong_avro_schema, "string_field"))
nil