Confluent Kafka Connect: Store Kafka Messages in Elasticsearch ✉️📂

Suppose you need to store all the messages published on a certain topic. Normally you would create a consumer that listens to that topic, receives each message and stores it in the datastore you want. What if I told you there is a shortcut? Instead of doing all that, you can create a connector using Confluent's Kafka Connect, and this connector will automatically store every published message in your database.

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other data systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka.

Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency. It can also deliver data from Kafka topics into secondary indexes like Elasticsearch or batch systems such as Hadoop for offline analysis.

In this tutorial we will focus on how to store your Kafka messages in an Elasticsearch index. I will be using the same setup as in my Kafka introductory post.

Running Kafka Connect 🏃

Just like most of my previous posts, I will be setting up Kafka Connect using Docker Compose. But before doing so, I need to prepare a Dockerfile for Kafka Connect so that the image includes the Elasticsearch connector. Simply create a Dockerfile and paste the following lines into it.

FROM confluentinc/cp-kafka-connect:latest

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:latest

Make sure this Dockerfile is located in the same folder as the docker-compose.yml file we're about to define. Next, let's create the docker-compose.yml file that runs all the needed services, Kafka, Zookeeper, Elasticsearch and of course Kafka Connect, and make sure the kafka-connect service builds from the Dockerfile we just added.

version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # kafka:29092 is reachable from other containers; localhost:9092 is for clients running on the host
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    ports:
      - 9092:9092
      
  kafka-connect:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - 8083:8083
    depends_on:
      - kafka
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
   
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.13.0
    environment:
      - node.name=elasticsearch
      - cluster.name=es-docker-cluster
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - "xpack.security.enabled=false"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - esdata1:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
      
volumes:
 esdata1:
   driver: local

This should do it. Now let's run docker compose up in the folder containing these two files and wait until the images are pulled and the containers are up and running.

Please note that Kafka Connect will most probably fail on its first start: it expects its internal topics (offsets, configs and status) to exist, and on our single-broker cluster it cannot create them itself with its default replication factor of 3. So, we'll edit the CreateTopicIfNotExistsAsync function we established in the introductory post to create the topics Kafka Connect needs, along with the topic whose messages we want the connector to store in Elasticsearch.
        private readonly string topicName = "kafka-connect-es";

        public async Task CreateTopicIfNotExistsAsync()
        {
            using var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();
            var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(20));

            // The data topic plus the three internal topics Kafka Connect expects to find.
            var topicsToCreate = new List<string> { topicName, "docker-connect-offsets", "docker-connect-status", "docker-connect-configs" };

            foreach (var topic in topicsToCreate)
            {
                if (!metadata.Topics.Exists(t => t.Topic == topic))
                {
                    var topicSpecification = new TopicSpecification
                    {
                        Name = topic,
                        ReplicationFactor = 1,
                        NumPartitions = 1,
                        // Kafka Connect requires its internal topics to be log-compacted.
                        // The data topic keeps the default cleanup policy so that messages
                        // published without a key are still accepted.
                        Configs = topic == topicName
                            ? new Dictionary<string, string>()
                            : new Dictionary<string, string> { { "cleanup.policy", "compact" } }
                    };

                    await adminClient.CreateTopicsAsync(new List<TopicSpecification> { topicSpecification });
                }
            }
        }

After changing the function, run your application so the topics get created, then restart the Kafka Connect container (docker compose restart kafka-connect) and it should start up fine this time.

Next, let's configure Kafka Connect to connect to the Elasticsearch service we just started.

Configuring Kafka Connect 🔧

Now, in order to add an Elasticsearch Kafka connector, we need to post a configuration that defines an Elasticsearch sink which consumes the messages and stores them. Let's first hit the connectors endpoint at http://localhost:8083/connectors to list the existing connectors.
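If you don't have a REST client handy, a minimal sketch like the following, assuming the Connect REST API is reachable at localhost:8083 as mapped in our compose file, does the same thing from C#:

using System;
using System.Net.Http;
using System.Threading.Tasks;

// Minimal sketch: list the connectors registered with the Kafka Connect worker.
// Assumes the worker's REST API is exposed on localhost:8083 (see docker-compose.yml).
public static class ListConnectors
{
    public static async Task Main()
    {
        using var client = new HttpClient();

        // GET /connectors returns a JSON array of connector names; a fresh worker returns [].
        var response = await client.GetStringAsync("http://localhost:8083/connectors");
        Console.WriteLine(response);
    }
}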


As you can see, there are no connectors yet. To add one, we need to define a JSON object that we can post to that same URL, one that looks like this:
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "kafka-connect-es",
    "key.ignore": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "schema.ignore": "true",
    "type.name": "_doc",
    "name": "elasticsearch-sink",
    "connection.url": "http://elasticsearch:9200",
    "transforms": "ExtractTimestamp",
    "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.ExtractTimestamp.timestamp.field": "timestamp",
    "auto.create.indices.at.start": "true"
  }
}

Here I defined a connector named elasticsearch-sink that reads from the kafka-connect-es topic, ignores schemas (since we publish plain JSON without an embedded schema) and points the connector at our Elasticsearch service. I also specified that the index should be created automatically, named after the topic, to keep things simple.

Save this as config.json, post it to http://localhost:8083/connectors/, and then let's list the connectors again.
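If you prefer doing this from code rather than a REST client, here's a small sketch, assuming a config.json file sitting next to the program and the Connect REST API at localhost:8083, that registers the sink and lists the connectors again:

using System;
using System.IO;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

// Minimal sketch: register the Elasticsearch sink by posting config.json to the
// Kafka Connect REST API, then list the connectors to confirm it was created.
public static class RegisterConnector
{
    public static async Task Main()
    {
        using var client = new HttpClient();

        var json = await File.ReadAllTextAsync("config.json");
        var response = await client.PostAsync(
            "http://localhost:8083/connectors/",
            new StringContent(json, Encoding.UTF8, "application/json"));

        // 201 Created means the connector was registered; 409 Conflict means it already exists.
        Console.WriteLine($"Status: {(int)response.StatusCode}");
        Console.WriteLine(await client.GetStringAsync("http://localhost:8083/connectors"));
    }
}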



Perfect, our connector was successfully created. All we need to do now is test it by publishing a message and seeing if it lands in the index. Note that we have not created an index ourselves; we're expecting the connector to do that.

Sink Sink Sink 🚿💦

Let's go ahead, run our application, and hit the post API to publish a message to the topic kafka-connect-es, then look for the index in Elasticsearch.
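If you don't have the introductory post's API at hand, a minimal producer sketch like this one, assuming the Confluent.Kafka NuGet package and a broker reachable at localhost:9092, publishes a JSON test message to the topic:

using System;
using System.Threading.Tasks;
using Confluent.Kafka;

// Minimal sketch: publish a JSON test message to the kafka-connect-es topic.
// Assumes the Confluent.Kafka package and a broker advertised on localhost:9092.
public static class TestProducer
{
    public static async Task Main()
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
        using var producer = new ProducerBuilder<string, string>(config).Build();

        var message = new Message<string, string>
        {
            Key = "test-1",
            // Plain JSON value; the sink was configured with schemas disabled.
            Value = "{\"user\":\"john\",\"action\":\"login\"}"
        };

        var result = await producer.ProduceAsync("kafka-connect-es", message);
        Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
    }
}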


Now, let's query the index and see whether it was created and the message was stored successfully.
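Again, if you'd rather check from code than from the browser, a quick sketch like this, assuming Elasticsearch is exposed on localhost:9200 as in the compose file, searches the automatically created index:

using System;
using System.Net.Http;
using System.Threading.Tasks;

// Minimal sketch: search the index the connector created (it is named after the topic)
// to confirm the published message was indexed.
public static class CheckIndex
{
    public static async Task Main()
    {
        using var client = new HttpClient();

        // _search with no request body returns the first few documents in the index.
        var body = await client.GetStringAsync("http://localhost:9200/kafka-connect-es/_search");
        Console.WriteLine(body);
    }
}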


Perfect! Seems like everything went well. The index was automatically created, and the message was successfully stored in the index.

It's important to note that Elasticsearch isn't the only sink you can connect to Kafka. There are other sink connectors that let you easily integrate your datastore with your broker, such as Cassandra, Hadoop, S3 and more. Kafka Connect can also ingest whole databases and stream them into Kafka using source connectors.

In the end, I just want to say that these kinds of connectors shouldn't be a core part of your system, and you shouldn't depend on them to store vital or important data. Rather, use them for logging or analytics purposes, because these types of sinks are useful for dumping data, not managing it.

