Introduction to Consumer Groups: Learn How to Horizontally Scale Kafka Consumers
People say that Kafka is a dumb broker: it just holds data under some defined topics and forwards messages from producers to consumers. It doesn't do much processing on the messages, doesn't route them based on content, and doesn't transform them. But that isn't entirely true.
Kafka does more than just store messages. It keeps track of what each consumer group has consumed by tracking offsets. The consumer group is the group ID you give your consumer when you initialize it. Running more than one consumer in a group helps you tolerate consumer failures and scale horizontally.
If a message is consumed by one consumer in a group, no other consumer in the same group will receive it. However, consumers that belong to other groups (with different group IDs) will still receive it.
It's important to note that a message's offset is simply its position in the partition it was written to; it is the same for every group. What Kafka stores per consumer group is the committed offset, i.e., how far that group has read in each partition. So if "message A" is the first message in a partition, its offset is 0 for every group; a brand-new group simply hasn't committed anything yet, while a group that has already consumed it will have a committed offset pointing past it.
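If you want to see this for yourself, you can ask Kafka what offset a given group has committed for a partition. Below is a minimal sketch using Confluent.Kafka; the broker address and the topic name "documents" are assumptions for illustration, not values from the original setup:

using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // assumed local broker
    GroupId = "group-A",                 // the group whose progress we want to inspect
    EnableAutoCommit = false             // we only want to read offsets, not move them
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();

// Ask the group coordinator for the committed offset of partition 0 of our (hypothetical) topic.
var partition = new TopicPartition("documents", new Partition(0));
var committed = consumer.Committed(new[] { partition }, TimeSpan.FromSeconds(5));

foreach (var tpo in committed)
{
    // Offset.Unset means the group has never committed anything for this partition.
    Console.WriteLine($"Group 'group-A' committed offset {tpo.Offset} on {tpo.TopicPartition}");
}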
If it's still not quite clear, let's apply this in a hands-on example using the same setup we prepared in our Kafka introductory post.
What we will do is:
- Update the ConsumeMessage function to take the group name as a parameter, along with a name for the consumer so we can tell which consumer received the message.
- Add three consumers: two sharing the group "group-A" and a third under a different group, "group-B".
- Publish a message and see which consumers will consume the message.
Updating ConsumeMessage
Start by updating the method signature in the service interface:

void ConsumeMessage(string groupName, string consumerName, CancellationToken cancellationToken);

After that, open the implementation and change the actual function to the following:
public void ConsumeMessage(string groupName, string consumerName, CancellationToken cancellationToken)
{
    var config = new ConsumerConfig
    {
        BootstrapServers = bootstrapServers,
        GroupId = groupName,
        AutoOffsetReset = AutoOffsetReset.Latest
    };

    using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
    consumer.Subscribe(topicName);

    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var consumeResult = consumer.Consume(cancellationToken);

            // Deserialize the message payload into our Document model.
            var document = JsonSerializer.Deserialize<Document>(consumeResult.Message.Value)!;

            Console.WriteLine($"Received message '{document.Name}' with offset {consumeResult.Offset} by group '{groupName}' of consumer '{consumerName}'");
        }
    }
    catch (OperationCanceledException)
    {
        // The cancellation token was triggered; fall through to shut down gracefully.
    }
    finally
    {
        // Leave the group cleanly and commit final offsets.
        consumer.Close();
    }
}
Also notice that I changed AutoOffsetReset to Latest so that a brand-new group starts from the end of the log instead of replaying every message already in the topic.
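If you do want a new group to replay the whole topic from the beginning, the only change is that one setting. A minimal sketch of the alternative configuration, using the same field names as above:

var config = new ConsumerConfig
{
    BootstrapServers = bootstrapServers,
    GroupId = groupName,
    // Earliest: a group with no committed offset starts from the beginning of the partition.
    // Latest: it starts at the end and only sees messages published after it subscribes.
    AutoOffsetReset = AutoOffsetReset.Earliest
};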
Add Consumers ➕
// First group-A consumer
Task.Run(() => kafkaService.ConsumeMessage("group-A", "first A group", cancellationTokenSource.Token));

// Second group-A consumer
Task.Run(() => kafkaService.ConsumeMessage("group-A", "second A group", cancellationTokenSource.Token));

// First group-B consumer
Task.Run(() => kafkaService.ConsumeMessage("group-B", "first B group", cancellationTokenSource.Token));
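These lines assume the kafkaService instance and a CancellationTokenSource from the introductory post's setup. As a rough sketch of the surrounding wiring (the keep-alive approach here is my assumption, not part of the original setup):

using var cancellationTokenSource = new CancellationTokenSource();

// ... the three Task.Run calls shown above go here ...

// Keep the process alive so the background consumers keep running,
// then signal all three consumer loops to stop when the user presses Enter.
Console.WriteLine("Consumers running. Press Enter to stop.");
Console.ReadLine();
cancellationTokenSource.Cancel();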
Who Shall Consume?
Let's go ahead and publish a message and see what happens:
- Both groups were new, so the offset was 0.
- The message was consumed once per group, even though we ran three consumers.
- Within "group-A", only one consumer ("second A group") received the message, while "first A group" did not. This isn't really a race condition: Kafka assigns each partition of the topic to exactly one consumer in a group, so messages from a given partition always land on the same consumer. If the topic has only a single partition, the second group-A consumer simply has nothing to do until a rebalance gives it a partition.
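If you want both group-A consumers to actually share the work, the topic needs more than one partition so Kafka has something to split between them. Here is a sketch of creating such a topic with Confluent.Kafka's admin client; the topic name "documents" and the broker address are assumptions, not values from the original setup:

using Confluent.Kafka;
using Confluent.Kafka.Admin;

var adminConfig = new AdminClientConfig { BootstrapServers = "localhost:9092" }; // assumed local broker
using var adminClient = new AdminClientBuilder(adminConfig).Build();

// Create a topic with three partitions, so up to three consumers
// in the same group can receive messages in parallel.
await adminClient.CreateTopicsAsync(new[]
{
    new TopicSpecification
    {
        Name = "documents",   // hypothetical topic name
        NumPartitions = 3,
        ReplicationFactor = 1 // fine for a local, single-broker setup
    }
});

With three partitions, the second group-A consumer would be assigned one or more of them during the rebalance and start receiving its share of the messages.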