Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Quick run-through

guides/quick-run-through.livemd

Quick run-through

Mix.install([
  {:serde_arrow, git: "https://github.com/Benjamin-Philip/serde_arrow.git"}
])

Introduction

Serializing Erlang into Arrow typically follows the following sequence of events:

  1. Creating a Schema Message with the schema of the data
  2. Creating Arrow Arrays for each column in the schema.
  3. Combining the columns of each batch or table into a RecordBatch Message
  4. Combining the Schema and all the RecordBatches into an IPC Stream or an IPC File

Technically, to use the Arrow Columnar Format itself you only need to do step 2. The other steps are in order to use Arrow IPC which is an abstraction over the Arrow Columnar Format. IPC specifies a format to organize multiple Arrow Arrays, and is the standard choice when you do not have your own custom format detailing so.

For the purposes of illustration, let use assume that we have to export the annual marks of a school to Arrow.

Creating a Schema

The marksheet of each class has 4 columns:

  1. id, which is the Roll no.
  2. name, which is the first name of the student
  3. age, which is the age of the student
  4. marks, which is a list of all the marks of the student

Ideally the marks should’ve been split into columns for each subject, but we are using a list to show how to manage nested types. We can pretend that the school administration is stupid or something.

First, we must declare a field for each column:

IdField = serde_arrow_ipc_field:from_erlang({int, #{bit_width => 8, is_signed => true}}, "id"),
NameField = serde_arrow_ipc_field:from_erlang(large_binary, "name"),
AgeField = serde_arrow_ipc_field:from_erlang({int, #{bit_width => 8, is_signed => false}}, "age"),
AnnualMarksField = 
    serde_arrow_ipc_field:from_erlang(
        {fixed_size_list, #{list_size => 3}}, "annual_marks", [
            serde_arrow_ipc_field:from_erlang(
                {int, #{bit_width => 8, is_signed => false}}
            )
        ]
    ).

As you may have noticed, the parameters for declaring a field is the field type, followed by the field name. Each type may have extra metadata associated with it, such as the int. In the case of a nested datatype, we must also include a list of the fields that are nested.

We then must take those fields and combine them into a schema:

Schema = serde_arrow_ipc_schema:from_erlang([IdField, NameField, AgeField, AnnualMarksField]).

And that schema into a message:

SchemaMsg = serde_arrow_ipc_message:from_erlang(Schema).

Creating Arrow Arrays

Now that we’ve got a schema, it is time to create the data for that schema. Let’s assume that there are 3 classes, Class1, Class2, and Class3. We must create the data for each class separately:

Here’s Class1:

Class1Id = serde_arrow_array:from_erlang(fixed_primitive, [0, 1, 2, undefined], {s, 8}),
Class1Name = serde_arrow_array:from_erlang(
        variable_binary, [<<"alice">>, <<"bob">>, <<"charlie">>, undefined], {bin, undefined}
    ),
Class1Age = serde_arrow_array:from_erlang(fixed_primitive, [10, 20, 30, undefined], {s, 8}),
Class1AnnualMarks = serde_arrow_array:from_erlang(
        fixed_list, [[100, 97, 98], [100, 99, 96], [100, 98, 95], undefined], {s, 8}
    ),
Class1Columns = [Class1Id, Class1Name, Class1Age, Class1AnnualMarks].

Class2:

Class2Id = serde_arrow_array:from_erlang(fixed_primitive, [3, 4, 5], {s, 8}),
Class2Name = serde_arrow_array:from_erlang(
        variable_binary, [<<"anne"/utf8-little>>, <<"benoît"/utf8-little>>, <<"clément"/utf8-little>>], {bin, undefined}
    ),
Class2Age = serde_arrow_array:from_erlang(fixed_primitive, [10, 20, 30], {s, 8}),
Class2AnnualMarks = serde_arrow_array:from_erlang(
        fixed_list, [[90, 87, 75], [85, 99, 96], [75, 88, 60]], {s, 8}
    ),
Class2Columns = [Class2Id, Class2Name, Class2Age, Class2AnnualMarks].

Class3:

Class3Id = serde_arrow_array:from_erlang(fixed_primitive, [6, 7, 8], {s, 8}),
Class3Name = serde_arrow_array:from_erlang(
        variable_binary, [<<"anjali">>, <<"biju">>, <<"chandra">>], {bin, undefined}
    ),
Class3Age = serde_arrow_array:from_erlang(fixed_primitive, [10, 20, 30], {s, 8}),
Class3AnnualMarks = serde_arrow_array:from_erlang(
        fixed_list, [[100, 100, 98], [100, 100, 100], [99, 98, 97]], {s, 8}
    ),
Class3Columns = [Class3Id, Class3Name, Class3Age, Class3AnnualMarks].

Combining Data into a RecordBatch

RecordBatches are used to model different data with the same schema. In this case marks from different classes. Another example would marks of the same class throughout different years. The columns that make up such a table can be combined like so:

Class1RecordBatch = serde_arrow_ipc_record_batch:from_erlang(Class1Columns),
Class2RecordBatch = serde_arrow_ipc_record_batch:from_erlang(Class2Columns),
Class3RecordBatch = serde_arrow_ipc_record_batch:from_erlang(Class3Columns).

Each of these RecordBatches need a body:

Class1Msg = serde_arrow_ipc_message:body_from_erlang(Class1Columns),
Class2Msg = serde_arrow_ipc_message:body_from_erlang(Class2Columns),
Class3Msg = serde_arrow_ipc_message:body_from_erlang(Class3Columns).

Which can then be combined into a message:

Class1Msg = serde_arrow_ipc_message:from_erlang(Class1RecordBatch, Class1Body),
Class2Msg = serde_arrow_ipc_message:from_erlang(Class2RecordBatch, Class3Body),
Class3Msg = serde_arrow_ipc_message:from_erlang(Class3RecordBatch, Class3Body).

If we wanted to, we could serialize these messages into an Encapsulated message:

Class1EMF = serde_arrow_ipc_message:to_ipc(Class1Msg),
Class2EMF = serde_arrow_ipc_message:to_ipc(Class2Msg),
Class3EMF = serde_arrow_ipc_message:to_ipc(Class3Msg).

Typically, you wouldn’t directly use an encapsulated message but a stream or a file instead.

Combining Messages into Streams and Files

Now that we have a schema and some record batches, we can combine them into a stream and a file. The difference between a stream and a file is thin: a file is a superset of the stream which supports random access and is meant to be saved to a file.

We can create a stream like this:

serde_arrow_ipc_message:to_stream([SchemaMsg, Class1Msg, Class2Msg, Class3Msg]).

Or a file like this:

ErlFile = serde_arrow_ipc_file:from_erlang(SchemaMsg, [Class1Msg, Class2Msg, Class3Msg]),
File = serde_arrow_ipc_file:to_ipc(ErlFile).

We could then save that binary into a file:

file:write_file("/tmp/annual_marks.feather", File).