Walkthrough
Mix.install([
{:tigerbeetlex, "~> 0.16.42"}
])
Introduction
This walkthrough follows the same format the official TigerBeetle clients use. It uses the blocking API (TigerBeetlex.Connection) for brevity’s sake, but everything applies identically to the message-based API (modulo initialization and response reception).
The walkthrough can also be executed in Livebook.
Setup
In your mix.exs, add TigerBeetlex as a dependency:
{:tigerbeetlex, "~> 0.16.42"}
Creating a Client
A client is created with a cluster ID and replica addresses for all replicas in the cluster. The cluster ID and replica addresses are both chosen by the system that starts the TigerBeetle cluster.
Clients are thread-safe and a single instance should be shared between multiple concurrent tasks. In the case of TigerBeetlex.Connection, the client is stored in the state of the underlying processes, so it’s already shared across callers.
Multiple clients are useful when connecting to more than one TigerBeetle cluster.
In this example the cluster ID is 0 and there is one replica. The address is read from the TB_ADDRESS environment variable and defaults to port 3000.
alias TigerBeetlex.Connection
address = System.get_env("TB_ADDRESS", "3000")
{:ok, _pid} = Connection.start_link(name: :tb, cluster_id: <<0::128>>, addresses: [address])
The following are valid addresses:
- 3000 (interpreted as 127.0.0.1:3000)
- 127.0.0.1:3000 (interpreted as 127.0.0.1:3000)
- 127.0.0.1 (interpreted as 127.0.0.1:3001, since 3001 is the default port)
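Since the connection is a regular process started with start_link, it can also live under your application's supervision tree. Below is a minimal sketch, assuming TigerBeetlex.Connection exposes the conventional child_spec/1 for start_link-based processes (adapt names and options to your application):
# Minimal supervision sketch (assumes the conventional child_spec/1;
# adjust to your own supervision tree).
children = [
  {TigerBeetlex.Connection,
   name: :tb, cluster_id: <<0::128>>, addresses: [System.get_env("TB_ADDRESS", "3000")]}
]

Supervisor.start_link(children, strategy: :one_for_one)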
Creating Accounts
See details for account fields in the Accounts reference.
alias TigerBeetlex.Account
alias TigerBeetlex.Connection
alias TigerBeetlex.ID
account =
%Account{
# TigerBeetle time-based ID.
id: ID.generate(),
ledger: 1,
code: 718
}
{:ok, _account_errors} = Connection.create_accounts(:tb, [account])
# Error handling omitted.
See details for the recommended ID scheme in time-based identifiers.
Account Flags
The account flags value is a struct with boolean fields. See details for these flags in the Accounts reference.
To toggle behavior for an account, set the corresponding field to true:
- AccountFlags.linked
- AccountFlags.debits_must_not_exceed_credits
- AccountFlags.credits_must_not_exceed_debits
- AccountFlags.history
For example, to link two accounts where the first account additionally has the debits_must_not_exceed_credits constraint:
alias TigerBeetlex.AccountFlags
account0 =
%Account{
id: ID.from_int(100),
ledger: 1,
code: 1,
flags: %AccountFlags{linked: true, debits_must_not_exceed_credits: true}
}
account1 =
%Account{
id: ID.from_int(101),
ledger: 1,
code: 1,
flags: %AccountFlags{history: true}
}
{:ok, _account_errors} = Connection.create_accounts(:tb, [account0, account1])
# Error handling omitted.
Response and Errors
The response is an empty list if all accounts were created successfully. If the response is non-empty, each struct in the response list contains error information for an account that failed. The error struct contains the error result as an atom and the index of the account in the request batch.
See all error conditions in the create_accounts reference.
alias TigerBeetlex.CreateAccountsResult
account0 =
%Account{
id: ID.from_int(102),
ledger: 1,
code: 1
}
account1 =
%Account{
id: ID.from_int(103),
ledger: 1,
code: 1
}
account2 =
%Account{
id: ID.from_int(104),
ledger: 1,
code: 1
}
{:ok, account_errors} = Connection.create_accounts(:tb, [account0, account1, account2])
Enum.each(account_errors, fn
%CreateAccountsResult{result: :exists} = error ->
IO.puts("Batch account at index #{error.index} already exists")
error ->
IO.puts("Batch account at index #{error.index} failed to create: #{error.result}")
end)
To handle errors you can compare the result atom contained in the struct with the ones present in the create_accounts reference above.
Account Lookup
Account lookup is batched, like account creation. Pass in all IDs to fetch. The account for each matched ID is returned.
If no account matches an ID, no struct is returned for that account. So the order of accounts in the response is not necessarily the same as the order of IDs in the request. You can refer to the ID field in the response to distinguish accounts.
{:ok, accounts} = Connection.lookup_accounts(:tb, [ID.from_int(100), ID.from_int(101)])
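For example, you can index the returned accounts by ID to match them back to the requested IDs and spot any that were not found (a small sketch using only the id field of the returned structs):
# Index the returned accounts by ID so missing IDs can be detected.
accounts_by_id = Map.new(accounts, fn account -> {account.id, account} end)

Enum.each([ID.from_int(100), ID.from_int(101)], fn id ->
  case Map.fetch(accounts_by_id, id) do
    {:ok, account} -> IO.puts("Found account on ledger #{account.ledger}")
    :error -> IO.puts("No account found for this ID")
  end
end)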
Create Transfers
This creates a journal entry between two accounts.
See details for transfer fields in the Transfers reference.
alias TigerBeetlex.Transfer
transfers = [
%Transfer{
# TigerBeetle time-based ID.
id: ID.generate(),
debit_account_id: ID.from_int(102),
credit_account_id: ID.from_int(103),
amount: 10,
ledger: 1,
code: 720
}
]
{:ok, _transfer_errors} = Connection.create_transfers(:tb, transfers)
# Error handling omitted.
See details for the recommended ID scheme in time-based identifiers.
Response and Errors
The response is an empty list if all transfers were created successfully. If the response is non-empty, each struct in the response list contains error information for a transfer that failed. The error struct contains the error result as an atom and the index of the transfer in the request batch.
See all error conditions in the create_transfers reference.
alias TigerBeetlex.CreateTransfersResult
transfers = [
%Transfer{
id: ID.from_int(1),
debit_account_id: ID.from_int(101),
credit_account_id: ID.from_int(102),
amount: 10,
ledger: 1,
code: 1
},
%Transfer{
id: ID.from_int(2),
debit_account_id: ID.from_int(101),
credit_account_id: ID.from_int(102),
amount: 10,
ledger: 1,
code: 1
},
%Transfer{
id: ID.from_int(3),
debit_account_id: ID.from_int(101),
credit_account_id: ID.from_int(102),
amount: 10,
ledger: 1,
code: 1
}
]
{:ok, transfer_errors} = Connection.create_transfers(:tb, transfers)
Enum.each(transfer_errors, fn
%CreateTransfersResult{result: :exists} = error ->
IO.puts("Batch transfer at index #{error.index} already exists")
error ->
IO.puts("Batch transfer at index #{error.index} failed to create: #{error.result}")
end)
To handle errors you can compare the result atom contained in the struct with the ones present in the create_transfers reference above.
Batching
TigerBeetle performance is maximized when you batch API requests. A client instance shared across multiple processes can automatically batch concurrent requests, but the application must still send as many events as possible in a single call. For example, if you insert 1 million transfers sequentially, one at a time, the insert rate will be a fraction of its potential, because the client waits for a reply between each one.
# List of transfers to create.
batch = []
Enum.each(batch, fn transfer ->
{:ok, _transfer_errors} = Connection.create_transfers(:tb, [transfer])
# Error handling omitted
end)
Instead, always batch as much as you can. The maximum batch size is set in the TigerBeetle server. The default is 8189.
# List of transfers to create.
batch = []
batch_size = 8189
batch
|> Enum.chunk_every(batch_size)
|> Enum.each(fn transfers ->
{:ok, _transfer_errors} = Connection.create_transfers(:tb, transfers)
# Error handling omitted
end)
Queues and Workers
If you are making requests to TigerBeetle from workers pulling jobs from a queue, you can batch requests to TigerBeetle by having each worker pull and act on multiple jobs from the queue at once, rather than one at a time.
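As an illustration, a worker iteration could look like the sketch below. MyQueue and its pull/1 function are hypothetical placeholders for whatever queue library you use, and each job is assumed to carry a %Transfer{}:
# Hypothetical worker sketch: MyQueue.pull/1 stands in for your real queue and
# is assumed to return up to batch_size jobs, each holding a %Transfer{}.
batch_size = 8189

jobs = MyQueue.pull(batch_size)
transfers = Enum.map(jobs, fn job -> job.transfer end)

{:ok, _transfer_errors} = Connection.create_transfers(:tb, transfers)
# Error handling omitted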
Transfer Flags
The transfer flags value is a struct with boolean fields. See details for these flags in the Transfers reference.
To toggle behavior for a transfer, set the corresponding field to true:
- TransferFlags.linked
- TransferFlags.pending
- TransferFlags.post_pending_transfer
- TransferFlags.void_pending_transfer
For example, to link transfer0 and transfer1:
alias TigerBeetlex.TransferFlags
transfer0 =
%Transfer{
# Fixed ID for the walkthrough.
id: ID.from_int(4),
debit_account_id: ID.from_int(102),
credit_account_id: ID.from_int(103),
amount: 10,
ledger: 1,
code: 720,
flags: %TransferFlags{linked: true}
}
transfer1 =
%Transfer{
# Fixed ID for the walkthrough.
id: ID.from_int(5),
debit_account_id: ID.from_int(102),
credit_account_id: ID.from_int(103),
amount: 10,
ledger: 1,
code: 720
}
{:ok, _transfer_errors} = Connection.create_transfers(:tb, [transfer0, transfer1])
# Error handling omitted
Two-Phase Transfers
Two-phase transfers are supported natively by toggling the appropriate flag. TigerBeetle will then adjust the credits_pending and debits_pending fields of the appropriate accounts. A corresponding transfer then needs to be sent to post or void the pending transfer.
Post a Pending Transfer
With flags set to post_pending_transfer, TigerBeetle will post the transfer. TigerBeetle will atomically roll back the changes to debits_pending and credits_pending of the appropriate accounts and apply them to the debits_posted and credits_posted balances.
transfer0 =
%Transfer{
# Fixed ID for the walkthrough.
id: ID.from_int(6),
debit_account_id: ID.from_int(102),
credit_account_id: ID.from_int(103),
amount: 10,
ledger: 1,
code: 720,
flags: %TransferFlags{pending: true}
}
{:ok, _transfer_errors} = Connection.create_transfers(:tb, [transfer0])
# Error handling omitted
transfer1 =
%Transfer{
# Fixed ID for the walkthrough.
id: ID.from_int(7),
debit_account_id: ID.from_int(102),
credit_account_id: ID.from_int(103),
# Post the entire pending amount.
amount: Transfer.amount_max(),
pending_id: ID.from_int(6),
ledger: 1,
code: 720,
flags: %TransferFlags{post_pending_transfer: true}
}
{:ok, _transfer_errors} = Connection.create_transfers(:tb, [transfer1])
# Error handling omitted
Void a Pending Transfer
In contrast, with flags set to void_pending_transfer, TigerBeetle will void the transfer. TigerBeetle will roll back the changes to debits_pending and credits_pending of the appropriate accounts and not apply them to the debits_posted and credits_posted balances.
transfer0 =
%Transfer{
# Fixed ID for the walkthrough.
id: ID.from_int(8),
debit_account_id: ID.from_int(102),
credit_account_id: ID.from_int(103),
amount: 10,
ledger: 1,
code: 720,
flags: %TransferFlags{pending: true}
}
{:ok, _transfer_errors} = Connection.create_transfers(:tb, [transfer0])
# Error handling omitted
transfer1 =
%Transfer{
# Fixed ID for the walkthrough.
id: ID.from_int(9),
debit_account_id: ID.from_int(102),
credit_account_id: ID.from_int(103),
amount: 10,
pending_id: ID.from_int(8),
ledger: 1,
code: 720,
flags: %TransferFlags{void_pending_transfer: true}
}
{:ok, _transfer_errors} = Connection.create_transfers(:tb, [transfer1])
# Error handling omitted
Transfer Lookup
NOTE: While transfer lookup exists, it is not a flexible query API. The TigerBeetle team is developing query APIs and there will be new methods for querying transfers in the future.
Transfer lookup is batched, like transfer creation. Pass in all IDs to fetch, and matched transfers are returned.
If no transfer matches an ID, no struct is returned for that transfer. So the order of transfers in the response is not necessarily the same as the order of IDs in the request. You can refer to the ID field in the response to distinguish transfers.
{:ok, transfers} = Connection.lookup_transfers(:tb, [ID.from_int(1), ID.from_int(2)])
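As with accounts, the id field lets you work out which requested IDs were not found; a small sketch:
# Detect requested transfer IDs that were not returned.
requested_ids = MapSet.new([ID.from_int(1), ID.from_int(2)])
returned_ids = MapSet.new(transfers, fn transfer -> transfer.id end)
missing_ids = MapSet.difference(requested_ids, returned_ids)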
Get Account Transfers
NOTE: This is a preview API that is subject to breaking changes once TigerBeetle has a stable querying API.
Fetches the transfers involving a given account, allowing basic filter and pagination capabilities.
The transfers in the response are sorted by timestamp in chronological or reverse-chronological order.
alias TigerBeetlex.AccountFilter
alias TigerBeetlex.AccountFilterFlags
filter =
%AccountFilter{
account_id: ID.from_int(101),
# No filter by UserData.
user_data_128: <<0::128>>,
user_data_64: 0,
user_data_32: 0,
# No filter by Code.
code: 0,
# No filter by Timestamp.
timestamp_min: 0,
# No filter by Timestamp.
timestamp_max: 0,
# Limit to ten transfers at most.
limit: 10,
flags: %AccountFilterFlags{
# Include transfers from the debit side.
debits: true,
# Include transfers from the credit side.
credits: true,
# Sort by timestamp in reverse-chronological order.
reversed: true
}
}
{:ok, account_transfers} = Connection.get_account_transfers(:tb, filter)
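If more transfers match than the limit allows, one way to fetch the next page is to tighten the timestamp bound using the last result. This is a sketch, assuming timestamp-based paging via the filter fields shown above:
# Fetch the next page of reverse-chronological results by bounding timestamp_max
# below the timestamp of the last transfer already received (if any).
case List.last(account_transfers) do
  nil ->
    # Nothing was returned, so there is nothing more to fetch.
    :done

  last ->
    next_filter = %AccountFilter{filter | timestamp_max: last.timestamp - 1}
    {:ok, _next_page} = Connection.get_account_transfers(:tb, next_filter)
end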
Get Account Balances
NOTE: This is a preview API that is subject to breaking changes once TigerBeetle has a stable querying API.
Fetches the point-in-time balances of a given account, allowing basic filter and pagination capabilities.
Only accounts created with the flag history set retain historical balances.
The balances in the response are sorted by timestamp in chronological or reverse-chronological order.
filter =
%AccountFilter{
account_id: ID.from_int(101),
# No filter by UserData.
user_data_128: <<0::128>>,
user_data_64: 0,
user_data_32: 0,
# No filter by Code.
code: 0,
# No filter by Timestamp.
timestamp_min: 0,
# No filter by Timestamp.
timestamp_max: 0,
# Limit to ten balances at most.
limit: 10,
flags: %AccountFilterFlags{
# Include transfers from the debit side.
debits: true,
# Include transfers from the credit side.
credits: true,
# Sort by timestamp in reverse-chronological order.
reversed: true
}
}
{:ok, account_balances} = Connection.get_account_balances(:tb, filter)
Query Accounts
NOTE: This is a preview API that is subject to breaking changes once TigerBeetle has a stable querying API.
Query accounts by the intersection of some fields and by timestamp range.
The accounts in the response are sorted by timestamp in chronological or reverse-chronological order.
alias TigerBeetlex.QueryFilter
alias TigerBeetlex.QueryFilterFlags
filter =
%QueryFilter{
# Filter by UserData.
user_data_128: <<1000::128>>,
user_data_64: 100,
user_data_32: 10,
# Filter by Code.
code: 1,
# No filter by Ledger.
ledger: 0,
# No filter by Timestamp.
timestamp_min: 0,
# No filter by Timestamp.
timestamp_max: 0,
# Limit to ten accounts at most.
limit: 10,
# Sort by timestamp in reverse-chronological order.
flags: %QueryFilterFlags{reversed: true}
}
{:ok, query_accounts} = Connection.query_accounts(:tb, filter)
Query Transfers
NOTE: This is a preview API that is subject to breaking changes once TigerBeetle has a stable querying API.
Query transfers by the intersection of some fields and by timestamp range.
The transfers in the response are sorted by timestamp in chronological or reverse-chronological order.
filter =
%QueryFilter{
# Filter by UserData.
user_data_128: <<1000::128>>,
user_data_64: 100,
user_data_32: 10,
# Filter by Code.
code: 1,
# No filter by Ledger.
ledger: 0,
# No filter by Timestamp.
timestamp_min: 0,
# No filter by Timestamp.
timestamp_max: 0,
# Limit to ten transfers at most.
limit: 10,
# Sort by timestamp in reverse-chronological order.
flags: %QueryFilterFlags{reversed: true}
}
{:ok, query_transfers} = Connection.query_transfers(:tb, filter)
Linked Events
When the linked flag is specified for an account when creating accounts or for a transfer when creating transfers, it links that event with the next event in the batch, to create a chain of events, of arbitrary length, which all succeed or fail together. The tail of a chain is denoted by the first event without this flag. The last event in a batch may therefore never have the linked flag set as this would leave a chain open-ended. Multiple chains or individual events may coexist within a batch to succeed or fail independently.
Events within a chain are executed in order, or are rolled back on error, so that the effect of each event in the chain is visible to the next, and so that the chain is either visible or invisible as a unit to subsequent events after the chain. The event that was the first to break the chain will have a unique error result. Other events in the chain will have their error result set to linked_event_failed.
batch = [
# An individual transfer (successful):
%Transfer{id: ID.from_int(1)},
# A chain of 4 transfers (the last transfer in the chain closes the chain with linked=false):
# Commit/rollback.
%Transfer{id: ID.from_int(2), flags: %TransferFlags{linked: true}},
# Commit/rollback.
%Transfer{id: ID.from_int(3), flags: %TransferFlags{linked: true}},
# Fail with exists.
%Transfer{id: ID.from_int(2), flags: %TransferFlags{linked: true}},
# Fail without committing.
%Transfer{id: ID.from_int(4)},
# An individual transfer (successful)
# This should not see any effect from the failed chain above
%Transfer{id: ID.from_int(2)},
# A chain of 2 transfers (the first transfer fails the chain):
%Transfer{id: ID.from_int(2), flags: %TransferFlags{linked: true}},
%Transfer{id: ID.from_int(3)},
# A chain of 2 transfers (successful):
%Transfer{id: ID.from_int(3), flags: %TransferFlags{linked: true}},
%Transfer{id: ID.from_int(4)}
]
{:ok, _transfer_errors} = Connection.create_transfers(:tb, batch)
# Error handling omitted.
Imported Events
When the imported flag is specified for an account when creating accounts or for a transfer when creating transfers, it allows importing historical events with a user-defined timestamp.
The imported flag must be set on the entire batch of events.
It’s recommended to submit the whole batch as a linked chain of events, ensuring that if any event fails, none of them are committed, preserving the last timestamp unchanged. This approach gives the application a chance to correct failed imported events and re-submit the batch with the same user-defined timestamps.
# External source of time.
historical_timestamp = 0
# Events loaded from an external source.
historical_accounts = []
historical_transfers = []
# First, load and import all accounts with their timestamps from the historical source.
{reversed_accounts, historical_timestamp} =
Enum.reduce(historical_accounts, {[], historical_timestamp}, fn _historical_account,
{acc, last_timestamp} ->
# Set a unique and strictly increasing timestamp.
timestamp = last_timestamp + 1
account = %Account{
timestamp: timestamp,
# To ensure atomicity, the entire batch (except the last event in the chain)
# must be `linked`.
flags: %AccountFlags{imported: true, linked: true}
# Bring over other fields from historical_account
}
{[account | acc], timestamp}
end)
accounts =
reversed_accounts
# We "unset" the linked flag to the "last" event in the chain, which is actually the
# first one since we're accumulating by prepending
|> List.update_at(0, fn account ->
%{account | flags: %AccountFlags{imported: true}}
end)
# We reverse everything to obtain the correct order
|> Enum.reverse()
{:ok, _account_errors} = Connection.create_accounts(:tb, accounts)
# Error handling omitted.
# Then, load and import all transfers with their timestamps from the historical source.
{reversed_transfers, _historical_timestamp} =
Enum.reduce(historical_transfers, {[], historical_timestamp}, fn _historical_transfer,
{acc, last_timestamp} ->
# Set a unique and strictly increasing timestamp.
timestamp = last_timestamp + 1
transfer = %Transfer{
timestamp: timestamp,
# To ensure atomicity, the entire batch (except the last event in the chain)
# must be `linked`.
flags: %TransferFlags{imported: true, linked: true}
# Bring over other fields from historical_transfer
}
{[transfer | acc], timestamp}
end)
transfers =
reversed_transfers
# We "unset" the linked flag to the "last" event in the chain, which is actually the
# first one since we're accumulating by prepending
|> List.update_at(0, fn transfer ->
%{transfer | flags: %TransferFlags{imported: true}}
end)
# We reverse everything to obtain the correct order
|> Enum.reverse()
{:ok, _transfer_errors} = Connection.create_transfers(:tb, transfers)
# Error handling omitted.
Since it is a linked chain, in case of any error the entire batch is rolled back and can be retried with the same historical timestamps without regressing the cluster timestamp.
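For instance, a retry could be sketched as follows (illustrative only; how you inspect and correct the failed events depends on your historical source):
# Because the chain is linked, a failed import commits nothing, so the corrected
# batch can be resubmitted with the exact same user-defined timestamps.
case Connection.create_transfers(:tb, transfers) do
  {:ok, []} ->
    :ok

  {:ok, transfer_errors} ->
    # Inspect transfer_errors, correct the offending events in `transfers`,
    # then call Connection.create_transfers(:tb, transfers) again.
    {:retry, transfer_errors}
end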