Kafka is a distributed system
Mix.install(
  [
    {:kino, "~> 0.7.0"},
    {:kafka_ex, "~> 0.11"},
    {:kino_vega_lite, "~> 0.1.4"}
  ],
  config: [
    kafka_ex: [
      kafka_version: "kayrock",
      brokers: [
        {"localhost", 19092},
        {"localhost", 29092},
        {"localhost", 39092}
      ]
    ]
  ]
)
Section
If a topic's entire log lives on a single node, its performance is limited by that node. To improve performance, we can partition the topic log into multiple logs, each of which can live on a separate node in the Kafka cluster.
A Kafka cluster is a collection of brokers that reach consensus by communicating with each other. The cluster can operate in two modes:
- using ZooKeeper for coordination
- using the quorum controller (KRaft) feature (KIP-500), which replaces the need for ZooKeeper. You can learn more in the official documentation: https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum
> Reads and writes in a Kafka cluster are distributed across its nodes.
In this livebook, we will focus on KRaft (Kafka's mode without ZooKeeper) because it requires fewer nodes to operate a cluster in a local environment.
With KRaft, nodes can have one of the following roles:
- Broker: responsible for maintaining and replicating the data for the topic-partitions it stores.
- Controller: responsible for maintaining the cluster metadata and managing partition leadership among the brokers.
- Both: a node can have both roles if it is configured as such.
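To see which brokers make up our local cluster, we can ask any of them for cluster metadata. Below is a minimal sketch, assuming the default KafkaEx worker started via the Mix.install configuration above can reach the brokers; the exact shape of the metadata structs may differ between KafkaEx versions.
# Sketch: list the brokers currently known to the cluster.
# Uses KafkaEx's default worker configured in Mix.install above.
metadata = KafkaEx.metadata()

metadata.brokers
|> Enum.map(fn broker -> {broker.node_id, broker.host, broker.port} end)
|> IO.inspect(label: :brokers)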
KRaft Controller
In KRaft mode, specific Kafka servers are selected to be controllers. A Kafka admin will typically select 3 or 5 servers for this role, depending on factors like cost and the number of concurrent failures the system should withstand without impacting availability. A majority of the controllers must be alive in order to maintain availability: with 3 controllers, the cluster can tolerate 1 controller failure; with 5 controllers, it can tolerate 2.
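The failure tolerance follows from the majority requirement: with n controllers, a majority is div(n, 2) + 1, so the cluster can lose the remaining n - (div(n, 2) + 1) controllers. A purely illustrative snippet (not a Kafka API):
# Illustration only: controller failures tolerated by an n-node quorum.
tolerated_failures = fn n -> n - (div(n, 2) + 1) end

for n <- [3, 5] do
  IO.puts("#{n} controllers -> tolerates #{tolerated_failures.(n)} failure(s)")
end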
> 📓 A Kafka server's process.roles should be set to either broker or controller, but not both. Combined mode can be used in development environments, but it should be avoided in critical deployment environments.
Topic Partitions
Messages are distributed across the topic's partitions:
alias KafkaEx.Protocol.CreateTopics.TopicRequest
alias KafkaEx.Protocol.CreateTopics.ConfigEntry
alias KafkaEx.Protocol.CreateTopics.Response, as: CreateTopicsResponse
KafkaEx.start_link_worker(:create_topic, server_impl: KafkaEx.New.Client)
KafkaEx.create_topics(
  [
    %TopicRequest{topic: "foo_replicated", num_partitions: 3, replication_factor: 3}
  ],
  worker_name: :create_topic
)
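Once the topic exists, we can inspect its metadata to confirm that the three partitions and their replicas are spread across the brokers. A minimal sketch, assuming the topic creation above succeeded; the field names follow KafkaEx's metadata response structs and may vary between versions.
# Sketch: show leader and replicas for each partition of "foo_replicated".
%{topic_metadatas: [topic_metadata | _]} = KafkaEx.metadata(topic: "foo_replicated")

topic_metadata.partition_metadatas
|> Enum.map(fn p -> {p.partition_id, [leader: p.leader, replicas: p.replicas]} end)
|> IO.inspect(label: :partition_assignments)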
defmodule MyConsumer do
  use KafkaEx.GenConsumer

  alias KafkaEx.Protocol.Fetch.Message

  require Logger

  # note - messages are delivered in batches
  # consumer config:
  # - fetch.min.bytes
  #   The minimum amount of data the server should return for a fetch request.
  #   If insufficient data is available, the request will wait for that much data
  #   to accumulate before answering.
  # - fetch.max.bytes
  #   The maximum number of bytes the server will return for a fetch request.
  #   Must be at least 1024.
  # - fetch.max.wait.ms
  #   The maximum amount of time the server will block before answering the fetch
  #   request if there isn't sufficient data to satisfy fetch.min.bytes immediately.
  def handle_message_set(message_set, state) do
    for %Message{value: message} <- message_set do
      Logger.debug(fn -> "MyConsumer - consumed message: " <> inspect(message) end)
    end

    {:sync_commit, state}
  end
end
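The fetch.* settings described in the comments roughly map onto the options of KafkaEx.fetch/3 (:wait_time in milliseconds, :min_bytes, :max_bytes). As a rough sketch, here is a single manual fetch against partition 0 with those options spelled out; it simply returns an empty message set if nothing has been produced yet.
# Sketch: a single manual fetch from partition 0 with explicit fetch tuning.
# :wait_time, :min_bytes and :max_bytes mirror fetch.max.wait.ms, fetch.min.bytes
# and fetch.max.bytes from the comments above.
KafkaEx.fetch("foo_replicated", 0,
  offset: 0,
  wait_time: 100,
  min_bytes: 1,
  max_bytes: 1_000_000,
  auto_commit: false
)
|> IO.inspect(label: :fetch_response)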
# Start 3 consumers
Enum.each(1..3, fn i ->
  IO.inspect("Start consumer: #{i}")

  KafkaEx.ConsumerGroup.start_link(
    MyConsumer,
    "my_group_replicated",
    ["foo_replicated"],
    # Many functions support an api_version parameter.
    # Version 3 stores offsets in Kafka instead of ZooKeeper.
    api_versions: %{offset_fetch: 3, offset_commit: 3}
  )
end)
alias KafkaEx.Protocol.Produce.{Request, Message}

Enum.each(1..10, fn i ->
  produce_request =
    %Request{
      topic: "foo_replicated",
      messages: [%Message{key: "key#{i}", value: "value #{i}"}]
    }
    |> IO.inspect(label: :request)

  KafkaEx.produce(produce_request)
end)
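Because each message above has a distinct key and no explicit partition, KafkaEx's default partitioner chooses a partition from the key, so the ten messages should spread across the three partitions. As a rough sketch, we can compare the latest offset of each partition after producing; the exact shape of the offset response depends on the KafkaEx version.
# Sketch: latest offset per partition; non-zero offsets show where messages landed.
for partition <- 0..2 do
  "foo_replicated"
  |> KafkaEx.latest_offset(partition)
  |> IO.inspect(label: "latest_offset_partition_#{partition}")
end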