
Language Concepts


Mix.install([
  {:skitter, "~> 0.6"},
  {:kino, "~> 0.7"}
])

defmodule LivebookHelpers do
  def render(wf = %Skitter.Workflow{}) do
    wf
    |> Skitter.Dot.render("svg")
    |> then(fn {:ok, image} -> image end)
    |> Kino.Image.new(:svg)
  end
end
Resolving Hex dependencies...
Dependency resolution completed:
New:
  kino 0.7.0
  logger_file_backend 0.0.13
  murmur 1.0.3
  skitter 0.6.0
  table 0.1.2
  telemetry 1.1.0
* Getting skitter (Hex package)
* Getting kino (Hex package)
* Getting table (Hex package)
* Getting logger_file_backend (Hex package)
* Getting murmur (Hex package)
* Getting telemetry (Hex package)
==> logger_file_backend
Compiling 1 file (.ex)
Generated logger_file_backend app
==> table
Compiling 5 files (.ex)
Generated table app
==> kino
Compiling 36 files (.ex)
Generated kino app
===> Analyzing applications...
===> Compiling telemetry
==> murmur
Compiling 1 file (.ex)
warning: use Bitwise is deprecated. import Bitwise instead
  lib/murmur.ex:19: Murmur

Generated murmur app
==> skitter
Compiling 67 files (.ex)
warning: variable "ctx" is unused (if the variable is not meant to be used, prefix it with an underscore)
  lib/runtime/emit.ex:18: Skitter.Runtime.Emit.emit/2

Generated skitter app

17:43:56.632 [info] Skitter v0.6.0 started in local mode

17:43:56.635 [info] Reachable at `snvkciqs-livebook_app@silverlode`
{:module, LivebookHelpers, <<70, 79, 82, 49, 0, 0, 7, ...>>, {:render, 1}}

Introduction

This guide provides a primer on the various language constructs offered by Skitter and how they work together. It is recommended to open this document in Livebook, so you can see the workflow diagrams and play around with the code while going through this guide.

Skitter is a Distributed Stream Processing Engine with support for custom distribution strategies. Skitter enables developers to write stream processing applications, which can be distributed over a cluster by its runtime system. A novel concept introduced by Skitter is the notion of a distribution strategy. These strategies determine how the application is distributed over the cluster. This enables developers to select the most appropriate distribution strategy for the various operations in their application.

Skitter applications are defined through the use of three constructs: workflows, operations and strategies. A distinct DSL is provided for each of these constructs.

| Construct | DSL | Goal |
| --- | --- | --- |
| Workflow | Skitter.DSL.Workflow.workflow/2 | Define a stream processing application consisting of several data processing steps. |
| Operation | Skitter.DSL.Operation.defoperation/2 | Define the data processing logic of a single data processing step. |
| Strategy | Skitter.DSL.Strategy.defstrategy/2 | Define the distribution logic of a single data processing step. |

We will discuss each of these concepts, and their DSLs, in detail in the following sections. The following shorthand can be used to import all of the DSLs offered by Skitter.

use Skitter.DSL
:ok

Running Example & General Overview

Throughout this document, we will use a wordcount application as a running example. This application will receive phrases and count the occurrence of the various words in these phrases. The output of this application is a map with the current count of all the received words.
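As a point of reference, the same computation can be sketched in plain Elixir for a finite list of phrases, without Skitter and without any distribution. The module name WordCountSketch is hypothetical and only serves this illustration.

```elixir
defmodule WordCountSketch do
  # Counts how often each word occurs in a finite list of phrases.
  def count(phrases) do
    phrases
    |> Enum.flat_map(&String.split/1)
    |> Enum.frequencies()
  end
end

counts = WordCountSketch.count(["hello skitter", "hello workflow"])
# counts maps "hello" to 2, and "skitter" and "workflow" to 1 each
```

The remainder of this guide builds the streaming, distributed counterpart of this function.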

flowchart LR;
  source([input stream])
  app([wordcount application])
  sink([counts map])

  source-- hello skitter-->app
  app -- hello: 1, skitter: 1--> sink

The wordcount application will be defined as a Skitter workflow which consists of several different data processing operations.

flowchart LR;
  source([input stream])
  sink([counts map])

  subgraph wordcount_application
    direction LR
    stream_source-->split-->count-->show
  end

  source-- hello skitter-->stream_source
  show -- hello: 1, skitter: 1--> sink

At runtime, distribution strategies will take the operations present in the application and distribute them over a cluster.

flowchart LR;
  source([input stream])
  sink([counts map])

  subgraph wordcount_application
    direction LR

    subgraph cluster_node_1
      cn_1_source(stream_source)
      cn_1_split(split)
      cn_1_count(count)

      cn_1_source --> cn_1_split --> cn_1_count
    end

    subgraph cluster_node_2
      cn_2_split(split)
      cn_2_count(count)
      cn_2_sink(show)

      cn_2_split --> cn_2_count --> cn_2_sink
    end

    cn_1_source --> cn_2_split
    cn_1_split --> cn_2_count
    cn_2_split --> cn_1_count
    cn_1_count --> cn_2_sink
  end
  

  source-- hello skitter-->cn_1_source
  cn_2_sink -- hello: 1, skitter: 1--> sink

We generate an infinite stream of word combinations to serve as input to our application.

words = ~w(Mathijs Wolf Joeri Skitter Workflow Operation Strategy)

phrase_stream =
  Stream.repeatedly(fn ->
    # Let's not drain our battery.
    Process.sleep(100)
    "#{Enum.random(words)} #{Enum.random(words)} #{Enum.random(words)}"
  end)
#Function<51.124013645/2 in Stream.repeatedly/1>

Each phrase in this stream consists of three random words.

phrase_stream |> Stream.take(3) |> Enum.to_list()
["Operation Operation Wolf", "Wolf Operation Strategy", "Joeri Wolf Workflow"]

Workflows

Skitter applications are written as workflows created through the use of the Skitter.DSL.Workflow.workflow/2 DSL. A workflow is a combination of several data processing steps called nodes which are linked together.

graph LR;
  stream_source --> split --> count --> show

Each node of a workflow consists of two parts: an operation and a strategy. The operation defines the data processing logic of the application, while the strategy defines how the operation is distributed over the cluster at runtime.

flowchart TD;
  node((Node))
  oper((Operation))
  strat((Strategy))


  node-- data processing logic -->oper;
  node-- distribution logic -->strat;

A workflow can be defined using the workflow/2 macro, defined in Skitter.DSL.Workflow. This macro is imported when use Skitter.DSL is used, as above.

Before we can create a workflow, we need to define some operations to make the workflow/2 compiler happy. For now, we will create empty operations, which do not contain any actual logic.

defoperation Count, in: [word], out: [word_with_count] do
end

defoperation Show, in: [value] do
end
{:module, Show, <<70, 79, 82, 49, 0, 0, 10, ...>>, nil}

This code defines two operations: Count and Show. The in: and out: syntax is used to specify the inputs accepted by the operations and the outputs they produce.

A workflow is defined by declaring both nodes and links between them. The node operator is used to create a node with an operation and a strategy, the ~> operator is used to link nodes together.

workflow do
  node(Skitter.BIO.StreamSource, with: Skitter.BIS.StreamSource, args: phrase_stream, as: source)
  node(Skitter.BIO.FlatMap, with: Skitter.BIS.ImmutableLocal, args: &String.split/1, as: split)
  node(Count, with: Skitter.BIS.KeyedState, as: count)
  node(Show, with: Sedentary, as: show)

  source._ ~> split._
  split._ ~> count.word
  count.word_with_count ~> show.value
end
|> LivebookHelpers.render()

This code defines a workflow consisting of 4 nodes: source, split, count and show. These nodes are linked together to form a pipeline of data processing steps. Each node defines an operation (the first argument passed to node) and a strategy (the argument prefixed with with:). The as: syntax is used to name a node; the args: syntax is used to pass arguments to a node. For instance, we use it to pass our infinite stream to the source node.

This workflow uses both the operations we defined above and some built-in operations and strategies provided by Skitter. The operations prefixed with Skitter.BIO are built-in operations, the strategies prefixed with Skitter.BIS are built-in strategies. As a convention, generic operations (such as Skitter.BIO.FlatMap) use _ as a name for their in and out ports.

The LivebookHelpers.render() call is used to visualise the generated workflow. It is defined in the setup section of this livebook.

The workflow definition shown above is quite verbose. workflow/2 offers syntactic sugar to facilitate the creation of workflows. The following workflow definitions are all equivalent to the one shown above.

First, node declarations and links can be chained:

workflow do
  node(Skitter.BIO.StreamSource, with: Skitter.BIS.StreamSource, args: phrase_stream, as: source)
  ~> node(Skitter.BIO.FlatMap, with: Skitter.BIS.ImmutableLocal, args: &String.split/1, as: split)
  ~> node(Count, with: Skitter.BIS.KeyedState, as: count)
  ~> node(Show, with: Sedentary, as: show)
end
|> LivebookHelpers.render()

Second, names can be omitted. In this case, the workflow DSL will generate a node name:

workflow do
  node(Skitter.BIO.StreamSource, with: Skitter.BIS.StreamSource, args: phrase_stream)
  ~> node(Skitter.BIO.FlatMap, with: Skitter.BIS.ImmutableLocal, args: &String.split/1)
  ~> node(Count, with: Skitter.BIS.KeyedState)
  ~> node(Show, with: Sedentary)
end
|> LivebookHelpers.render()

Many operations define a default distribution strategy. When this is the case, the distribution strategy can be omitted from the workflow definition. When this is done, workflow/2 will use the distribution strategy specified by the operation:

workflow do
  node(Skitter.BIO.StreamSource, args: phrase_stream)
  ~> node(Skitter.BIO.FlatMap, args: &String.split/1)
  ~> node(Count, with: Skitter.BIS.KeyedState)
  ~> node(Show, with: Sedentary)
end
|> LivebookHelpers.render()

Finally, syntactic sugar is present for using the built-in operations provided by Skitter:

workflow do
  stream_source(phrase_stream)
  ~> flat_map(&String.split/1)
  ~> node(Count, with: Skitter.BIS.KeyedState)
  ~> node(Show, with: Sedentary)
end
|> LivebookHelpers.render()

The strategy of the operation can still be modified when the shorthand notation is used:

workflow do
  stream_source(phrase_stream, with: Skitter.BIS.StreamSource)
  ~> flat_map(&String.split/1, with: Skitter.BIS.ImmutableLocal)
  ~> node(Count, with: Skitter.BIS.KeyedState)
  ~> node(Show, with: Sedentary)
end
|> LivebookHelpers.render()

To recap, a workflow is defined as the combination of several data processing steps called nodes. Each node consists of an operation and a strategy. The operation defines the data processing logic, while the strategy defines how the operation is distributed. Skitter.DSL.Workflow.workflow/2 enables the creation of Skitter workflows by declaring nodes (with node), which can be linked together through the use of the ~> operator. The complete documentation of workflow/2, ~>/2 and node/2 can be found in Skitter.DSL.Workflow.
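For readers who know Elixir's Stream module, the linear shape of this workflow resembles a lazy pipeline. The sketch below is only an analogy under that assumption: it runs in a single process, uses no Skitter API, and the PipelineSketch module is hypothetical.

```elixir
defmodule PipelineSketch do
  # source ~> split ~> count, expressed as a lazy, single-process pipeline.
  def run(phrases) do
    phrases
    # split: every phrase produces several words
    |> Stream.flat_map(&String.split/1)
    # count: keep a map of counts and emit {word, count} for every word
    |> Stream.transform(%{}, fn word, counts ->
      count = Map.get(counts, word, 0) + 1
      {[{word, count}], Map.put(counts, word, count)}
    end)
  end
end

emitted = ["hello skitter", "hello"] |> PipelineSketch.run() |> Enum.to_list()
# emitted is [{"hello", 1}, {"skitter", 1}, {"hello", 2}]
```

Unlike this pipeline, a Skitter workflow leaves the question of where each step runs to the strategy of each node.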

Operations

Operations define the data processing logic of a stream processing application. Each operation represents a single data processing step in the data processing pipeline. An operation is defined as a collection of several callbacks along with some meta-information. The meta-information specifies the inputs accepted by the operation, the output it produces, its initial state and its (optional) default distribution strategy. The callbacks contain the code used to process incoming data elements.

A callback is an Elixir function which accepts two implicit inputs: a state and a configuration; both of these arguments are provided by a distribution strategy when the callback is called. Furthermore, a callback always returns a t:Skitter.Operation.result/0 struct. This struct contains the return value of the callback, along with the updated state and emitted data, which we discuss later.

Operations are defined through the use of the Skitter.DSL.Operation.defoperation/2 macro. Inside the body of defoperation, various callbacks can be defined through the use of defcb. Callbacks are defined similarly to regular Elixir functions, with a few differences:

  • Callbacks accept an implicit state and config argument, which represent the state and configuration of the callback.
  • Inside the body of a callback, state() can be used to access the state, config() can be used to access the configuration data, <~ can be used to update the state and ~> can be used to emit data.
  • Callbacks return a Result struct which contains the return value of the callback’s body, the updated state and any data that was emitted through the use of ~>.

use Skitter.DSL

defoperation Count, in: [word], out: [word_with_count], strategy: Skitter.BIS.KeyedState do
  initial_state 0

  defcb key(word), do: word

  defcb react(word) do
    state <~ state() + 1
    {word, state()} ~> word_with_count
  end
end
{:module, Count, <<70, 79, 82, 49, 0, 0, 14, ...>>, {:react, 3}}

The code above (re)defines the Count operation used by the workflow in our running example. The first line specifies that the Count operation accepts a single input, called word and produces a single output, called word_with_count. It also specifies that the Skitter.BIS.KeyedState strategy is used to distribute this operation when no strategy is passed inside the workflow.

The KeyedState strategy partitions the state of an operation over the cluster and requires that an operation implements (at least) two callbacks: key and react. The first is called to determine the key for a given data element, while the second is called to update the state associated with that data element. Since we wish to maintain a count for each word, we use the incoming word as the key. In turn, the strategy will use this key to ensure that each incoming word has its own state. To update this state, the KeyedState strategy will call the react callback with the appropriate state. Inside the body of react, we increment the count of the current word (its state) by one, after which we emit the current count on the word_with_count out port.
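The contract between a strategy and these two callbacks can be mimicked with ordinary functions. The sketch below is a simplified model, not Skitter's implementation: CountModel is a hypothetical module, the state and configuration are passed explicitly, and plain maps stand in for the result struct.

```elixir
defmodule CountModel do
  # key: leaves the state untouched and returns the word as its result.
  def key(state, _config, word) do
    %{state: state, emit: [], result: word}
  end

  # react: increments the state and emits {word, count} on word_with_count.
  def react(state, _config, word) do
    new_state = state + 1
    %{state: new_state, emit: [word_with_count: [{word, new_state}]], result: nil}
  end
end

# The strategy decides which state to pass in; here we pass 5 by hand.
%{result: "hello", state: 5} = CountModel.key(5, nil, "hello")
%{state: 6, emit: [word_with_count: [{"hello", 6}]]} = CountModel.react(5, nil, "hello")
```

In the real DSL, defcb hides the explicit state and config parameters and builds the result for us.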

To summarise, the following happens when a word has to be processed by the Count operation. We leave out the logic that occurs on the strategy side.

sequenceDiagram
    participant S as KeyedState (strategy)
    participant O as Count (operation)

    activate S
    S->>O: key("hello")
    deactivate S
    activate O
    O->>S: result: "hello"
    deactivate O
    activate S
    S->>O: react(5, "hello")
    deactivate S
    activate O
    O->>S: state: 6, emit: word_with_count: {"hello", 6}
    deactivate O
    activate S
    deactivate S

Callbacks can be called programmatically. Let’s see what happens when we call the key callback of Count with the word “hello”.

Skitter.Operation.call(Count, :key, ["hello"])
%Skitter.Operation.Callback.Result{state: 0, emit: [], result: "hello"}

A callback always returns a result containing its current state, the emitted data and its result (the last value returned in the body of defcb). It is up to the strategy to decide which of the returned values to use.

Note that the call returned a state of 0. When a callback is called without an explicit state, its initial state is used instead. This initial state is operation specific. It is set by the initial_state declaration in the definition of Count.

When we call an operation, we can explicitly specify a value for its state and configuration.

Skitter.Operation.call(Count, :key, 5, nil, ["hello"])
%Skitter.Operation.Callback.Result{state: 5, emit: [], result: "hello"}

Since key does not modify its state, it is returned unchanged. The defcb implementation ensures a callback always returns a t:Skitter.Operation.result/0 struct.

Modified state and emitted data are added to the result struct.

Skitter.Operation.call(Count, :react, 5, nil, ["hello"])
%Skitter.Operation.Callback.Result{state: 6, emit: [word_with_count: [{"hello", 6}]], result: nil}

As we can see, the callback incremented its state by one and emitted some data on the word_with_count out port.

To summarise, an operation defines the data processing logic present in the application. It is defined by writing a set of callbacks. The set of callbacks that an operation should specify is determined by its strategy.

Besides its callbacks, an operation defines a set of in ports and out ports, which define the inputs the operation receives and the values it emits, respectively. Callbacks are called with a state, provided by the strategy. An operation may (optionally) define an initial state, which can be used by the strategy. Finally, an operation can define a default distribution strategy, which is used when no strategy is specified in the workflow. All details can be found in the documentation of Skitter.DSL.Operation.defoperation/2.

We define a single additional operation, which we will use to visualise the output of our workflow inside the livebook.

defoperation Show, strategy: Sedentary, in: value do
  initial_state %{}

  defcb conf(frame), do: frame

  defcb react({word, count}) do
    state() <~ Map.put(state(), word, count)
    Kino.Frame.render(config(), state())
  end
end
{:module, Show, <<70, 79, 82, 49, 0, 0, 14, ...>>, {:react, 3}}

This operation receives the current word count for a given word, and updates its internal state, which contains the current count of all words. It uses the config() operator to access the configuration data passed by the distribution strategy. This configuration data is created by the conf callback. This callback accepts a so-called “frame”, which we can use to display the current wordcount.

Strategies

Strategies define how an operation is distributed over a cluster. A strategy is defined in terms of several hooks, each of which is called by the Skitter runtime system in response to certain events.

| Hook | Signature | When |
| --- | --- | --- |
| deploy | () -> any() | Called for every node in the workflow when a workflow is deployed. |
| deliver | data, port -> () | Called when a predecessor in the workflow emits data to be sent. |
| process | message, state, tag -> state | Called when a worker receives a message. |

As an example, we implement a strategy for the Count operation shown above.

defstrategy KeyedState do
  defhook deploy do
    Remote.on_all_worker_cores(fn -> local_worker(Map.new(), :aggregator) end)
    |> Enum.flat_map(fn {_node, workers} -> workers end)
    |> List.to_tuple()
  end

  defhook deliver(data, _port) do
    aggregators = deployment()
    key = call(:key, [data]).result
    idx = rem(Murmur.hash_x86_32(key), tuple_size(aggregators))
    worker = elem(aggregators, idx)
    send(worker, data)
  end

  defhook process(data, state_map, :aggregator) do
    key = call(:key, [data]).result
    state = Map.get(state_map, key, initial_state())
    res = call(:react, state, nil, [data])
    emit(res.emit)
    Map.put(state_map, key, res.state)
  end
end
{:module, KeyedState, <<70, 79, 82, 49, 0, 0, 20, ...>>, {:process, 4}}

This strategy partitions the state of an operation over several workers, based on a key. When a data record needs to be processed, the key callback of the operation is called, after which the resulting value is used to select a worker to process the data element. Elements with the same key get sent to the same worker. When a worker receives a data element, it calls the react callback of the operation with the correct state. The updated state returned by react is stored inside the worker.

The deploy hook is responsible for deploying the operation over the cluster. In this case, this is done by creating several workers, one for every (CPU) core of every node in the cluster. The pids of the spawned workers are converted to a tuple for easy indexing and returned as the result of deploy. This result is stored inside the deployment, which is an immutable data store that can be accessed by all other hooks.

The deliver hook is called when a previous node in the workflow emits data. It is responsible for sending the emitted data to a worker to be processed. This is done by obtaining the key for the data element (using the key callback of the operation), and using the resulting key to select a worker. This is done by hashing the key and using the hash to select a worker from the tuple of worker pids (obtained through the deployment() operator). Once a worker is selected, the send operator is used to send the data to the selected worker.
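The worker selection performed by deliver can be tried out in isolation. The following sketch uses no Skitter API: the RoutingSketch module is hypothetical, atoms stand in for worker pids, and :erlang.phash2/2 replaces the Murmur hash used by the strategy.

```elixir
defmodule RoutingSketch do
  # Select a worker from a tuple based on the hash of the key.
  def select(workers, key) when tuple_size(workers) > 0 do
    # phash2/2 returns an integer in 0..tuple_size(workers) - 1
    idx = :erlang.phash2(key, tuple_size(workers))
    elem(workers, idx)
  end
end

workers = {:worker_a, :worker_b, :worker_c}

# The same key is always routed to the same worker.
true = RoutingSketch.select(workers, "hello") == RoutingSketch.select(workers, "hello")
```

Note that the index depends on the number of workers, so the mapping only stays stable while the tuple of workers does not change.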

The process hook is called when a worker receives a message. It is called with the received message and the current state of the worker. A worker in the KeyedState strategy may store the state of several keys. Therefore, the state of the worker is a map, which associates a key with its state. Inside the process hook, the state of the appropriate key is extracted from the map, after which react is called with this state and the received data. The data emitted by react is emitted by the strategy (potentially calling the deliver hook of the strategies of downstream nodes), and the updated state is stored inside the worker’s state. The result of the process hook is stored as the new state of the worker.
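The bookkeeping done by process boils down to a lookup, a call and an update on a map. A minimal sketch in plain Elixir, assuming a hypothetical WorkerStateSketch module, with an anonymous function standing in for the react callback (emitting data is left out):

```elixir
defmodule WorkerStateSketch do
  # Handle one data element: fetch the state of its key (or the initial
  # state), let `react` compute the new state and store it again.
  def process(state_map, key, initial_state, react) do
    state = Map.get(state_map, key, initial_state)
    Map.put(state_map, key, react.(state))
  end
end

increment = fn count -> count + 1 end

# Feed three words to a single worker which starts with an empty state map.
state_map =
  Enum.reduce(~w(hello skitter hello), %{}, fn word, state_map ->
    WorkerStateSketch.process(state_map, word, 0, increment)
  end)

# state_map now maps "hello" to 2 and "skitter" to 1
```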

We define a single additional strategy, called Sedentary, which we use to distribute the Show operation defined above. This strategy executes all operations in a single worker.

defstrategy Sedentary do
  defhook deploy do
    config = call_if_exists(:conf, [args()]).result
    ref = remote_worker(initial_state(), :worker)
    {config, ref}
  end

  defhook deliver(data, _port) do
    {_, worker} = deployment()
    send(worker, data)
  end

  defhook process(msg, state, _) do
    {conf, _} = deployment()
    res = call(:react, state, conf, [msg])
    emit(res.emit)
    res.state
  end
end
{:module, Sedentary, <<70, 79, 82, 49, 0, 0, 16, ...>>, {:process, 4}}

Inside the deploy hook, a single worker is created. Additionally, the conf callback is called to enable the operation to set up its configuration data. In the case of the Show operation, this configuration will contain the frame that will be used to show the current word counts. A reference to the worker and the created configuration are returned to be stored inside the deployment data.

Inside the deliver hook, the reference to the worker is retrieved from the deployment data, after which the data is sent to the worker. Upon receiving the data, the worker will call the react callback of the operation to process the data. The new state and emitted data returned by the operation are stored inside the worker and emitted, respectively.
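The essence of this strategy, a single process that owns all state and handles every message, can be reproduced with plain Elixir processes. The sketch below is not how Skitter implements workers; SingleWorkerSketch and its message formats are hypothetical.

```elixir
defmodule SingleWorkerSketch do
  # Spawn one process which holds a state and updates it with `react`.
  def start(initial_state, react) do
    spawn(fn -> loop(initial_state, react) end)
  end

  # Ask the worker for its current state.
  def get_state(worker) do
    send(worker, {:get, self()})

    receive do
      {:state, state} -> state
    after
      1_000 -> :timeout
    end
  end

  defp loop(state, react) do
    receive do
      {:data, msg} ->
        loop(react.(state, msg), react)

      {:get, from} ->
        send(from, {:state, state})
        loop(state, react)
    end
  end
end

# Mirror the react callback of Show: store the latest count of each word.
worker = SingleWorkerSketch.start(%{}, fn state, {word, count} -> Map.put(state, word, count) end)
send(worker, {:data, {"hello", 1}})
send(worker, {:data, {"hello", 2}})
current = SingleWorkerSketch.get_state(worker)
# current is %{"hello" => 2}
```

Since messages sent from one process to another arrive in order, the state request is handled after both data messages.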

Executable Example

frame = Kino.Frame.new()
ref =
  workflow do
    stream_source(phrase_stream)
    ~> flat_map(&String.split/1)
    ~> node(Count, with: KeyedState)
    ~> node(Show, with: Sedentary, args: frame)
  end
  |> Skitter.Runtime.deploy()
#Reference<0.2711774296.1163657217.41538>
Skitter.Runtime.stop(ref)
:ok