FastAvro Samples
# Install the notebook's dependencies.
#
# The location of the local FastAvro checkout can be overridden with the
# FASTAVRO_PATH environment variable, so the notebook is not tied to a single
# machine; when the variable is unset the original development path is used.
fastavro_path =
  System.get_env(
    "FASTAVRO_PATH",
    "/media/ramon/f9abf31d-9e05-4c6f-9dec-9bb9c0612a74/development/fastavro"
  )

Mix.install(
  [
    {:fastavro, path: fastavro_path},
    {:vega_lite, "~> 0.1.6"},
    {:kino, "~> 0.8.1"},
    {:benchee, "~> 1.1"},
    {:benchee_html, "~> 1.0"}
  ],
  # NOTE(review): neither :nx nor :exla is listed as a dependency above, so this
  # setting looks like a leftover — confirm before removing it.
  config: [nx: [default_backend: EXLA.Backend]]
)
Load Data Files
We start by reading some files from the filesystem to get the data needed for the following test runs.
We will need the Avro schema for our test message and the raw Avro data for a sample message.
# Read the schema file and hand its contents to FastAvro. The `{:ok, schema}`
# match raises a MatchError if anything other than an :ok tuple comes back.
{:ok, schema} =
  "bench/lte_202210.avsc"
  |> File.read!()
  |> FastAvro.read_schema()

# Raw contents of the sample message file, kept as a binary.
avro_data = File.read!("bench/lte_202210.avro")
Decode Message
The first thing we will want to try is decoding avro_data into an intermediate representation, using the schema as the template for interpreting the data.
# Decode the raw bytes with `schema`. The `{:ok, msg}` match raises a
# MatchError if the call returns anything other than an :ok tuple.
{:ok, msg} = FastAvro.decode_avro_datum(avro_data, schema)
We can convert from this intermediate representation into an Elixir map.
# Turn the decoded message into a plain Elixir map.
FastAvro.to_map(msg)
Get field from message
We can read just one field from the intermediate representation and get its value.
# Look up the single field "Dest_TAC" on the already-decoded message.
FastAvro.get_avro_value(msg, "Dest_TAC")
It is also possible to read the field directly from the raw representation, without generating the intermediate representation.
# Same field lookup, but against the raw bytes plus the schema instead of the
# decoded message.
FastAvro.get_raw_value(avro_data, schema, "Dest_TAC")
It’s even possible to read multiple values from the given avro_data using the schema.
# Request several fields from the raw bytes in one call.
FastAvro.get_raw_values(avro_data, schema, ~w(Dest_TAC Event_Start Event_Stop))
We can combine this with the encode function to extract data and re-encode the message.
# Target schema for the re-encoded message: a record named "small" with an int
# field "tac" and two string fields, "from" and "to".
# The `{:ok, new_schema}` match raises a MatchError if the call returns
# anything other than an :ok tuple.
{:ok, new_schema} =
FastAvro.read_schema("""
{
"type": "record",
"name": "small",
"fields": [
{"name": "tac", "type": "int"},
{"name": "from", "type": "string"},
{"name": "to", "type": "string"}
]
}
""")
# Pipeline flavour of the round trip: pull three fields out of the raw message,
# rename/convert them to fit `new_schema`, encode, decode again and show the
# result as a map. Every `elem(1)` takes the second element of the tuple
# returned by the previous step.
avro_data
|> FastAvro.get_raw_values(schema, ~w(Dest_TAC Event_Start Event_Stop))
|> elem(1)
|> Map.new(fn
  # Strip the "TAC: " prefix and parse the remainder as an integer.
  {"Dest_TAC", "TAC: " <> digits} -> {"tac", String.to_integer(digits)}
  # Drop the first 9 bytes of the value and keep the rest.
  {"Event_Start", <<_skipped::binary-size(9), rest::binary>>} -> {"from", rest}
  {"Event_Stop", <<_skipped::binary-size(9), rest::binary>>} -> {"to", rest}
end)
|> FastAvro.encode_avro_datum(new_schema)
|> elem(1)
|> FastAvro.decode_avro_datum(new_schema)
|> elem(1)
|> FastAvro.to_map()
# `with` flavour of the same round trip: each `<-` step must match an
# `{:ok, _}` tuple, otherwise the non-matching value becomes the result of the
# whole expression.
with {:ok, raw_fields} <-
       FastAvro.get_raw_values(avro_data, schema, ~w(Dest_TAC Event_Start Event_Stop)),
     renamed =
       Map.new(raw_fields, fn
         # Strip the "TAC: " prefix and parse the remainder as an integer.
         {"Dest_TAC", "TAC: " <> digits} -> {"tac", String.to_integer(digits)}
         # Drop the first 9 bytes of the value and keep the rest.
         {"Event_Start", <<_skipped::binary-size(9), rest::binary>>} -> {"from", rest}
         {"Event_Stop", <<_skipped::binary-size(9), rest::binary>>} -> {"to", rest}
       end),
     {:ok, encoded} <- FastAvro.encode_avro_datum(renamed, new_schema),
     {:ok, decoded} <- FastAvro.decode_avro_datum(encoded, new_schema) do
  FastAvro.to_map(decoded)
