Breaking Down Kafka: A Step-by-Step Guide to Publish & Consume Messages ✉️
Message brokers come in all shapes and sizes, and each has its own edge. Some aim to be simple and direct, such as Amazon SQS (which is literally called Simple Queue Service). Others can be used in a simple fashion but can also implement complex patterns, like RabbitMQ, which I covered in my previous posts. And then there is a broker that was designed to handle heavy-duty streaming, maximize efficiency, scale out, serialize messages to bytes, partition messages, and much more. That broker is indeed Apache Kafka.
Due to its wide array of features, Kafka can be overwhelming, and it sometimes feels exhausting to even start comprehending its principles and what the broker can provide. So why not strip away all the extra shiny features and start from the core: publishing and consuming simple JSON-serialized string messages.
What we're going to explore isn't just Apache Kafka, but rather Confluent Kafka. Let's understand the difference.
Confluent Inc. is a company founded by the original creators of Apache Kafka. They created the Confluent Platform, a distribution of Apache Kafka that includes additional tools and capabilities to enhance Kafka's usability and functionality in production environments. Some of these features include Schema Registry, KSQL, Confluent Cloud, etc. But most of them are pretty advanced, so we won't be tackling those topics in this post.
So, let's dive into the world of Kafka and break down this huge structure into very small building blocks.
Let's see what steps we're expecting to follow in this guide:
- Run Kafka
- Create sample model for messages
- Set up a service to allow us to create a topic named "hardcode-topic" and publish and consume messages on this topic.
- Create a controller that uses the service to publish messages
- Initialize the Kafka service and create the topic if it doesn't exist
- Set up a listener that subscribes to the topic
- Start testing our publisher/consumer
Running Kafka 🏃
Let's prepare a docker-compose.yml file to run Kafka along with ZooKeeper. Before we do, let's talk about ZooKeeper a little bit. Apache ZooKeeper is a centralized service for maintaining configuration information, naming, and providing distributed synchronization and group services in distributed systems. It's robust, since it replicates data across all nodes in the cluster; any server can serve a read request, while write requests go through a leader to ensure consistency.
In the context of Apache Kafka (a distributed streaming platform), ZooKeeper is used to manage and coordinate Kafka brokers. It keeps track of the status of Kafka cluster nodes, as well as Kafka topics, partitions, etc.
However, starting from Kafka version 2.8.0, there is an option to run Kafka without ZooKeeper, using a self-managed mode called KRaft (Kafka Raft metadata mode). This is a significant step for Kafka, as it reduces operational complexity by getting rid of the dependency on ZooKeeper.
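Just to give a taste, a minimal docker-compose.yml for KRaft mode might look something like the sketch below. This is an assumption based on recent confluentinc/cp-kafka images; the exact environment variables and the required CLUSTER_ID depend on the image version, so treat it as a starting point rather than a drop-in file.
services:
  kafka:
    image: confluentinc/cp-kafka:latest
    ports:
      - 9092:9092
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller   # one node acts as both broker and controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk       # any base64-encoded UUID; generate your own
In this post, though, we'll stick with the classic ZooKeeper setup.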
If this seems like too much to grasp, don't worry; we'll see it in a more practical way once we get our services running.
So, let's create a docker-compose.yml file like I showed you before, and add the following to it.
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - 9092:9092
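With the file in place, start both containers with docker compose up -d (or docker-compose up -d on older Docker installations). Once the broker is up, it will be reachable at localhost:9092, which is the address all the code below assumes.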
Message Model 📜
namespace Domain.Models
{
    public record Document
    {
        public int Id { get; set; }
        public string Name { get; set; } = string.Empty;
    }
}
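Since we'll serialize this record with System.Text.Json before publishing, a Document with Id 1 and Name "Invoice" (hypothetical values, just for illustration) travels over the wire as:
{"Id":1,"Name":"Invoice"}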
Kafka Service Interface 📝
using Domain.Models;

namespace Playground.Contracts
{
    public interface IKafkaService
    {
        // All three members belong on the interface, since Program.cs below
        // calls CreateTopicIfNotExistsAsync and ConsumeMessages through it.
        Task CreateTopicIfNotExistsAsync();
        Task PublishMessageAsync(Document document);
        void ConsumeMessages(CancellationToken cancellationToken);
    }
}
Kafka Service Implementation 🔨
In this example, I'll let the Kafka service itself create the topic. In real-life projects, it's better to have a dedicated service whose purpose is to create and manage topics, but again, I'm choosing this approach for simplicity.
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Domain.Models;
using Playground.Contracts;
using System.Text.Json;

namespace Playground.Implementations
{
    public class KafkaService : IKafkaService
    {
        private readonly string bootstrapServers = "localhost:9092";
        private readonly string topicName = "hardcode-topic";

        public async Task CreateTopicIfNotExistsAsync()
        {
            using var adminClient = new AdminClientBuilder(
                new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();

            // Fetch cluster metadata to check whether the topic already exists
            var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(20));
            if (!metadata.Topics.Exists(t => t.Topic == topicName))
            {
                await adminClient.CreateTopicsAsync(new List<TopicSpecification>
                {
                    new() { Name = topicName, ReplicationFactor = 1, NumPartitions = 1 }
                });
            }
        }

        public async Task PublishMessageAsync(Document document)
        {
            var config = new ProducerConfig { BootstrapServers = bootstrapServers };
            using var producer = new ProducerBuilder<Null, string>(config).Build();

            // Serialize the document to a JSON string before publishing
            var message = new Message<Null, string> { Value = JsonSerializer.Serialize(document) };
            await producer.ProduceAsync(topicName, message);
        }

        public void ConsumeMessages(CancellationToken cancellationToken)
        {
            var conf = new ConsumerConfig
            {
                GroupId = "hardcode-consumer-group",
                BootstrapServers = bootstrapServers,
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            using var consumer = new ConsumerBuilder<Ignore, string>(conf).Build();
            consumer.Subscribe(topicName);

            try
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    // Blocks until a message arrives or the token is cancelled
                    var consumeResult = consumer.Consume(cancellationToken);

                    // Deserialize message
                    var document = JsonSerializer.Deserialize<Document>(consumeResult.Message.Value)!;
                    Console.WriteLine($"Received message {document.Name}");
                }
            }
            catch (OperationCanceledException)
            {
                consumer.Close();
            }
        }
    }
}
- CreateTopicIfNotExistsAsync: This function creates the topic whose name I hardcoded in the class (obviously the name should be configurable or a parameter, but this is only an example). If the topic already exists, it skips creating it.
I chose both NumPartitions and ReplicationFactor to be 1. This means the topic will not be replicated and will not benefit from Kafka's fault-tolerance features. It also means that at most one consumer in a consumer group can consume messages from this topic; if you want more consumers processing messages in parallel, you would need to increase the number of partitions.
- PublishMessageAsync: This function is pretty straightforward; it just publishes a message of type Document after serializing it to a string. I used Null as the key because I'm not using partitioning, so there's no need to add a key to my message (see the keyed-producer sketch after this list for what that would look like).
- ConsumeMessages: This function is basically an infinite loop that keeps checking for new messages on the defined topic, using an approach similar to long polling.
Notice how we define a group ID for our consumer. This matters for scaling: when multiple consumers share the same group ID, Kafka assigns each partition to exactly one consumer in the group, so every message is processed by only one member of the group rather than by all of them. This function needs to run on a separate thread in Program.cs.
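By the way, if the topic had multiple partitions, you could publish with a key so that Kafka routes related messages consistently. Here's a minimal sketch of an alternative PublishMessageAsync body using a keyed producer (assuming the same bootstrapServers and topicName fields as above):
var config = new ProducerConfig { BootstrapServers = bootstrapServers };
using var producer = new ProducerBuilder<string, string>(config).Build();

// The key (here the document id) is hashed to choose a partition, so all
// messages sharing a key land on the same partition and keep their order.
var message = new Message<string, string>
{
    Key = document.Id.ToString(),
    Value = JsonSerializer.Serialize(document)
};
await producer.ProduceAsync(topicName, message);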
Kafka Controller 🎮
using Microsoft.AspNetCore.Mvc;
using Domain.Models;
using Playground.Contracts;

namespace Playground.Controllers
{
    [ApiController]
    [Route("[controller]")]
    public class KafkaController(IKafkaService kafkaService) : ControllerBase
    {
        [HttpPost]
        public async Task<IActionResult> PublishDocument(Document document)
        {
            await kafkaService.PublishMessageAsync(document);
            return Ok();
        }
    }
}
Register Kafka Service 📌
After registering the service, we need to resolve it once the app builds, create our topic, and start listening to it. As I mentioned in the beginning, having the consumer and the publisher of messages in the same service is not something that would happen in real-life cases, but I'm only doing it for simplicity.
So, let's go ahead and add our needed code.
// Add KafkaService
builder.Services.AddSingleton<IKafkaService, KafkaService>();

var app = builder.Build();

// Get KafkaService from the service provider
var kafkaService = app.Services.GetRequiredService<IKafkaService>();
var cancellationTokenSource = new CancellationTokenSource();

// Create Kafka topic if it doesn't exist
await kafkaService.CreateTopicIfNotExistsAsync();

// Run ConsumeMessages in a separate thread
Task.Run(() => kafkaService.ConsumeMessages(cancellationTokenSource.Token));
Also, be aware that running ConsumeMessages on a separate thread like this can be risky. If an unhandled exception occurs inside ConsumeMessages, the task can die silently and your app stops consuming without any warning. You can instead use a background service or hosted service to manage this background task, which provides better error handling and lifecycle management.
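As a rough sketch of that alternative (the KafkaConsumerService class name is hypothetical, not part of the project above), the consumer loop could be wrapped in a BackgroundService so the host controls its start-up and shutdown:
using Microsoft.Extensions.Hosting;
using Playground.Contracts;

public class KafkaConsumerService(IKafkaService kafkaService) : BackgroundService
{
    // The host starts this when the app starts and cancels stoppingToken on
    // shutdown, so the consumer loop exits cleanly with the application.
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
        => Task.Run(() => kafkaService.ConsumeMessages(stoppingToken), stoppingToken);
}
It would be registered with builder.Services.AddHostedService<KafkaConsumerService>(); before builder.Build(), replacing the manual Task.Run call.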
Publish & Consume Messages 📬
- Broker Registration: When a Kafka broker starts up, it registers itself with ZooKeeper, including its hostname, port, and other metadata. When you publish a message, the producer fetches cluster metadata from the brokers (which coordinate through ZooKeeper) to find out which broker should receive the message.
- Topic Configuration: ZooKeeper stores the configuration details of all topics, including the number of partitions for each topic and the replication factor. When you publish a message, the producer needs to know which partition to send it to; this information originates in ZooKeeper and reaches the producer through the brokers' metadata.
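To test the whole flow, run the app and POST a document to the controller. Here's a minimal sketch using HttpClient; the base address http://localhost:5000 is an assumption, so match it to the port your app prints at startup. If everything is wired up, the consumer thread prints "Received message Invoice" to the console:
using System.Net.Http.Json;
using Domain.Models;

// Hypothetical port; use the one your app actually listens on.
using var client = new HttpClient { BaseAddress = new Uri("http://localhost:5000") };

// POSTs to /kafka, which maps to KafkaController.PublishDocument
var response = await client.PostAsJsonAsync("/kafka", new Document { Id = 1, Name = "Invoice" });
Console.WriteLine(response.StatusCode); // OK means the message was published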
In a future post, we can explore one of the features that gives Kafka its edge: Avro serialization. Until I show you how to use it, read up on Apache Avro and how Avro serialization can improve message transportation.