Confluent Kafka Connect: Store Kafka Messages in Elasticsearch ✉️📂
Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other data systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka.
Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency. It can also deliver data from Kafka topics into secondary indexes like Elasticsearch or batch systems such as Hadoop for offline analysis.
In this tutorial we will focus on how to store your Kafka messages in an Elasticsearch index. I will be using the same setup as in my Kafka introductory post.
Running Kafka Connect 🏃
Just like in most of my previous posts, I will be setting up Kafka Connect using Docker Compose. Before doing so, I need to prepare a Dockerfile for Kafka Connect so that it includes the Elasticsearch connector. Simply create a Dockerfile and paste the following lines into it.
FROM confluentinc/cp-kafka-connect:latest
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:latest
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    ports:
      - 9092:9092
  kafka-connect:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - 8083:8083
    depends_on:
      - kafka
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.13.0
    environment:
      - node.name=elasticsearch
      - cluster.name=es-docker-cluster
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - "xpack.security.enabled=false"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - esdata1:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
volumes:
  esdata1:
    driver: local
After changing the function, run your application so that the topics get created, then restart the Kafka Connect container. After that it should work fine.

    // Requires: using Confluent.Kafka; using Confluent.Kafka.Admin;
    // bootstrapServers is a field defined elsewhere in the same class.
    private readonly string topicName = "kafka-connect-es";

    public async Task CreateTopicIfNotExistsAsync()
    {
        using var adminClient = new AdminClientBuilder(
            new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();

        // Fetch the cluster metadata once and check each topic against it.
        var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(20));
        var topicsToCreate = new List<string>
        {
            "kafka-connect-es",
            "docker-connect-offsets",
            "docker-connect-status",
            "docker-connect-configs"
        };

        foreach (var topic in topicsToCreate)
        {
            if (!metadata.Topics.Exists(t => t.Topic == topic))
            {
                // Kafka Connect expects its internal topics to be compacted.
                // Note that a compacted topic only accepts messages with a non-null key.
                var topicSpecification = new TopicSpecification
                {
                    Name = topic,
                    ReplicationFactor = 1,
                    NumPartitions = 1,
                    Configs = new Dictionary<string, string> { { "cleanup.policy", "compact" } }
                };
                await adminClient.CreateTopicsAsync(new List<TopicSpecification> { topicSpecification });
            }
        }
    }
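Once the Kafka Connect container has restarted, it can be worth checking that the worker is reachable and that the Elasticsearch connector from the Dockerfile was actually picked up. Here is a minimal sketch that asks the Kafka Connect REST API for its installed plugins. The class name `ConnectPluginCheck` and the helper `HasPlugin` are my own illustrative names, and I am assuming port 8083 is reachable on localhost as mapped in the compose file.

```csharp
using System;
using System.Linq;
using System.Net.Http;
using System.Text.Json;
using System.Threading.Tasks;

public static class ConnectPluginCheck
{
    // Connector class name, taken from the sink configuration used in this post.
    public const string SinkClass =
        "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector";

    // Pure helper: returns true if the /connector-plugins response lists the class.
    public static bool HasPlugin(string pluginsJson, string connectorClass)
    {
        using var doc = JsonDocument.Parse(pluginsJson);
        return doc.RootElement.EnumerateArray().Any(p =>
            p.TryGetProperty("class", out var c) && c.GetString() == connectorClass);
    }

    public static async Task Main()
    {
        // Assumption: the worker's REST port is published on localhost:8083.
        using var http = new HttpClient { BaseAddress = new Uri("http://localhost:8083") };
        var json = await http.GetStringAsync("/connector-plugins");

        Console.WriteLine(HasPlugin(json, SinkClass)
            ? "Elasticsearch sink plugin is installed."
            : "Elasticsearch sink plugin not found; check CONNECT_PLUGIN_PATH.");
    }
}
```

If the request itself fails, the worker is most likely still starting up or has exited, and the container logs are the place to look.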
Configuring Kafka Connect 🔧
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "kafka-connect-es",
    "key.ignore": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "schema.ignore": "true",
    "type.name": "_doc",
    "name": "elasticsearch-sink",
    "connection.url": "http://elasticsearch:9200",
    "transforms": "ExtractTimestamp",
    "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.ExtractTimestamp.timestamp.field": "timestamp",
    "auto.create.indices.at.start": "true"
  }
}
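The configuration above has to be submitted to the Kafka Connect REST API, which accepts new connectors as a POST to `/connectors`. Below is a rough sketch of doing that from C#. I am assuming the JSON has been saved to a file called `elasticsearch-sink.json` (a hypothetical name) and that the worker is reachable on localhost:8083. `RegisterConnector` and `Describe` are illustrative names of my own.

```csharp
using System;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

public static class RegisterConnector
{
    // Pure helper: map the REST status code to a short, readable message.
    public static string Describe(HttpStatusCode status) => status switch
    {
        HttpStatusCode.Created => "Connector created.",
        HttpStatusCode.Conflict => "A connector with this name already exists.",
        _ => $"Unexpected response: {(int)status}"
    };

    public static async Task Main(string[] args)
    {
        // Hypothetical file name; it should contain the connector JSON shown above.
        var path = args.Length > 0 ? args[0] : "elasticsearch-sink.json";
        var body = new StringContent(
            await File.ReadAllTextAsync(path), Encoding.UTF8, "application/json");

        // Assumption: the worker's REST port is published on localhost:8083.
        using var http = new HttpClient { BaseAddress = new Uri("http://localhost:8083") };
        using var response = await http.PostAsync("/connectors", body);

        Console.WriteLine(Describe(response.StatusCode));
        // Print the response body too; on errors it usually explains what went wrong.
        Console.WriteLine(await response.Content.ReadAsStringAsync());
    }
}
```

Any HTTP client works here; the important parts are the `application/json` content type and the `/connectors` path.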
Sink Sink Sink 🚿💦
Perfect! Seems like everything went well. The index was automatically created, and the message was successfully stored in the index.
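If you want to check the result yourself, you can ask Elasticsearch for the documents in the index. The sketch below assumes the index is named after the topic (`kafka-connect-es`) and that port 9200 is reachable on localhost, as mapped in the compose file. `IndexCheck` and `CountReturnedHits` are illustrative names of my own.

```csharp
using System;
using System.Net.Http;
using System.Text.Json;
using System.Threading.Tasks;

public static class IndexCheck
{
    // Pure helper: number of documents contained in a _search response body.
    public static int CountReturnedHits(string searchJson)
    {
        using var doc = JsonDocument.Parse(searchJson);
        return doc.RootElement.GetProperty("hits").GetProperty("hits").GetArrayLength();
    }

    public static async Task Main()
    {
        // Assumption: Elasticsearch is published on localhost:9200 with security disabled.
        using var http = new HttpClient { BaseAddress = new Uri("http://localhost:9200") };
        var json = await http.GetStringAsync("/kafka-connect-es/_search");

        Console.WriteLine($"Documents returned: {CountReturnedHits(json)}");

        // Print the stored message of every returned document.
        using var doc = JsonDocument.Parse(json);
        foreach (var hit in doc.RootElement.GetProperty("hits").GetProperty("hits").EnumerateArray())
        {
            Console.WriteLine(hit.GetProperty("_source").GetRawText());
        }
    }
}
```

Keep in mind that `_search` returns only the first ten documents by default, so the count printed here is the number of returned hits, not necessarily the size of the index.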