
# Kafka Topics

```elixir
Mix.install(
  [
    {:kino, "~> 0.7.0"},
    {:kafka_ex, "~> 0.11"},
    {:kino_vega_lite, "~> 0.1.4"}
  ],
  config: [
    kafka_ex: [
      # "kayrock" selects kafka_ex's Kayrock-based client implementation
      kafka_version: "kayrock",
      brokers: [
        {"localhost", 19092},
        {"localhost", 29092},
        {"localhost", 39092}
      ]
    ]
  ]
)
```

## Section

A topic is a named container for similar events:

A topic is a log of events that can only be appended to, never modified or deleted in place (append-only).

> 🧠 This makes it easy to reason about the replication of events because they are immutable.

The log is accessed by seeking to a specific offset and scanning forward from there; it is not indexed for faster searching.
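
As a sketch of that access pattern (using the `"foo"` topic created later in this notebook), a read names a partition and a starting offset, then scans forward:

```elixir
# Read partition 0 of "foo" starting at offset 5 and scan forward from there.
# Seek-then-scan is the only access pattern the log offers; there is no
# key-based index lookup.
KafkaEx.fetch("foo", 0, offset: 5)
```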

The log is durable, meaning that events are not lost even if the system shuts down. Retention of events can be configured for a specific time frame, such as a few seconds, a few years, or indefinitely.

> 📓 In contrast, queues are ephemeral, meaning they only exist temporarily and events are not retained after they have been processed.

Logs are files stored on disk:

IMMUTABILITY + DURABILITY

```elixir
alias KafkaEx.Protocol.CreateTopics.TopicRequest
alias KafkaEx.Protocol.CreateTopics.ConfigEntry
alias KafkaEx.Protocol.CreateTopics.Response, as: CreateTopicsResponse

# Start a dedicated worker backed by the new client implementation for topic administration
KafkaEx.start_link_worker(:create_topic, server_impl: KafkaEx.New.Client)

KafkaEx.create_topics(
  [
    %TopicRequest{topic: "foo", num_partitions: 1, replication_factor: 1}
  ],
  worker_name: :create_topic
)
```

```elixir
KafkaEx.metadata(topic: "foo")
```

```elixir
defmodule MyConsumer do
  use KafkaEx.GenConsumer

  alias KafkaEx.Protocol.Fetch.Message

  require Logger

  # note - messages are delivered in batches
  # consumer config:
  # - fetch.min.bytes
  #     The minimum amount of data the server should return for a fetch request. 
  #     If insufficient data is available the request will wait for that much data to accumulate before answering the request.
  # - fetch.max.bytes
  #     The maximum number of bytes we will return for a fetch request. 
  #     Must be at least 1024
  # - fetch.max.wait.ms
  #     The maximum amount of time the server will block before answering the fetch request 
  #     if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes 
  def handle_message_set(message_set, state) do
    for %Message{value: message} <- message_set do
      Logger.debug(fn -> "MyConsumer - consumed message: " <> inspect(message) end)
    end

    {:sync_commit, state}
  end
end
```
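
These broker-side consumer settings have per-request analogues on `KafkaEx.fetch/3`. A minimal sketch (option names per kafka_ex; the values are illustrative):

```elixir
# Batch tuning on a single fetch: wait up to 100 ms (wait_time) for at least
# 1 byte of data (min_bytes), and return at most 1 MiB (max_bytes).
KafkaEx.fetch("foo", 0,
  offset: 0,
  wait_time: 100,
  min_bytes: 1,
  max_bytes: 1_048_576
)
```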

```elixir
# Start a consumer
KafkaEx.ConsumerGroup.start_link(
  MyConsumer,
  "my_group",
  ["foo"],
  # Many functions support an api_version parameter 
  # Versions 1 and above store consumer offsets in Kafka instead of Zookeeper.
  api_versions: %{offset_fetch: 3, offset_commit: 3}
)
```

```elixir
alias KafkaEx.Protocol.Produce.{Request, Message}

Enum.each(1..10, fn i ->
  produce_request = %Request{
    topic: "foo",
    messages: [%Message{key: "key1", value: "value #{i}"}]
  }

  KafkaEx.produce(produce_request)
end)
```

## Topic Retention

You can specify the log cleanup policy by configuring the `log.cleanup.policy` property. Two options are available:

* `delete` - removes log segments that fall outside the retention criteria (a sketch of creating such a topic follows this list):
  * by size - `log.retention.bytes`: the maximum size the log may reach before old segments are deleted
  * by time - `log.retention.hours|minutes|ms`: if set to -1, no time limit is applied
* `compact` - uses log compaction to keep the latest version of each record and discard older duplicates
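
As a sketch, a topic with the `delete` policy can set its own retention when it is created; `retention.ms` and `retention.bytes` are the per-topic counterparts of the broker-level `log.retention.*` settings. The `foo_delete` topic name and the values here are illustrative, and the `:create_topic` worker from earlier is reused:

```elixir
alias KafkaEx.Protocol.CreateTopics.TopicRequest

KafkaEx.create_topics(
  [
    %TopicRequest{
      topic: "foo_delete",
      num_partitions: 1,
      replication_factor: 1,
      config_entries: [
        %{config_name: "cleanup.policy", config_value: "delete"},
        # Delete segments older than ~1 minute...
        %{config_name: "retention.ms", config_value: "60000"},
        # ...or once the log exceeds ~1 MiB, whichever comes first.
        %{config_name: "retention.bytes", config_value: "1048576"}
      ]
    }
  ],
  worker_name: :create_topic
)
```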

## Log Compaction

Log compaction is a feature that allows for more specific retention of records, rather than just retaining them based on a set amount of time. It works by removing older records that have been updated with newer ones that have the same primary key. This ensures that the log always includes the most recent state for each key.

> 📓 Compaction also allows for deletes: a message with a key and a null payload is treated as a delete from the log. Such a record is sometimes referred to as a tombstone. This delete marker causes any prior message with that key to be removed (see the tombstone sketch at the end of this section).

```elixir
alias KafkaEx.Protocol.CreateTopics.TopicRequest
alias KafkaEx.Protocol.CreateTopics.ConfigEntry
alias KafkaEx.Protocol.CreateTopics.Response, as: CreateTopicsResponse

KafkaEx.start_link_worker(:create_topic, server_impl: KafkaEx.New.Client)

KafkaEx.create_topics(
  [
    %TopicRequest{
      topic: "foo_compact",
      num_partitions: 1,
      replication_factor: 1,
      config_entries: [
        %{config_name: "cleanup.policy", config_value: "compact"},
        # The minimum time a message will remain uncompacted in the log. 
        # Only applicable for logs that are being compacted.
        %{config_name: "min.compaction.lag.ms", config_value: "0"},
        # The maximum time a message will remain ineligible for compaction in the log. 
        # Only applicable for logs that are being compacted.
        %{config_name: "max.compaction.lag.ms", config_value: "100"}
      ]
    }
  ],
  worker_name: :create_topic
)
```

```elixir
alias KafkaEx.Protocol.Produce.{Request, Message}

Enum.each(1..10, fn i ->
  IO.inspect("Write message #{i}")

  produce_request = %Request{
    topic: "foo_compact",
    messages: [%Message{key: "key1", value: "value #{i}"}]
  }

  KafkaEx.produce(produce_request)
end)
```

```elixir
KafkaEx.fetch("foo_compact", 0, offset: 0)
```
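
And the tombstone mentioned above: a minimal sketch that deletes `key1` from the compacted log, assuming the client encodes a `nil` value as a null payload:

```elixir
# Producing a keyed message with a nil value writes a tombstone; compaction
# will then remove every earlier record with that key.
KafkaEx.produce(%Request{
  topic: "foo_compact",
  messages: [%Message{key: "key1", value: nil}]
})
```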

> 📓 Ordering of messages is always maintained and the offset for a message never changes.
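
A sketch that makes this visible (it assumes the fetch response shape of kafka_ex 0.11, where each partition entry carries a `message_set`): after compaction runs, the earlier values for `key1` are gone, but the surviving record still carries the offset it was originally written with.

```elixir
# List each surviving message's offset. Compaction removes records but never
# renumbers the ones that remain, so offsets can be sparse.
[response] = KafkaEx.fetch("foo_compact", 0, offset: 0)
[%{message_set: message_set}] = response.partitions

for %KafkaEx.Protocol.Fetch.Message{offset: offset, value: value} <- message_set do
  IO.puts("offset=#{offset} value=#{inspect(value)}")
end
```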
