Introduction to Consumer Groups: Learn How to Horizontally Scale Kafka Consumers 📥

People say that Kafka is a dumb broker. It just holds data under some defined topics and forwards messages from producers to consumers. It doesn't do much processing on the messages, doesn't route them based on content, and doesn't transform them. But that isn't entirely true.

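Kafka does much more than just store messages. For each consumer group, it keeps track of which messages the group has already consumed by storing the group's committed offset, that is, its current read position in every partition it reads. The consumer group is identified by the group ID you give your consumer when you configure it. Running more than one consumer in a group helps you survive consumer failures (if one consumer dies, another member of the group takes over its work) and scale better.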

If a message is consumed by one consumer in a group, no other consumer in the same group will receive it. However, every other group subscribed to the topic will also receive the message, once per group.
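To make this rule concrete, here is a minimal in-memory sketch of who receives a single message. It is a simplified model, not the Confluent.Kafka API: `Deliver` and the consumer names are hypothetical, and it assumes a topic with one partition, where the member that owns the partition in each group is picked by sorting names instead of by a real rebalance.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model, not the Kafka client API: who receives ONE message
// published to a single-partition topic. Every group gets the message once, and
// inside a group only the member that owns the partition receives it. As a stand-in
// for Kafka's rebalance, the owner here is the member whose name sorts first.
static List<string> Deliver(IEnumerable<(string Group, string Consumer)> members) =>
    members
        .GroupBy(m => m.Group)                  // one bucket per consumer group
        .Select(group => group
            .Select(m => m.Consumer)
            .OrderBy(name => name, StringComparer.Ordinal)
            .First())                           // the single partition owner of that group
        .ToList();

var subscribers = new List<(string Group, string Consumer)>
{
    ("group-A", "consumer-1"),
    ("group-A", "consumer-2"),
    ("group-B", "consumer-3"),
};

var receivers = Deliver(subscribers);
Console.WriteLine(string.Join(", ", receivers)); // consumer-1, consumer-3
```

Three consumers are subscribed, but only two deliveries happen: one per group.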

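It's important to note that a message's offset is its position within its partition, and it is assigned once, when the message is written. It does not depend on the consumer group. So, if "message A" is the first message written to a partition, its offset is 0 for every group that reads it. If three other messages were written to that partition before it, its offset is 3 (offsets start at 0), again for every group. What differs between groups is only how far each one has read, which Kafka stores as that group's committed offset.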
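In Kafka's model, the offset is fixed when a message is appended to a partition, and each group only keeps its own committed position. The following sketch models a single partition in memory to show this. `Append`, `Poll`, and the message names are hypothetical, and for simplicity a group without a committed offset starts at 0 here rather than following the `auto.offset.reset` setting.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of ONE partition, not the Confluent.Kafka API.
// The log assigns each message its offset once, when it is appended; each group
// only stores its committed position (the offset of the next message to read).
var log = new List<string>();
var committed = new Dictionary<string, long>();

long Append(string message)
{
    log.Add(message);
    return log.Count - 1;  // offset = position in the partition log
}

List<(long Offset, string Value)> Poll(string group)
{
    // Simplification: a group with no committed offset starts at 0 here;
    // real Kafka decides this with auto.offset.reset.
    long start = committed.TryGetValue(group, out var position) ? position : 0;
    var batch = new List<(long Offset, string Value)>();
    for (long offset = start; offset < log.Count; offset++)
        batch.Add((offset, log[(int)offset]));
    committed[group] = log.Count;  // commit the next offset this group will read
    return batch;
}

Append("message X");
Append("message Y");
Append("message Z");
long offsetOfA = Append("message A");  // 3: three messages were written before it

var readByA = Poll("group-A");  // offsets 0..3
var readByB = Poll("group-B");  // the same offsets 0..3, independently of group-A
Console.WriteLine($"message A: offset {offsetOfA}, seen by group-A at {readByA.Last().Offset}, by group-B at {readByB.Last().Offset}");
```

Both groups see "message A" at the same offset; the only per-group state is the `committed` entry, which is why polling "group-A" again returns nothing new.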

If it's still not quite clear, let's apply this in a hands-on example using the same setup we prepared in our Kafka introductory post.

What we will do is:

  1. Update the ConsumeMessage function to take the group name as a parameter, along with a consumer name that helps us see which consumer received the message.

  2. Add three consumers: two of them will share the group "group-A", and the third will belong to a different consumer group, "group-B".

  3. Publish a message and see which consumers consume it.
Perfect! So, let's get started by applying our changes.

Updating ConsumeMessage 🔧

We will not change much, just a simple tweak to allow us to log the consumer's group name along with the consumer's name.

So, in the IKafkaService interface, change the ConsumeMessage declaration to the following.
void ConsumeMessage(string groupName, string consumerName, CancellationToken cancellationToken);
After that, open the implementation and change the actual function to the following:
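// Needs: using Confluent.Kafka; using System.Text.Json;
public void ConsumeMessage(string groupName, string consumerName, CancellationToken cancellationToken)
{
    var config = new ConsumerConfig
    {
        BootstrapServers = bootstrapServers,
        GroupId = groupName,
        // Only used when the group has no committed offset yet:
        // a brand-new group starts at the end of the topic
        AutoOffsetReset = AutoOffsetReset.Latest
    };

    using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();

    consumer.Subscribe(topicName);
    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            // Blocks until a message arrives; throws OperationCanceledException on cancellation
            var consumeResult = consumer.Consume(cancellationToken);

            // Deserialize message
            var document = JsonSerializer.Deserialize<Document>(consumeResult.Message.Value)!;

            Console.WriteLine($"Received message '{document.Name}' with offset {consumeResult.Offset} by group '{groupName}' of consumer '{consumerName}'");
        }
    }
    catch (OperationCanceledException)
    {
        // Shutdown was requested: nothing to report, fall through to finally
    }
    finally
    {
        // Commit final offsets and leave the group so its partitions are reassigned promptly
        consumer.Close();
    }
}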
Basically, the main change is that the group name now comes from the parameters, and the log line includes enough information to identify which consumer consumed the message.

Also notice that I set AutoOffsetReset to Latest. This setting only applies when a group has no committed offset yet, so a brand-new group starts at the end of the topic instead of replaying every message already stored in it.
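Where a consumer starts reading can be summarized in a few lines. The helper below is hypothetical, not part of Confluent.Kafka; it is a sketch that mirrors the rule for one partition: an existing committed offset always wins, and the reset policy (`AutoOffsetReset` in the .NET client, `auto.offset.reset` in Kafka's configuration) is only consulted for a group that has none.

```csharp
using System;

// Hypothetical helper (not a Confluent.Kafka call) mirroring the rule for where a
// group starts reading one partition: a committed offset always wins, and only a
// group WITHOUT one falls back to the reset policy ("earliest" or "latest").
static long StartingOffset(long? committedOffset, string autoOffsetReset,
                           long logStartOffset, long logEndOffset) =>
    committedOffset ?? (autoOffsetReset switch
    {
        "earliest" => logStartOffset,  // replay every message still retained
        "latest" => logEndOffset,      // only messages produced from now on
        _ => throw new ArgumentException($"unsupported policy '{autoOffsetReset}'"),
    });

// A partition that currently holds offsets 0..9, so its end offset is 10.
Console.WriteLine(StartingOffset(null, "latest", 0, 10));    // 10
Console.WriteLine(StartingOffset(null, "earliest", 0, 10));  // 0
Console.WriteLine(StartingOffset(4, "latest", 0, 10));       // 4
```

One practical consequence: with Latest, a new group only sees messages produced after it has joined and been assigned the partition, so publish the test message once the consumers are running.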

Great! Everything seems fine now. Let's proceed to our next step.

Add Consumers ➕📥

As I said, I will add three consumers: two from the same group and one from a different group. Let's call the first group "group-A" and the second "group-B", and name our consumers accordingly.

As we did in the previous post, let's run each consumer on its own task in the Program.cs file, as follows.
// First group-A consumer
Task.Run(() => kafkaService.ConsumeMessage("group-A", "first A group", cancellationTokenSource.Token));

// Second group-A consumer
Task.Run(() => kafkaService.ConsumeMessage("group-A", "second A group", cancellationTokenSource.Token));

// First group-B consumer
Task.Run(() => kafkaService.ConsumeMessage("group-B", "first B group", cancellationTokenSource.Token));
Excellent! Now that we've added our consumers, let's start the containers as we did in the introductory post, then start the application and see what happens when we publish a message.
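One thing the Program.cs snippet does not show is waiting for the consumers to stop. Below is a minimal sketch, assuming you want Ctrl+C to cancel the shared token and then wait for each loop to exit, so the cleanup after the consume loop (such as `consumer.Close()`) gets a chance to run. `RunConsumer` is a hypothetical stand-in for `kafkaService.ConsumeMessage` so the sketch runs without a broker, and the 200 ms timer exists only to end the demo.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for kafkaService.ConsumeMessage: it loops until the token
// is cancelled, like the real consume loop does, and reports how often it looped.
static int RunConsumer(string groupName, string consumerName, CancellationToken token)
{
    int iterations = 0;
    while (!token.IsCancellationRequested)
    {
        iterations++;
        Thread.Sleep(10);  // stands in for a blocking consumer.Consume(token)
    }
    Console.WriteLine($"'{consumerName}' of '{groupName}' stopped");
    return iterations;
}

using var cancellationTokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true;                    // keep the process alive long enough to clean up
    cancellationTokenSource.Cancel();   // ask every consumer loop to stop
};

// Keep the task handles instead of discarding them
var consumers = new[]
{
    Task.Run(() => RunConsumer("group-A", "first A group", cancellationTokenSource.Token)),
    Task.Run(() => RunConsumer("group-A", "second A group", cancellationTokenSource.Token)),
    Task.Run(() => RunConsumer("group-B", "first B group", cancellationTokenSource.Token)),
};

cancellationTokenSource.CancelAfter(TimeSpan.FromMilliseconds(200));  // demo only: stop on a timer
int[] results = await Task.WhenAll(consumers);
Console.WriteLine($"{results.Length} consumers shut down");           // 3 consumers shut down
```

In the real loop, cancelling the token makes `Consume` throw an `OperationCanceledException`; awaiting the tasks is what lets each consumer finish that path and leave its group before the process exits.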

Who Shall Consume? 📬

Let's go ahead and publish a message to see what will happen.


Now, let's look at the application console to see what got logged.


So, from what we see here:
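  1. Both groups logged offset 0. That is not because the groups were new: offsets belong to the partition, not to the group, so offset 0 simply means this was the first message written to the partition, and every group sees it at that same offset.

  2. The message was consumed once per group, even though we ran three consumers.

  3. In "group-A", the message went to "second A group" and never reached "first A group". This is not really a race condition where the fastest consumer wins: Kafka assigns each partition to exactly one consumer in a group. Assuming our topic has a single partition (the broker default for auto-created topics), only one group-A consumer owns that partition, and the other sits idle.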
But now that we understand groups, it's important to understand partitioning too, because to use groups to scale, we have to pair them with partitions. I will be talking about that in an upcoming post, so make sure to stick around!
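As a preview of how partitions and groups fit together, here is a simplified sketch of range-style partition assignment, one of the strategies Kafka offers for spreading a topic's partitions over the members of a group. `AssignRange`, `Describe`, and the member names are hypothetical; real member ids are generated when consumers join, so which consumer ends up with a partition is not decided by the names you give your consumers.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified sketch of range-style assignment for ONE topic inside ONE consumer
// group: every partition goes to exactly one member, members are sorted by id, and
// the first (partitions % members) members get one extra partition.
static List<(string Member, List<int> Partitions)> AssignRange(int partitionCount, IEnumerable<string> memberIds)
{
    var members = memberIds.OrderBy(id => id, StringComparer.Ordinal).ToList();
    if (members.Count == 0)
        throw new ArgumentException("a group needs at least one member", nameof(memberIds));

    int perMember = partitionCount / members.Count;
    int extra = partitionCount % members.Count;
    var assignment = new List<(string Member, List<int> Partitions)>();
    int next = 0;
    for (int i = 0; i < members.Count; i++)
    {
        int count = perMember + (i < extra ? 1 : 0);  // may be 0: the member sits idle
        assignment.Add((members[i], Enumerable.Range(next, count).ToList()));
        next += count;
    }
    return assignment;
}

static string Describe(List<(string Member, List<int> Partitions)> assignment) =>
    string.Join("; ", assignment.Select(a => a.Member + " -> [" + string.Join(",", a.Partitions) + "]"));

// One partition, two members of the same group: one of them gets nothing.
Console.WriteLine(Describe(AssignRange(1, new[] { "consumer-1", "consumer-2" })));  // consumer-1 -> [0]; consumer-2 -> []
// Three partitions, two members: both share the work.
Console.WriteLine(Describe(AssignRange(3, new[] { "consumer-1", "consumer-2" })));  // consumer-1 -> [0,1]; consumer-2 -> [2]
```

With a single partition, adding a second consumer to "group-A" adds a standby rather than extra throughput; adding partitions is what lets both members share the work.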
