Breaking Down Kafka: A Step-by-Step Guide to Publish & Consume Messages ✉️


Message brokers come in all shapes and sizes, and each has its own edge. Some are designed to be simple and direct, such as Amazon SQS (which is literally called Simple Queue Service). Some can be used in a simple fashion but can also implement complex patterns, like RabbitMQ, which I covered in my previous posts. And then there is a broker that was designed to handle heavy-duty streaming, maximize efficiency, scale out, serialize messages to bytes, partition messages, and much more. That broker is, of course, Apache Kafka.

Due to its wide array of features, Kafka can be overwhelming, and it sometimes feels exhausting to even start learning its principles and what the broker can provide. So, why not strip the broker of all its extra shiny features and start from the core: publishing and consuming simple JSON-serialized string messages?

What we're going to explore isn't just plain Apache Kafka, but rather Confluent Kafka. Let's understand the difference.

Confluent Inc. is a company founded by the original creators of Apache Kafka. They created Confluent Platform, a distribution of Apache Kafka that includes additional tools and capabilities to enhance Kafka's usability and functionality in production environments. Some of these features include Schema Registry, KSQL, Confluent Cloud, and more. Most of them are fairly advanced, so we won't be tackling those topics in this post.

So, let's dive into the world of Kafka and break down this huge structure into very small building blocks.

Let's see what steps we're expecting to follow in this guide:

  1. Run Kafka
  2. Create sample model for messages
  3. Set up a service to allow us to create a topic named "hardcode-topic" and publish and consume messages on this topic.
  4. Create a controller that uses the service to publish messages
  5. Initialize the Kafka service and create the topic if it doesn't exist
  6. Set up a listener that subscribes to the topic
  7. Start testing our publisher/consumer
And although it doesn't make much sense for the same service to both publish AND consume the messages, we're doing it this way for simplicity.

Running Kafka 🏃

Let's prepare a docker-compose.yml file to run Kafka along with ZooKeeper. Before we do so, let's talk about ZooKeeper a little bit.

Apache ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services in distributed systems. It's robust, since it replicates data across all nodes in the cluster, and any server can service a read request, but write requests go through a leader to ensure consistency.

In the context of Apache Kafka (a distributed streaming platform), ZooKeeper is used for managing and coordinating Kafka brokers. It keeps track of the status of Kafka cluster nodes, and it also keeps track of Kafka topics, partitions, etc.

However, starting from Kafka version 2.8.0, there is an option to run Kafka without ZooKeeper, using a self-managed mode called KRaft (Kafka Raft metadata mode). This is a significant step for Kafka, as it reduces operational complexity by getting rid of the dependency on ZooKeeper.

If this seems like too much to grasp, don't worry. We'll see it in a more practical way once we get our services running.

So, let's create a docker-compose.yml file like I showed you before, and add the following to it.

version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - 9092:9092

Now, let's run this YAML file with the docker compose up command and monitor our containers, either by running docker ps or by viewing their status in Docker Desktop.

Once your services are up and running, we can start creating an interface to communicate with Kafka. But first, let's define an example model to use as our message.

Message Model 📜

No need to create a complex model. We only need something simple, nothing fancy.
namespace Domain.Models
{
    public record Document
    {
        public int Id { get; set; }
        public string Name { get; set; } = string.Empty;
    }
}
That seems to be enough. Now let's define the interface for the service we need.

Kafka Service Interface 📝

The interface exposes the method that publishes new messages to a hardcoded topic, along with the topic creation and consumption methods that Program.cs will later call through the same interface.
using Domain.Models;

namespace Playground.Contracts
{
    public interface IKafkaService
    {
        Task CreateTopicIfNotExistsAsync();
        Task PublishMessageAsync(Document document);
        void ConsumeMessages(CancellationToken cancellationToken);
    }
}
Now that we have defined our interface let's create the service implementation.

Kafka Service Implementation 🔨

To recap, we need this service to do three things: create a topic, publish messages, and consume messages. Creating topics in Kafka is a little different from RabbitMQ, because topic creation is not idempotent: if you try to create a topic that already exists, an exception is thrown.

So, we need to make sure that the topic doesn't exist before we try to create it, which is the approach we will follow for simplicity.

However, in real-life projects, it's better to have a dedicated service whose purpose is to create and manage topics. But again, I'm choosing this approach for simplicity.

That being said, let's implement the needed service.
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Domain.Models;
using Playground.Contracts;
using System.Text.Json;

namespace Playground.Implementations
{
    public class KafkaService : IKafkaService
    {
        private readonly string bootstrapServers = "localhost:9092";
        private readonly string topicName = "hardcode-topic";

        public async Task CreateTopicIfNotExistsAsync()
        {
            using var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();

            // Fetch cluster metadata and only create the topic if it isn't there yet
            var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(20));
            if (!metadata.Topics.Exists(t => t.Topic == topicName))
            {
                await adminClient.CreateTopicsAsync(new List<TopicSpecification> { new() { Name = topicName, ReplicationFactor = 1, NumPartitions = 1 } });
            }
        }

        public async Task PublishMessageAsync(Document document)
        {
            var config = new ProducerConfig { BootstrapServers = bootstrapServers };

            using var producer = new ProducerBuilder<Null, string>(config).Build();

            // Serialize the document to JSON; no key is set, so the producer decides the partition itself
            var message = new Message<Null, string> { Value = JsonSerializer.Serialize(document) };

            await producer.ProduceAsync(topicName, message);
        }


        public void ConsumeMessages(CancellationToken cancellationToken)
        {
            var conf = new ConsumerConfig
            {
                GroupId = "hardcode-consumer-group",
                BootstrapServers = bootstrapServers,
                // Start from the earliest offset when this consumer group has no committed offset yet
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            using var consumer = new ConsumerBuilder<Ignore, string>(conf).Build();
            consumer.Subscribe(topicName);

            try
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    var consumeResult = consumer.Consume(cancellationToken);

                    // Deserialize message
                    var document = JsonSerializer.Deserialize<Document>(consumeResult.Message.Value)!;

                    Console.WriteLine($"Received message {document.Name}");
                }
            }
            catch (OperationCanceledException)
            {
                consumer.Close();
            }
        }
    }
}
Now, let's take a look at each function:
  • CreateTopicIfNotExistsAsync: This function is responsible for creating the topic whose name I hardcoded in the class (obviously this should be configurable or passed as a parameter, but this is only an example). If the topic already exists, it skips creating it.
    I chose NumPartitions & ReplicationFactor to be 1. This means the topic will not be replicated and will not benefit from Kafka's fault-tolerance features. It also means that at most one consumer in a consumer group can consume messages from this topic. If you want more consumers processing messages in parallel, you would need to increase the number of partitions.

  • PublishMessageAsync: This function is pretty straightforward: it just publishes a message of type Document after serializing it to a string. I used Null as the key because I'm not using partitioning, so there's no need to add a key to my message (see the keyed-producer sketch right after this list).

  • ConsumeMessages: This function is basically an infinite loop that keeps checking for new messages on the defined topic, using an approach similar to long polling.
    Notice how we define a group name for our consumer. This is important for scaling: within a consumer group, each message is delivered to only one consumer, so if we have multiple consumers with the same group name, only the consumer assigned to the message's partition will receive it and the others in the group will never see it. This function needs to run on a separate thread in Program.cs.
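
To make the partitioning point more concrete, here is a minimal sketch of what a keyed publish could look like if the topic had more than one partition. This is not part of the service above; PublishKeyedMessageAsync is a hypothetical variant, and the point is only that messages sharing a key always land on the same partition, which preserves their relative order.
public async Task PublishKeyedMessageAsync(Document document)
{
    var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

    // A string key this time, instead of Null
    using var producer = new ProducerBuilder<string, string>(config).Build();

    var message = new Message<string, string>
    {
        // Using the document id as the key keeps all messages for one document on the same partition
        Key = document.Id.ToString(),
        Value = JsonSerializer.Serialize(document)
    };

    await producer.ProduceAsync("hardcode-topic", message);
}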
Now that this is clear, let's go ahead and prepare our controller.

Kafka Controller 🎮

We need a POST API that will allow us to use our publish message service. So, all we need to do is create a simple controller like the following.
using Microsoft.AspNetCore.Mvc;
using Domain.Models;
using Playground.Contracts;

namespace Playground.Controllers
{
    [ApiController]
    [Route("[controller]")]
    public class KafkaController(IKafkaService kafkaService) : ControllerBase
    {
        [HttpPost]
        public async Task<IActionResult> PublishDocument(Document document)
        {
            await kafkaService.PublishMessageAsync(document);
            return Ok();
        }
    }
}
Now that we have our controller, let's register the service in the Program.cs file and wire everything up.

Register Kafka Service 📌

In order to register our Kafka service, we need to add the service class we just created as a singleton in Program.cs. You could also add it as a transient service, but in this example it wouldn't really matter.

After registering the service, we need to fetch it once the app is built, create our topic, and then start listening to it. And as I mentioned in the beginning, having the consumer and publisher of messages in the same service is not something that would happen in real-life cases, but I'm only doing it for simplicity.

So, let's go ahead and add our needed code.
// Add KafkaService
builder.Services.AddSingleton<IKafkaService, KafkaService>();

var app = builder.Build();

// Get KafkaService from the service provider
var kafkaService = app.Services.GetRequiredService<IKafkaService>();
var cancellationTokenSource = new CancellationTokenSource();

// Create Kafka topic if it doesn't exist
await kafkaService.CreateTopicIfNotExistsAsync();

// Run ConsumeMessages in a separate thread
Task.Run(() => kafkaService.ConsumeMessages(cancellationTokenSource.Token));
As you can see, these are basically the same steps I described above. Note how important it is to provide a cancellation token, so that you can stop the listener whenever you want, for example when the application shuts down or an exception occurs.
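
One simple way to wire that up, assuming the standard WebApplication host from the snippet above (this line is a sketch, not part of the original example), is to cancel the token when the application starts shutting down:
// Cancel the consumer loop when the host begins shutting down
app.Lifetime.ApplicationStopping.Register(cancellationTokenSource.Cancel);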

Also, be aware that running ConsumeMessages in a separate thread like this can be risky. If an unhandled exception occurs in ConsumeMessages, it could terminate your application. You can instead use a background service or hosted service to manage this background task. This would provide better error handling and lifecycle management.
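
As a rough sketch of that alternative, assuming the same IKafkaService shown earlier (the class name here is hypothetical), a hosted service could look like this:
using Microsoft.Extensions.Hosting;
using Playground.Contracts;

namespace Playground.Implementations
{
    // Hypothetical hosted service: the host now owns the consumer's lifetime and shutdown
    public class KafkaConsumerHostedService(IKafkaService kafkaService) : BackgroundService
    {
        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // Run the blocking consume loop on a background thread so startup isn't blocked
            return Task.Run(() => kafkaService.ConsumeMessages(stoppingToken), stoppingToken);
        }
    }
}
In Program.cs you would then register it with builder.Services.AddHostedService<KafkaConsumerHostedService>(); instead of calling Task.Run yourself.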

Excellent! Now let's go ahead and test our broker.

Publish & Consume Messages 📬

After completing the steps above, we can run our application and start publishing messages to see if everything goes well. So, once the application is running, I'm going to hit the API to publish a message.
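
If you don't have a client like Postman handy, something like the following could do the trick. Treat the base address as a placeholder: the port is whatever Kestrel assigned to your project, and the Name value is just an example.
using System.Net.Http.Json;
using Domain.Models;

// Hypothetical client call; adjust the URL/port to match your application
using var client = new HttpClient { BaseAddress = new Uri("https://localhost:5001") };

var response = await client.PostAsJsonAsync("/kafka", new Document { Id = 1, Name = "hardcode-document" });

Console.WriteLine(response.StatusCode); // Expecting OK if the publish succeeded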


Let's now check our application console to see if anything was printed. You can also set a breakpoint in the consumer and see if it gets hit after you publish the message.
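
Assuming the request from the sketch above, the console output should look something like this:
Received message hardcode-document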

Perfect! Our message was indeed published, and the consumer was able to consume it.

We finished our example and we haven't mentioned ZooKeeper since we started it. So, why did we run it? How did it play a part in what we just did? It actually played a crucial role behind the scenes, for example:
  • Broker Registration: When a Kafka broker starts up, it registers itself with ZooKeeper, including its hostname, port, and other metadata. When you publish a message, the producer fetches cluster metadata from the brokers, and the brokers rely on ZooKeeper to keep track of which brokers are alive and able to receive it.

  • Topic Configuration: ZooKeeper stores the configuration details of all topics, including the number of partitions for each topic and the replication factor. When you publish a message, the producer needs to know which partitions the topic has and which broker leads each of them; the brokers serve that metadata to the producer and keep it consistent through ZooKeeper.
ZooKeeper also plays other roles related to partition leadership and cluster coordination, but this introductory post isn't the place to dig into them.

So, we have achieved what we wanted to do in this post, which was to strip Kafka of anything complicated and just use it as a simple message broker. Now, the next step would be to start building on it, feature by feature, until we get to the more complex parts.

And we can start with one of the features that gives Kafka an edge: Avro serialization. But until I show you how to do it, read more about Apache Avro and how Avro serialization can improve message transportation.