end
defmodule Tests do
  @moduledoc """
  Helpers exercised by the benchmark in this notebook.
  """

  # Fields read from the original message.
  @source_fields ~w(Dest_TAC Event_Start Event_Stop)

  @doc """
  Extracts `Dest_TAC`, `Event_Start` and `Event_Stop` from `avro_data` (read
  with `schema`), renames them to `tac`, `from` and `to`, and encodes the
  resulting map with `new_schema`.

  Returns the result of `FastAvro.encode_avro_datum/2`. If the field
  extraction does not return `{:ok, fields}`, that value is returned unchanged
  instead of being unwrapped and fed into the conversion step.
  """
  def recode(avro_data, schema, new_schema) do
    with {:ok, fields} <- FastAvro.get_raw_values(avro_data, schema, @source_fields) do
      fields
      |> Map.new(&convert_field/1)
      |> FastAvro.encode_avro_datum(new_schema)
    end
  end

  # Maps one extracted `{name, value}` pair onto the target field name,
  # converting the value: the "TAC: " prefix is stripped and the rest parsed as
  # an integer; for the two event fields the first 9 bytes are dropped.
  defp convert_field({"Dest_TAC", "TAC: " <> digits}), do: {"tac", String.to_integer(digits)}
  defp convert_field({"Event_Start", <<_::binary-size(9), rest::binary>>}), do: {"from", rest}
  defp convert_field({"Event_Stop", <<_::binary-size(9), rest::binary>>}), do: {"to", rest}
end
# Benchmark Tests.recode/3 on the sample message loaded earlier.
# NOTE(review): warmup/time are presumably seconds and parallel the number of
# concurrent processes, per Benchee's configuration — confirm.
Benchee.run(
%{
"sample" => fn -> Tests.recode(avro_data, schema, new_schema) end
},
warmup: 2,
time: 20,
parallel: 1
)
# Evaluate to nil — presumably so the cell does not display the value returned
# by Benchee.run.
nil
Incompatible schemas and returned errors
# JSON source of the schema used to encode the test message below: a record
# with a string, a double and a second string field.
# NOTE: this rebinds `schema`, which earlier in the notebook held the value
# returned by FastAvro.read_schema; from here on it is a JSON string.
schema = """
{
"namespace": "test.avro",
"type": "record",
"name": "test_record",
"fields": [
{
"name": "string_field",
"type": "string"
},
{
"name": "double_field",
"type": "double"
},
{
"name": "second_string",
"type": "string"
}
]
}
"""
# JSON source of a deliberately mismatching schema: same record name, but it
# declares an int field followed by a string field.
schema_wrong = """
{
"namespace": "test.avro",
"type": "record",
"name": "test_record",
"fields": [
{
"name": "int_field",
"type": "int"
},
{
"name": "string_field",
"type": "string"
}
]
}
"""
# Parse the matching schema and encode a message with it.
{:ok, avro_schema} = FastAvro.read_schema(schema)

# NOTE: this rebinds `msg`, which earlier held the decoded sample message.
msg = %{"string_field" => "prueba", "double_field" => 22.5, "second_string" => "otra"}

{:ok, avro_msg} = FastAvro.encode_avro_datum(msg, avro_schema)
{:ok, wrong_avro_schema} = FastAvro.read_schema(schema_wrong)

# Decode the same bytes with the mismatching schema, then with the right one.
FastAvro.decode_avro_datum(avro_msg, wrong_avro_schema)
FastAvro.decode_avro_datum(avro_msg, avro_schema)

# Print three lookups: an existing field, a field name that is not in the
# schema, and an existing field read through the mismatching schema.
avro_msg |> FastAvro.get_raw_value(avro_schema, "string_field") |> IO.inspect()
avro_msg |> FastAvro.get_raw_value(avro_schema, "wrong_field") |> IO.inspect()
avro_msg |> FastAvro.get_raw_value(wrong_avro_schema, "string_field") |> IO.inspect()

nil