Language Concepts
Mix.install([
{:skitter, "~> 0.6"},
{:kino, "~> 0.7"}
])
defmodule LivebookHelpers do
def render(wf = %Skitter.Workflow{}) do
wf
|> Skitter.Dot.render("svg")
|> then(fn {:ok, image} -> image end)
|> Kino.Image.new(:svg)
end
end
Resolving Hex dependencies...
Dependency resolution completed:
New:
kino 0.7.0
logger_file_backend 0.0.13
murmur 1.0.3
skitter 0.6.0
table 0.1.2
telemetry 1.1.0
* Getting skitter (Hex package)
* Getting kino (Hex package)
* Getting table (Hex package)
* Getting logger_file_backend (Hex package)
* Getting murmur (Hex package)
* Getting telemetry (Hex package)
==> logger_file_backend
Compiling 1 file (.ex)
Generated logger_file_backend app
==> table
Compiling 5 files (.ex)
Generated table app
==> kino
Compiling 36 files (.ex)
Generated kino app
===> Analyzing applications...
===> Compiling telemetry
==> murmur
Compiling 1 file (.ex)
warning: use Bitwise is deprecated. import Bitwise instead
lib/murmur.ex:19: Murmur
Generated murmur app
==> skitter
Compiling 67 files (.ex)
warning: variable "ctx" is unused (if the variable is not meant to be used, prefix it with an underscore)
lib/runtime/emit.ex:18: Skitter.Runtime.Emit.emit/2
Generated skitter app
17:43:56.632 [info] Skitter v0.6.0 started in local mode
17:43:56.635 [info] Reachable at `snvkciqs-livebook_app@silverlode`
{:module, LivebookHelpers, <<70, 79, 82, 49, 0, 0, 7, ...>>, {:render, 1}}
Introduction
This guide provides a primer on the various language constructs offered by Skitter and how they work together. It is recommended to open this document in Livebook, so you can see the workflow diagrams and play around with the code while going through this guide.
Skitter is a Distributed Stream Processing Engine with support for custom distribution strategies. Skitter enables developers to write stream processing applications, which can be distributed over a cluster by its runtime system. A novel concept introduced by Skitter is the notion of a distribution strategy. These strategies determine how the application is distributed over the cluster. This enables developers to select the most appropriate distribution strategy for the various operations in their application.
Skitter applications are defined through the use of three constructs: workflows, operations and strategies. A distinct DSL is provided for each of these constructs.
Construct | DSL | Goal |
---|---|---|
Workflow |
Skitter.DSL.Workflow.workflow/2 |
Define a stream processing application consisting of several data processing steps. |
Operation |
Skitter.DSL.Operation.defoperation/2 |
Define the data processing logic of a single data processing step. |
Strategy |
Skitter.DSL.Strategy.defstrategy/2 |
Define the distribution logic of a single data processing step. |
We will discuss each of these concepts, and their DSLs, in detail in the following sections. The following shorthand can be use to import all of the DSLs offered by Skitter.
use Skitter.DSL
:ok
Running Example & General Overview
Throughout this document, we will use a wordcount application as a running example. This application will receive phrases and count the occurrence of the various words in these phrases. The output of this application is a map with the current count of all the received words.
flowchart LR;
source([input stream])
app([wordcount application])
sink([counts map])
source-- hello skitter-->app
app -- hello: 1, skitter: 1--> sink
The wordcount application will be defined as a Skitter workflow which consists of several different data processing operations.
flowchart LR;
source([input stream])
sink([counts map])
subgraph wordcount_application
direction LR
stream_source-->split-->count-->show
end
source-- hello skitter-->stream_source
show -- hello: 1, skitter: 1--> sink
At runtime, distribution strategies will take the operations present in the application and distribute them over a cluster.
flowchart LR;
source([input stream])
sink([counts map])
subgraph wordcount_application
direction LR
subgraph cluster_node_1
cn_1_source(stream_source)
cn_1_split(split)
cn_1_count(count)
cn_1_source --> cn_1_split --> cn_1_count
end
subgraph cluster_node_2
cn_2_split(split)
cn_2_count(count)
cn_2_sink(show)
cn_2_split --> cn_2_count --> cn_2_sink
end
cn_1_source --> cn_2_split
cn_1_split --> cn_2_count
cn_2_split --> cn_1_count
cn_1_count --> cn_2_sink
end
source-- hello skitter-->cn_1_source
cn_2_sink -- hello: 1, skitter: 1--> sink
We generate an infinite stream of word combinations to serve as input to our application.
words = ~w(Mathijs Wolf Joeri Skitter Workflow Operation Strategy)
phrase_stream =
Stream.repeatedly(fn ->
# Let's not drain our battery.
Process.sleep(100)
"#{Enum.random(words)} #{Enum.random(words)} #{Enum.random(words)}"
end)
#Function<51.124013645/2 in Stream.repeatedly/1>
This stream contains phrases which contain 3 random words.
phrase_stream |> Stream.take(3) |> Enum.to_list()
["Operation Operation Wolf", "Wolf Operation Strategy", "Joeri Wolf Workflow"]
Workflows
Skitter applications are written as workflows created through the use of the Skitter.DSL.Workflow.workflow/2
DSL. A workflow is a combination of several data processing steps called nodes which are linked together.
graph LR;
stream_source --> split --> count --> show
Each node of a workflow consists of two parts: an operation and a strategy. The operation defines the data processing logic of the application, while the strategy defines how the operation is distributed over the cluster at runtime.
flowchart TD;
node((Node))
oper((Operation))
strat((Strategy))
node-- data processing logic -->oper;
node-- distribution logic -->strat;
A workflow can be defined using the workflow/2
macro, defined in Skitter.DSL.Workflow
. This macro is imported when use Skitter.DSL
is used, as above.
Before we can create a workflow, we need to define some operations to make the workflow/2
compiler happy. For now, we will create empty operations, which do not containing any actual logic.
defoperation Count, in: [word], out: [word_with_count] do
end
defoperation Show, in: [value] do
end
{:module, Show, <<70, 79, 82, 49, 0, 0, 10, ...>>, nil}
This code defines two operations: Count
and Show
. The in:
and out:
syntax is used to specify the inputs accepted by the operations and the outputs they produce.
A workflow is defined by declaring both nodes and links between them. The node
operator is used to create a node with an operation and a strategy, the ~>
operator is used to link nodes together.
workflow do
node(Skitter.BIO.StreamSource, with: Skitter.BIS.StreamSource, args: phrase_stream, as: source)
node(Skitter.BIO.FlatMap, with: Skitter.BIS.ImmutableLocal, args: &String.split/1, as: split)
node(Count, with: Skitter.BIS.KeyedState, as: count)
node(Show, with: Sedentary, as: show)
source._ ~> split._
split._ ~> count.word
count.word_with_count ~> show.value
end
|> LivebookHelpers.render()
This code defines a workflow consisting of 4 nodes: source
, split
, count
and show
. These nodes are linked together to form a pipeline of data processing steps. Each node defines an operation (the first argument passed to node
) and a strategy (the argument prefixed with with:
). The as:
syntax is used to name a node; the args:
syntax is used to pass arguments to a node. For instance, we use it to pass our infinite stream to the source
node.
This workflow uses both the operations we defined above and some built-in operators and strategies provided by Skitter. The operators prefixed with Skitter.BIO
are operators defined by Skitter, the strategies prefixed with Skitter.BIS
are built-in strategies. As a convention, generic operations (such as Skitter.BIO.FlatMap
) use _
as a name for their in and out ports.
The LivebookHelpers.render()
call is used to visualise the generated workflow. It is defined in the setup section of this livebook.
The workflow definition shown above is quite verbose. workflow/2
offers syntactic sugar to facilitate the creation of workflows. The following workflow definitions are all equivalent to the one shown above.
First, node declarations and links can be chained:
workflow do
node(Skitter.BIO.StreamSource, with: Skitter.BIS.StreamSource, args: phrase_stream, as: source)
~> node(Skitter.BIO.FlatMap, with: Skitter.BIS.ImmutableLocal, args: &String.split/1, as: split)
~> node(Count, with: Skitter.BIS.KeyedState, as: count)
~> node(Show, with: Sedentary, as: show)
end
|> LivebookHelpers.render()
Second, names can be omitted. In this case, the workflow DSL will generate a node name:
workflow do
node(Skitter.BIO.StreamSource, with: Skitter.BIS.StreamSource, args: phrase_stream)
~> node(Skitter.BIO.FlatMap, with: Skitter.BIS.ImmutableLocal, args: &String.split/1)
~> node(Count, with: Skitter.BIS.KeyedState)
~> node(Show, with: Sedentary)
end
|> LivebookHelpers.render()
Many operations define a default distribution strategy. When this is the case, the distribution strategy can be omitted from the workflow definition. When this is done, workflow/2
will use the distribution strategy specified by the operation:
workflow do
node(Skitter.BIO.StreamSource, args: phrase_stream)
~> node(Skitter.BIO.FlatMap, args: &String.split/1)
~> node(Count, with: Skitter.BIS.KeyedState)
~> node(Show, with: Sedentary)
end
|> LivebookHelpers.render()
Finally, syntactic sugar is present for using the built-in operations provided by Skitter:
workflow do
stream_source(phrase_stream)
~> flat_map(&String.split/1)
~> node(Count, with: Skitter.BIS.KeyedState)
~> node(Show, with: Sedentary)
end
|> LivebookHelpers.render()
The strategy of the operation can still be modified when the shorthand notation is used:
workflow do
stream_source(phrase_stream, with: Skitter.BIS.StreamSource)
~> flat_map(&String.split/1, with: Skitter.BIS.ImmutableLocal)
~> node(Count, with: Skitter.BIS.KeyedState)
~> node(Show, with: Sedentary)
end
|> LivebookHelpers.render()
To recap, a workflow is defined as the combination of several data processing steps called nodes. Each node consists of an operation and a strategy. The operation defines the data processing logic, while the strategy defines how the operation is distributed. Skitter.DSL.Workflow.workflow/2
enables the creation of Skitter workflows by declaring nodes (with node
), which can be linked together through the use of the ~>
operator. The complete documentation of workflow/2
, ~>/2
and node/2
can be found in Skitter.DSL.Workflow
.
Operations
Operations define the data processing logic of a stream processing application. Each operation represents a single data processing step in the data processing pipeline. An operation is defined as a collection of several callbacks along with some meta-information. The meta-information specifies the inputs accepted by the operation, the output it produces, its initial state and its (optional) default distribution strategy. The callbacks contain the code used to process incoming data elements.
A callback is an elixir function which accepts two implicit inputs: a state and a configuration; both of these arguments are provided by a distribution strategy when the callback is called. Furthermore, a callback always returns a t:Skitter.Operation.result/0
struct. This struct contains the return value of the callback, along with the updated state and emitted data, which we discuss later.
Operations are defined through the use of the Skitter.DSL.Operation.defoperation/2
macro. Inside the body of defoperation
, various callbacks can be defined through the use of defcb
. Callbacks are defined similar to regular elixir functions, with a few differences:
-
Callbacks accept an implicit
state
andconfig
argument, which represent the state and configuration of the callback. -
Inside the body of a callback,
state()
can be used to access the state,config()
can be used to access the configuration data,<~
can be used to update the state and~>
can be used to emit data. -
Callbacks return a
Result
struct which contains the return value of the callback’s body, the updated state and any data that was emitted through the use of~>
.
use Skitter.DSL
defoperation Count, in: [word], out: [word_with_count], strategy: Skitter.BIS.KeyedState do
initial_state 0
defcb key(word), do: word
defcb react(word) do
state <~ state() + 1
{word, state()} ~> word_with_count
end
end
{:module, Count, <<70, 79, 82, 49, 0, 0, 14, ...>>, {:react, 3}}
The code above (re)defines the Count
operation used by the workflow in our running example. The first line specifies that the Count
operation accepts a single input, called word
and produces a single output, called word_with_count
. It also specifies that the Skitter.BIS.KeyedState
strategy is used to distribute this operation when no strategy is passed inside the workflow.
The KeyedState
strategy partitions the state of an operation over the cluster and requires that an operation implements (at least) two callbacks: key
and react
. The first is called to determine the key for a given data element, while the second is called to update the state associated with that data element. Since we wish to maintain a count for each word
, we use the incoming word as the key. In turn, the strategy will use this key to ensure that each incoming word has its own state. To update this state, the KeyedState
strategy will call the react
callback with the appropriate state. Inside the body of react
, we increment the count of the current word (its state) by one, after which we emit the current count on the word_with_count
out port.
To summarise, the following happens when a word has to be processed by the Count
operation. We leave out the logic that occurs on the strategy side.
sequenceDiagram
participant S as KeyedState (strategy)
participant O as Count (operation)
activate S
S->>O: key("hello")
deactivate S
activate O
O->>S: result: "hello"
deactivate O
activate S
S->>O: react(5, "hello")
deactivate S
activate O
O->>S: state: 6, emit: word_with_count: {"hello", 6}
deactivate O
activate S
deactivate S
Callbacks can be called programatically, let’s see what happens when we call the count
callback with the word “hello”.
Skitter.Operation.call(Count, :key, ["hello"])
%Skitter.Operation.Callback.Result{state: 0, emit: [], result: "hello"}
A callback always returns a result containing both it’s current state, the emitted data and it’s result (the last value returned in the body of defcb
). It is up to the strategy to decide which of the returned values to use.
Note that the call returned a state
of 0
. When a callback is called without an explicit state, it’s initial state is used instead. This initial state is operation specific. It is defined at line 4 of the definition of Count
.
When we call an operation, we can explicity specify a value for its state and configuration.
Skitter.Operation.call(Count, :key, 5, nil, ["hello"])
%Skitter.Operation.Callback.Result{state: 5, emit: [], result: "hello"}
Since key
does not modify its state, it is returned unchanged. The defcb
implementation ensures a callback always returns a t:Skitter.Operation.result/0
struct.
Modified state and emitted data are added to the result struct.
Skitter.Operation.call(Count, :react, 5, nil, ["hello"])
%Skitter.Operation.Callback.Result{state: 6, emit: [word_with_count: [{"hello", 6}]], result: nil}
As we can see, the callback incremented its state by one and emitted some data on the word_with_count
out port.
To summarize, an operation defines the data processing logic present in the application. It is defined by writing a set of callbacks. The set of callbacks that an operation should specify are determined by its strategy.
Besides its callbacks, an operation defines a set of in port and out ports, which define the inputs the operation receives and the values it emits, respectively. Callbacks are called with a state, provided by the strategy. An operation may (optionally) define an initial state, which can be used by the strategy. Finally, an operation can define a default distribution strategy, which is used when no strategy is specified in the workflow. All details can be found in the documentation of Skitter.DSL.Operation.defoperation/2
.
We define a single additional operation, which we will use to visualise the output of our workflow inside the livebook.
defoperation Show, strategy: Sedentary, in: value do
initial_state %{}
defcb conf(frame), do: frame
defcb react({word, count}) do
state() <~ Map.put(state(), word, count)
Kino.Frame.render(config(), state())
end
end
{:module, Show, <<70, 79, 82, 49, 0, 0, 14, ...>>, {:react, 3}}
This operation receives the current word count for a given word, and updates its internal state, which contains the current count of all words. It uses the config()
operator to access the configuration data passed by the distribution strategy. This configuration data is created by the conf
callback. This callback accepts a so-called “frame”, which we can use to display the current wordcount.
Strategies
Strategies define how an operation is distributed over a cluster. A strategy is defined in terms of several hooks, each of which is called by the Skitter runtime system in response to certain events.
Hook | Signature | When |
---|---|---|
deploy |
() -> any() |
Called for every node in the workflow when a workflow is deployed. |
deliver |
data, port -> () |
Called when a predecessor in the workflow emits data to be sent. |
process |
message, state, tag -> state |
Called when a worker receives a message. |
As an example, we implement a strategy for the Count
operation shown above.
defstrategy KeyedState do
defhook deploy do
Remote.on_all_worker_cores(fn -> local_worker(Map.new(), :aggregator) end)
|> Enum.flat_map(fn {_node, workers} -> workers end)
|> List.to_tuple()
end
defhook deliver(data, _port) do
aggregators = deployment()
key = call(:key, [data]).result
idx = rem(Murmur.hash_x86_32(key), tuple_size(aggregators))
worker = elem(aggregators, idx)
send(worker, data)
end
defhook process(data, state_map, :aggregator) do
key = call(:key, [data]).result
state = Map.get(state_map, key, initial_state())
res = call(:react, state, nil, [data])
emit(res.emit)
Map.put(state_map, key, res.state)
end
end
{:module, KeyedState, <<70, 79, 82, 49, 0, 0, 20, ...>>, {:process, 4}}
This strategy partitions the key of an operation over several workers. When a data record needs to be processed, the key
callback of the operation is called, after which the resulting value is used to select a worker to process the data element. Elements with the same key get sent to the same worker. When a worker receives a data element, it calls the react
callback of the operation with the correct state. The updated state returned by react
is stored inside the worker.
The deploy
hook is responsible for deploying the operation over the cluster. In this case, this is done by creating several workers, one for every (CPU) core of every node in the cluster. The pids of the spawned workers are converted to a tuple for easy indexing and returned as the result of deploy
. This result is stored inside the deployment, which is an immutable data store that can be accessed by all other hooks.
The deliver
hook is called when a previous node in the workflow emits data. It is responsible for sending the emitted data to a worker to be processed. This is done by obtaining the key for the data element (using the key
callback of the operation), and using the resulting key to select a worker. This is done by hashing the key and using the hash to select a worker from the tuple of worker pids (obtained through the deployment()
operator). Once a worker is selected, the send
operator is used to send the data to the selected worker.
The process
hook is called when a worker receives a message. It is called with the received message and the current state of the worker. A worker in the KeyedState
strategy may store the state of several keys. Therefore, the state of the worker is a map, which associates a key with its state. Inside the process
hook, the state of the appropriate key is extracted from the map, after which react is called with this state and the received data. The data emitted by react
is emitted by the strategy (potentially calling the deliver
hook of the strategies of downstream nodes), and the updated state is stored inside the worker’s state. The result of the process
hook is stored as the new state of the worker.
We define a single additional strategy, called Sedentary
which we use to distribute the Show
operation defined above. This strategy executes all oeprations in a single worker.
defstrategy Sedentary do
defhook deploy do
config = call_if_exists(:conf, [args()]).result
ref = remote_worker(initial_state(), :worker)
{config, ref}
end
defhook deliver(data, _port) do
{_, worker} = deployment()
send(worker, data)
end
defhook process(msg, state, _) do
{conf, _} = deployment()
res = call(:react, state, conf, [msg])
emit(res.emit)
res.state
end
end
{:module, Sedentary, <<70, 79, 82, 49, 0, 0, 16, ...>>, {:process, 4}}
Inside the deploy
hook, a single worker is created. Additionally, the conf
callback is called to enable the operation to set up its configuration data. In the case of the Show
operation, this configuration will contain the frame that will be used to show the current word counts. A reference to the worker and the created configuration are returned to be stored inside the deployment data.
Inside the deliver
hook, the reference to the worker is retrieved from the deployment data after which the data to be sent is sent to the worker. Upon receiving the data, the worker will call the react
callback of the operation to process the data. The new state and emitted data returned by the operation are stored inside the worker and emitted, respectively.
Executable Example
frame = Kino.Frame.new()
ref =
workflow do
stream_source(phrase_stream)
~> flat_map(&String.split/1)
~> node(Count, with: KeyedState)
~> node(Show, with: Sedentary, args: frame)
end
|> Skitter.Runtime.deploy()
#Reference<0.2711774296.1163657217.41538>
Skitter.Runtime.stop(ref)
:ok