Partitioning in Kafka: A Guide to Publishing Batched Data 📃📩


There are two concepts you must understand when publishing messages: latency and throughput.

Throughput is a measure of how much data can be processed in a given amount of time. In networking it is usually measured in bits per second (bit/s) or packets per second; for a message broker it is more commonly expressed in messages or bytes per second. High throughput means the system can process a large amount of data quickly, which is often desirable in high-load scenarios.

For example, if a Kafka producer can send 1000 messages per second to a broker, the throughput is 1000 messages per second.

Latency, on the other hand, is a measure of delay: the time it takes for a piece of data to travel from one point to another, for example from a producer to a broker. It is usually measured in milliseconds. Low latency means that data is transferred quickly from source to destination.

For example, if a message takes 10 milliseconds from the time it's sent by a Kafka producer until it's received by a broker, the latency is 10 milliseconds.
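To make the two definitions concrete, here is a minimal C# sketch that computes both metrics from the same set of deliveries. The Delivery record and the ProducerMetrics class are hypothetical names used only for illustration, and measuring throughput over the window from the first send to the last receipt is one reasonable choice among several.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical record: when a message left the producer and when the broker got it.
public readonly record struct Delivery(TimeSpan SentAt, TimeSpan ReceivedAt);

public static class ProducerMetrics
{
    // Throughput: delivered messages per second, measured over the window
    // from the first send to the last receipt.
    public static double ThroughputPerSecond(IReadOnlyList<Delivery> deliveries)
    {
        if (deliveries.Count == 0) return 0;
        TimeSpan start = deliveries.Min(d => d.SentAt);
        TimeSpan end = deliveries.Max(d => d.ReceivedAt);
        double seconds = (end - start).TotalSeconds;
        // A zero-length window has no meaningful rate; report 0 in that case.
        return seconds > 0 ? deliveries.Count / seconds : 0;
    }

    // Latency: average delay between sending a message and the broker receiving it.
    public static double AverageLatencyMs(IReadOnlyList<Delivery> deliveries) =>
        deliveries.Count == 0
            ? 0
            : deliveries.Average(d => (d.ReceivedAt - d.SentAt).TotalMilliseconds);
}
```

For example, two messages sent at 0 ms and 990 ms that each arrive 10 ms later give a throughput of 2 messages per second and an average latency of 10 ms. The two numbers answer different questions about the same run.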

In most systems, there's a trade-off between latency and throughput. For example, in Kafka, increasing the batch size (together with the linger time that lets batches fill up) can increase throughput, because more messages are sent in each network request, but it can also increase latency, because messages wait longer before being sent.
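The trade-off can be illustrated with a deliberately simplified model. The sketch below assumes messages arrive at a steady rate and that a batch is sent as soon as it holds a maximum number of messages or its first message has waited the linger time. Real producers measure batches in bytes and schedule sends differently, so treat the numbers as illustrative only; BatchingModel and Estimate are hypothetical names.

```csharp
using System;

public static class BatchingModel
{
    // Toy model (an assumption, not librdkafka's real scheduler):
    // messages arrive at a steady rate, and a batch is sent as soon as it
    // holds `maxBatchMessages` messages or its first message has waited `lingerMs`.
    public static (int Requests, double MaxWaitMs) Estimate(
        int totalMessages, double messagesPerSecond, int maxBatchMessages, double lingerMs)
    {
        double msPerMessage = 1000.0 / messagesPerSecond;

        // Messages that arrive while the first one lingers (including the first one).
        int arrivedDuringLinger = (int)Math.Floor(lingerMs / msPerMessage) + 1;
        int messagesPerBatch = Math.Min(arrivedDuringLinger, maxBatchMessages);

        // Fewer, larger requests mean less per-request overhead (higher throughput).
        int requests = (totalMessages + messagesPerBatch - 1) / messagesPerBatch;

        // The first message in a batch waits until the batch is full or linger expires.
        double timeToFillMs = (maxBatchMessages - 1) * msPerMessage;
        double maxWaitMs = Math.Min(timeToFillMs, lingerMs);

        return (requests, maxWaitMs);
    }
}
```

At 1,000 messages per second with batches of up to 100 messages, sending 1,000 messages takes 1,000 requests with no added wait when the linger is 0 ms, 167 requests with up to 5 ms of extra wait at 5 ms, and only 20 requests at 50 ms, at the cost of letting a message wait up to 50 ms.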

Batching and partitioning help you control these two aspects of your broker, and they are among the powerful features that give Kafka an edge when dealing with large amounts of data.

Setting up a topic with more than one partition allows more than one consumer in the same consumer group to read from that topic in parallel, so the load is balanced among those consumers. In other words, the maximum parallelism you can achieve within a group is limited by the number of partitions: consumers beyond that number sit idle. So, if you want to scale out further, you will need to add more partitions.
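The parallelism limit can be sketched with a simplified assignor that hands partitions to the consumers of one group in round-robin order. This is an illustration, not Kafka's actual assignment code: the real strategy is selected with the partition.assignment.strategy setting (for example range, roundrobin or cooperative-sticky) and differs in the details. GroupAssignment is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class GroupAssignment
{
    // Simplified round-robin assignor (illustrative, not Kafka's implementation):
    // partition p goes to consumer p % consumerCount.
    public static List<List<int>> Assign(int partitionCount, int consumerCount)
    {
        if (consumerCount <= 0) throw new ArgumentOutOfRangeException(nameof(consumerCount));

        // One (possibly empty) list of partitions per consumer in the group.
        var assignment = Enumerable.Range(0, consumerCount)
                                   .Select(_ => new List<int>())
                                   .ToList();

        for (int p = 0; p < partitionCount; p++)
            assignment[p % consumerCount].Add(p);

        // Consumers whose list stays empty receive no messages at all.
        return assignment;
    }
}
```

With 2 partitions and 3 consumers, GroupAssignment.Assign(2, 3) returns [[0], [1], []]: the third consumer receives nothing, which is why adding consumers beyond the partition count does not add parallelism.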

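If you don't tell the producer which partition to publish to and the message has no key, the producer's partitioner spreads the messages across the topic's partitions (not across consumers). The exact strategy for keyless messages, such as round-robin, random or "sticky" assignment, depends on the client library and its version.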
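As an illustration of spreading keyless messages across partitions, here is a minimal round-robin partitioner. It is a hypothetical sketch of one possible strategy, not the code of any Kafka client; depending on the client and version, the default for keyless messages may be random or sticky rather than strictly round-robin.

```csharp
using System;
using System.Threading;

// Hypothetical partitioner for messages without a key (illustration only).
public sealed class RoundRobinPartitioner
{
    private int _counter = -1;

    public int NextPartition(int partitionCount)
    {
        if (partitionCount <= 0) throw new ArgumentOutOfRangeException(nameof(partitionCount));

        // Interlocked keeps the counter correct when several threads produce at once.
        int next = Interlocked.Increment(ref _counter);

        // Clearing the sign bit keeps the index non-negative after the counter overflows.
        return (next & int.MaxValue) % partitionCount;
    }
}
```

Four consecutive calls to NextPartition(2) on a fresh instance return 0, 1, 0, 1.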

Let's clarify this with a hands-on example. Imagine that you have a list of items, of any size, that you wish to publish to a certain topic. You want to distribute these items among multiple consumers so that no single consumer gets overwhelmed with a huge number of messages. At the same time, you want certain messages to be consumed by the same consumer.

To do so, you can either publish these messages with a key, which guarantees that all messages with the same key land in the same partition and are therefore read by the same consumer in the group (as long as the number of partitions and the group's assignment don't change), or publish each message while explicitly specifying the partition you want it to go to.
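The key-based option works because the producer derives the partition from a hash of the key. The sketch below shows the idea with 32-bit FNV-1a. That hash is an assumption chosen for brevity: librdkafka (and therefore Confluent.Kafka) uses CRC32 by default and the Java client uses murmur2, so the actual partition numbers will differ. KeyPartitioner is a hypothetical name.

```csharp
using System;
using System.Text;

public static class KeyPartitioner
{
    // Maps a key to a partition with 32-bit FNV-1a (illustrative hash only;
    // real Kafka clients use different hash functions).
    public static int PartitionFor(string key, int partitionCount)
    {
        if (partitionCount <= 0) throw new ArgumentOutOfRangeException(nameof(partitionCount));

        uint hash = 2166136261u;                 // FNV-1a offset basis
        foreach (byte b in Encoding.UTF8.GetBytes(key))
        {
            hash ^= b;
            hash *= 16777619u;                   // FNV-1a prime (wraps modulo 2^32)
        }

        // Same key + same partition count => same partition, every time.
        return (int)(hash % (uint)partitionCount);
    }
}
```

Whatever the hash, the same key always maps to the same partition as long as the partition count stays the same. Adding partitions later changes the modulus and can move a key to a different partition.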

To demonstrate the second approach, specifying the partition explicitly, I will walk you through a tutorial in which I will:

  1. Declare a topic with 2 partitions.
  2. Publish a list of messages to two different partitions.
  3. Set up 2 consumers in the same group, each assigned to a different partition.
  4. Check that every message is consumed by the consumer assigned to its partition.
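Before touching Kafka, the expected outcome of these steps can be sketched as an in-memory simulation. It assumes the same routing rule the service uses later (messages 1 to 5 go to partition 0, messages 6 to 10 to partition 1) and that each consumer sees exactly the partition it is assigned. TutorialSimulation is a hypothetical name, and no broker is involved.

```csharp
using System.Collections.Generic;

public static class TutorialSimulation
{
    // Same routing rule as the tutorial: messages 1-5 -> partition 0, 6-10 -> partition 1.
    public static int TargetPartition(int messageNumber) => messageNumber <= 5 ? 0 : 1;

    // Returns what a consumer assigned to each partition would receive, in publish order.
    public static Dictionary<int, List<string>> Run(string baseName, int messageCount = 10)
    {
        var byPartition = new Dictionary<int, List<string>> { [0] = new(), [1] = new() };

        for (int i = 1; i <= messageCount; i++)
            byPartition[TargetPartition(i)].Add($"{baseName}-{i}");

        return byPartition;
    }
}
```

Calling TutorialSimulation.Run("doc") puts doc-1 through doc-5 under partition 0 and doc-6 through doc-10 under partition 1, which is what the two consumers should print.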

So, let's go ahead and apply this hands-on example to test out this feature.

Implementation 🔨

We'll mostly use the same setup as the Kafka introductory post, and we'll apply the following changes to it.

We'll start by updating the interface so that it looks as follows.

using Domain.Models;

namespace Playground.Contracts
{
    public interface IKafkaService
    {
        void PublishMessage(Document document);

        void ConsumeMessage(string groupName, string consumerName, int partition, CancellationToken cancellationToken);

        Task CreateTopicIfNotExistsAsync();
    }
}

I added a partition parameter to ConsumeMessage so that each consumer can be assigned to a specific partition. Now, let's implement this interface and update our KafkaService.

using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Domain.Models;
using Playground.Contracts;
using System.Text.Json;

namespace Playground.Implementations
{
    public class KafkaService : IKafkaService
    {
        private readonly string bootstrapServers = "localhost:9092";
        private readonly string topicName = "hardcode-topic-partitioned";

        public async Task CreateTopicIfNotExistsAsync()
        {
            using var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();
            var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(20));

            if (!metadata.Topics.Exists(t => t.Topic == topicName))
            {
                // Create the topic with 2 partitions so two consumers in one group can read in parallel
                await adminClient.CreateTopicsAsync(new List<TopicSpecification> { new() { Name = topicName, ReplicationFactor = 1, NumPartitions = 2 } });
            }
        }

        public void PublishMessage(Document document)
        {
            var config = new ProducerConfig
            {
                BootstrapServers = bootstrapServers,
                BatchSize = 50000,  // ~50 KB maximum batch size, in bytes
                LingerMs = 5        // wait up to 5 ms for a batch to fill before sending
            };

            using var producer = new ProducerBuilder<Null, string>(config).Build();

            for (int i = 1; i <= 10; i++)
            {
                var messageDocument = new Document { Name = $"{document.Name}-{i}", Id = document.Id };
                var value = JsonSerializer.Serialize(messageDocument);
                var message = new Message<Null, string> { Value = value };

                // Publish messages 1-5 to partition 0 and messages 6-10 to partition 1
                producer.Produce(new TopicPartition(topicName, new Partition(i <= 5 ? 0 : 1)), message);
            }

            // Wait up to 10 seconds for all queued messages to be delivered (or to fail)
            producer.Flush(TimeSpan.FromSeconds(10));
        }

        public void ConsumeMessage(string groupName, string consumerName, int partition, CancellationToken cancellationToken)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = bootstrapServers,
                GroupId = groupName,
                AutoOffsetReset = AutoOffsetReset.Latest
            };

            using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();

            // Manually assign a partition to this consumer. Subscribe() is deliberately not
            // called: subscribing hands partition assignment to the group coordinator,
            // which can replace this manual assignment.
            consumer.Assign(new TopicPartition(topicName, new Partition(partition)));

            try
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    var consumeResult = consumer.Consume(cancellationToken);

                    // Deserialize message
                    var document = JsonSerializer.Deserialize<Document>(consumeResult.Message.Value)!;

                    // Print the partition the message actually came from
                    Console.WriteLine($"Received message '{document.Name}' with offset {consumeResult.Offset} by group '{groupName}' of consumer '{consumerName}' and partition {consumeResult.Partition.Value}");
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when the cancellation token is triggered on shutdown
            }
            finally
            {
                // Always close the consumer so it shuts down cleanly
                consumer.Close();
            }
        }
    }
}

As you can see in this service:

  1. I created a topic with 2 partitions instead of 1.

  2. I added a producer config with:
    BatchSize: This controls the maximum size of a batch in bytes. The producer will attempt to batch records together up to this limit.
    LingerMs: This controls the maximum time to buffer data. The producer will wait up to the given number of milliseconds before sending a batch, even if the batch size hasn't been reached. This allows more records to be batched together.

  3. I published the first 5 messages to partition 0 and the other 5 to partition 1.

  4. I used the Flush method to wait (up to 10 seconds) until all queued messages have been delivered to the broker.

  5. I changed the consumer method so that it can assign a specific partition to the consumer. This makes sure that the messages published to that partition are consumed by this specific consumer.
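The interplay between BatchSize and LingerMs can be sketched as a small accumulator that reports when a batch is ready to send: once it reaches the byte limit, or once its oldest message has waited the linger time. This is a simplified model under stated assumptions, not how librdkafka is implemented (for example, it ignores per-partition batches and messages that do not fit in the remaining space). BatchAccumulator is a hypothetical name.

```csharp
using System;

// Simplified model of when one producer batch is sent (not librdkafka's implementation).
public sealed class BatchAccumulator
{
    private readonly int _batchSizeBytes;   // plays the role of BatchSize
    private readonly double _lingerMs;      // plays the role of LingerMs
    private int _bytes;
    private double? _firstMessageAtMs;

    public BatchAccumulator(int batchSizeBytes, double lingerMs)
    {
        if (batchSizeBytes <= 0) throw new ArgumentOutOfRangeException(nameof(batchSizeBytes));
        if (lingerMs < 0) throw new ArgumentOutOfRangeException(nameof(lingerMs));
        _batchSizeBytes = batchSizeBytes;
        _lingerMs = lingerMs;
    }

    public void Add(int messageBytes, double nowMs)
    {
        _firstMessageAtMs ??= nowMs;   // the linger clock starts with the first message
        _bytes += messageBytes;
    }

    // Ready when the batch is full or its oldest message has waited long enough.
    public bool IsReady(double nowMs) =>
        _firstMessageAtMs is double first &&
        (_bytes >= _batchSizeBytes || nowMs - first >= _lingerMs);

    // "Sends" the batch: returns its size in bytes and starts a new, empty one.
    public int Drain()
    {
        int sent = _bytes;
        _bytes = 0;
        _firstMessageAtMs = null;
        return sent;
    }
}
```

With new BatchAccumulator(50000, 5), a single 1,000-byte message added at 0 ms is not ready at 1 ms but is ready at 5 ms, because the linger time has expired even though the batch is far below 50,000 bytes.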

Now that we've prepared our service, let's add our consumers in Program.cs.

// First group-A consumer
Task.Run(() => kafkaService.ConsumeMessage("group-A", "first group-A consumer", 0, cancellationTokenSource.Token));

// Second group-A consumer
Task.Run(() => kafkaService.ConsumeMessage("group-A", "second group-A consumer", 1, cancellationTokenSource.Token));

Perfect! Now that everything is ready, let's fire up the application and see if everything works.

Party in the Partitions 🎉

Before we start the application, remember to update your controller to remove the await from the PublishMessage call, since PublishMessage now returns void. Once that's done, let's go ahead and run the application.

Once the application and your containers are up and running, let's post a message to the application.

Now let's check the application's console to see whether the messages were consumed as expected.

Excellent! It looks like Kafka has routed our messages successfully to the targeted partitions. This might seem similar to publishing messages with routing keys in RabbitMQ, but conceptually the two are very different.

It's important to note that consumers in the same group consume from different partitions of the topic. Offsets are tracked per partition, so the offset seen by one consumer may differ from the offset seen by another consumer, even if both belong to the same group.

For example, suppose a topic has two partitions and two consumers in the same group. Consumer A consumes from partition 0, and consumer B from partition 1. If consumer A has consumed the first 10 messages of partition 0 (offsets 0 to 9), its position in that partition, the offset of the next message to read, is 10. Meanwhile, if consumer B has consumed the first 20 messages of partition 1, its position in that partition is 20.
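The per-partition bookkeeping in this example can be sketched as a map from partition number to the offset of the next message to read. PartitionPositions is a hypothetical helper; it uses 0 to mean the start of the partition, whereas a real consumer with no committed offset falls back to its auto.offset.reset policy.

```csharp
using System.Collections.Generic;

// Hypothetical helper: the position (next offset to read) per partition for one group.
public sealed class PartitionPositions
{
    private readonly Dictionary<int, long> _next = new();

    // After processing the message at `offset`, the next one to read is offset + 1.
    public void MarkConsumed(int partition, long offset) => _next[partition] = offset + 1;

    // 0 stands for "start of the partition"; a real consumer with no committed
    // offset would fall back to its auto.offset.reset policy instead.
    public long Position(int partition) =>
        _next.TryGetValue(partition, out long next) ? next : 0;
}
```

After consuming offsets 0 to 9 from partition 0 and 0 to 19 from partition 1, Position(0) is 10 and Position(1) is 20, matching the example above.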

In the end, I just wanted to show the versatile ways Kafka offers to scale out and balance the load between consumers, so that no single consumer gets overwhelmed.
