Confluent Schema Registry: Learn Efficient Kafka Avro Serialization 📜


When we talk about Kafka, it's important to mention Avro serialization. As I mentioned in my Kafka introductory post: as a message broker, Kafka is known for being complex compared to other brokers such as SQS (which is literally called Simple Queue Service) or RabbitMQ, whose client boils producing and consuming down to BasicPublish and BasicConsume calls.

And although it requires a little more effort than the other brokers, it offers much more for services that need to trim down every bit of latency they can, whether because of how critical the business is or because of the sheer volume of data they need to stream.

And Confluent Kafka can help with that by transporting Avro-serialized messages. But to do that, Kafka relies on Confluent Schema Registry: once a message has been serialized to bytes, the consumer needs to look up the schema that can turn those bytes back into the model they originally came from.

Confluent Schema Registry is basically a store that keeps track of the schemas of the messages you want to convert to Avro. It offers a lot of features: it automatically compares schemas to avoid registering duplicates, and it handles schema evolution when new fields are added or optional fields are removed.
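To make that a bit more concrete, here's a rough sketch of what the registry ends up storing for a simple model. The Document class below is an assumption (this post only ever reads its Name property), and the Avro record in the comment is roughly what a schema generator such as Chr.Avro, which we'll use later, would produce for it:

// Hypothetical sketch of the Document model (the real one lives in Domain.Models;
// only its Name property appears in this post, the rest is assumed).
public class Document
{
    public string Name { get; set; } = string.Empty;
}

// The registry would store an Avro record schema for it roughly like:
// {
//   "type": "record",
//   "name": "Document",
//   "fields": [ { "name": "Name", "type": "string" } ]
// }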

It's a whole topic in itself, but we won't dig that deep into it in this post; we'll only use it to store the schema of our converted message and then retrieve that schema.

This is actually not how you should deal with schema registries in production environments or real-life projects, because there you need to keep track of schema versions, maintain them, and make sure that the consumer is reading with the same schema version that the producer wrote with.

So, as I said, Schema Registry offers much more than what I'm using here; I'm only using it this way to highlight how to serialize your messages. So, let's start our simple example.

Note that I will be using the same setup that I used in my previous post, so go check it out and follow the steps there before continuing with this tutorial.

Running Schema Registry 🏃

As I said, we need Confluent Schema Registry so that we can store the schema of our message so that we are able to deserialize it when we want to consume it. So, let's go ahead and add the Schema Registry image to our docker-compose.yml file.
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - 9092:9092

  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    depends_on:
      - kafka
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
    ports:
      - 8081:8081
It's the same YAML file we used in the previous post; we just added the latest version of the Schema Registry image, pointed it at the Kafka service, and exposed it on port 8081.

Now, there's a problem you might face after your containers run successfully: your application might not be able to resolve the kafka host name. If you do face this issue, go ahead and open your hosts file at C:\Windows\System32\drivers\etc\hosts and add the line 127.0.0.1 kafka, which will make your application resolve the host name kafka to the localhost where your Kafka service is running.

Great! Now, after you run docker compose up, your containers should be up and we can go ahead and add the code we need to the KafkaService.
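If you want to double-check that the registry itself is reachable before touching any Kafka code, a quick optional sanity check is to hit its REST API, for example the /subjects endpoint (a fresh registry simply returns an empty list):

// Optional sanity check: list the subjects currently registered in Schema Registry.
// Assumes the registry is exposed on localhost:8081 as in the docker-compose.yml above.
using var http = new HttpClient();
var subjects = await http.GetStringAsync("http://localhost:8081/subjects");
Console.WriteLine(subjects); // prints [] on a fresh registry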

Avro SerDe ✉️🔁01010 

SerDe means serialization & deserialization, which is what we're going to do in the following implementation. We'll edit the service we created in the previous post so that publishing a message involves:
  • Creating a schema for our Document model if it doesn't exist
  • Storing the schema in Schema Registry
  • Fetching the latest schema version to convert the message to Avro
  • Serializing the message to Avro format
  • Publishing the message
And consuming a message involves:
  • Fetching the schema for the model from Schema Registry
  • Using the version to build the deserializer
  • Consuming the message
Now that we've gone through the needed steps, let's translate that to code:
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Confluent.SchemaRegistry;
using Domain.Models;
using Playground.Contracts;
using Chr.Avro.Abstract;
using Chr.Avro.Representation;
using Chr.Avro.Confluent;
using Schema = Confluent.SchemaRegistry.Schema;

namespace Playground.Implementations
{
    public class KafkaService : IKafkaService
    {
        private readonly string bootstrapServers = "localhost:9092";

        private readonly string topicName = "hardcode-topic";

        private readonly string schemaRegistryUrl = "localhost:8081"; // Update with your Schema Registry URL

        private readonly CachedSchemaRegistryClient schemaRegistry;

        public KafkaService()
        {
            var schemaRegistryConfig = new SchemaRegistryConfig { Url = schemaRegistryUrl };

            schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
        }

        public async Task CreateTopicIfNotExistsAsync()
        {
            using var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();

            var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(20));
            if (!metadata.Topics.Exists(t => t.Topic == topicName))
            {
                await adminClient.CreateTopicsAsync(new List<TopicSpecification> { new() { Name = topicName, ReplicationFactor = 1, NumPartitions = 1 } });
            }
        }

        public static Schema GetSchema<TMessage>()
        {
            // Build a schema for your class
            var schemaBuilder = new SchemaBuilder();

            var schema = schemaBuilder.BuildSchema<TMessage>();

            var writer = new JsonSchemaWriter();

            var stringSchema = writer.Write(schema);

            return new Schema(stringSchema, SchemaType.Avro);
        }

        public async Task<int> GetSchemaVersion(Schema schema)
        {
            // Check if the schema is already registered (note that we use the topic name itself as the registry subject)
            try
            {
                var lookupSchema = await schemaRegistry.LookupSchemaAsync(topicName, schema, true);
                return lookupSchema?.Version ?? 0;
            }
            catch (SchemaRegistryException)
            {
                return 0;
            }
        }

        public async Task PublishMessageAsync(Document document)
        {
            var config = new ProducerConfig { BootstrapServers = bootstrapServers };

            // Build a schema for your class
            var schema = GetSchema<Document>();

            // Build a serializer that resolves schemas from the Schema Registry
            var schemaBuilder = new SchemaBuilder();

            var serializerBuilder =
                  new SchemaRegistrySerializerBuilder(schemaRegistry, schemaBuilder);

            // Check if schema exists in the schema registry and get the version
            var schemaVersion = await GetSchemaVersion(schema);

            // If schema doesn't exist, register it
            if (schemaVersion == 0)
            {
                await schemaRegistry.RegisterSchemaAsync(topicName, schema);

                var latestSchema = await schemaRegistry.GetLatestSchemaAsync(topicName);
            
                schemaVersion = latestSchema.Version;
            }

            var producerBuilder = new ProducerBuilder<Null, Document>(config);

            var producerBuilderSerializer = await producerBuilder.SetAvroValueSerializer(serializerBuilder,
                topicName,
               schemaVersion);

            using var producer = producerBuilderSerializer.Build();

            var message = new Message<Null, Document> { Value = document };

            await producer.ProduceAsync(topicName, message);

            Console.WriteLine($"Published Avro serialized message '{document.Name}'");
        }

        // Returning Task (instead of async void) lets callers await this and observe exceptions;
        // the IKafkaService interface should declare Task for this method as well.
        public async Task ConsumeMessageAsync(CancellationToken cancellationToken)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = bootstrapServers,
                GroupId = "hardcode-consumer-group",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            var consumerBuilder = new ConsumerBuilder<Ignore, Document>(config);

            // Build a schema for your class
            var schema = GetSchema<Document>();

            // Check if schema exists in the schema registry and get the version
            var schemaVersion = await GetSchemaVersion(schema);

            // You should handle the case where the schema doesn't exist in the schema registry

            // set the Avro deserializer
            await consumerBuilder.SetAvroValueDeserializer(new SchemaRegistryDeserializerBuilder(schemaRegistry),
                topicName,
                schemaVersion);

            using var consumer = consumerBuilder.Build();

            consumer.Subscribe(topicName);

            while (!cancellationToken.IsCancellationRequested)
            {
                var consumeResult = consumer.Consume(cancellationToken);

                Console.WriteLine($"Consumed Avro deserialized message '{consumeResult.Message.Value.Name}'");
            }
        }

    }
}
The service now follows the steps I clarified above. Just remember that managing schemas is a whole other topic and depends heavily on how you want your application to run; there are no wrong answers, it's a design matter.
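To tie it all together, here is a minimal sketch of how the service could be exercised end to end. It assumes the Document model only needs its Name set and that ConsumeMessageAsync returns a Task as shown above; how you host it (controller, console app, background service) is up to the setup from the previous post.

// Minimal end-to-end sketch (hypothetical wiring; types come from the service above).
using Domain.Models;
using Playground.Implementations;

var kafkaService = new KafkaService();

// Make sure the topic exists before producing or consuming.
await kafkaService.CreateTopicIfNotExistsAsync();

// Start consuming on a background task (consumer.Consume blocks, so keep it off the main flow).
using var cts = new CancellationTokenSource();
var consuming = Task.Run(() => kafkaService.ConsumeMessageAsync(cts.Token));

// Publish an Avro-serialized message.
await kafkaService.PublishMessageAsync(new Document { Name = "hello-avro" });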

Perfect! So, let's see if our messages get published and consumed.

Pub & Sub 📨📩

After you make sure your containers are up and running, go ahead and fire up your application. We'll be hitting the same API we defined before to post a message to our topic.


Great! Now, let's check the application console to see the result.
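Based on the WriteLine calls in the service, the output should look roughly like this (the document name depends on what you posted; 'hello-avro' here is just an example):

Published Avro serialized message 'hello-avro'
Consumed Avro deserialized message 'hello-avro'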


Our message was successfully published and consumed. I think we've covered a good chunk of Schema Registry and Avro serialization and added another building block to our Kafka learning journey.

It's important to know that Avro serialization is not the only way to serialize your messages. There's also Google's Protobuf (Protocol Buffers), which can actually be faster than Avro and has broad cross-language support, which is part of why gRPC adopted it. On the other hand, Avro schemas are plain JSON, which makes them more flexible and easier to read.

In summary, Kafka still has more options to discover and more features to dive into, such as partitioning, consumer groups, and more. Make sure you play around and see what else you can learn in terms of patterns and features that Kafka can offer.
