JDBC Kafka Source Connector: Stream Database Changes or ENTIRE Tables
Kafka Connect offers a very powerful feature: connecting to databases through connectors such as JDBC (Java Database Connectivity). The Kafka Connect JDBC source connector allows you to import data from any relational database with a JDBC driver into an Apache Kafka topic, and it supports a wide variety of databases. Data is loaded by periodically executing a SQL query and creating an output record for each row in the result set. The JDBC connectors let you pull data (source) from a database into Kafka, and also push data (sink) from a Kafka topic to a database.
In this tutorial we'll focus on pulling data from a SQL database. I will be reusing the same setup as my previous Kafka Connect post, so make sure to follow the steps in that post to prepare your application and the needed containers.
Running Kafka Connect
FROM confluentinc/cp-kafka-connect:latest
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:latest
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
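Assuming the Dockerfile above is saved in your working directory, you can build the custom image and, once the Connect container from the previous post's setup is running, confirm that the JDBC plugin was installed. The image tag and the localhost:8083 address below are assumptions based on the default Connect REST port:

# build the Connect image with the JDBC and Elasticsearch connectors baked in (image tag is an assumption)
docker build -t custom-kafka-connect .

# list the installed connector plugins through the Connect REST API; the JDBC source connector should appear
curl http://localhost:8083/connector-plugins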
Configuring JDBC
Incrementing mode means that whenever the value of a specified column increases (for example, when a new row with a higher Id is inserted), a new message is published to a topic named after the table, with the configured prefix. Streaming database changes like this enables real-time analytics, monitoring, data warehousing, logging, ETL and more.
Bulk mode, on the other hand, publishes every record in the table as messages on each poll. Bulk streaming has the same benefits as incrementing streaming, and it also makes database replication possible. So, let's prepare the configs needed for both modes.
{"name": "jdbc-source-connector","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector","connection.url": "jdbc:sqlserver://host.docker.internal;databaseName=kafka-jdbc;user=jdbc;password=123;","connection.user": "jdbc","connection.password": "123","table.whitelist": "Documents","mode": "incrementing","incrementing.column.name": "Id","topic.prefix": "sql-","poll.interval.ms": 1000}}
{"name": "jdbc-source-connector-bulk","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector","connection.url": "jdbc:sqlserver://host.docker.internal;databaseName=kafka-jdbc;user=jdbc;password=123;","connection.user": "jdbc","connection.password": "123","table.whitelist": "Documents","mode": "bulk","topic.prefix": "sql-bulk-","poll.interval.ms": 5000}}
Streaming Changes
Now I will take a look at my application's console to see if the consumer of the topic sql-Documents has logged anything.
Perfect! My new record was published as a message to the topic. That happened because the column (Id) I specified in the config was incremented, so the connector detected the change and published a message containing the new record.
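If you prefer to verify outside the application, a console consumer can read the same topic directly. The broker address, and whether the tool runs on your host or inside the broker container, depend on the setup from the previous post, so treat this as a sketch:

# read the incrementing topic from the beginning; localhost:9092 is an assumption
kafka-console-consumer --bootstrap-server localhost:9092 --topic sql-Documents --from-beginning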
Streaming Entire Tables
Now for bulk streaming, we need to create a topic with the same name we configured in the connector, which is sql-bulk-Documents, and set up a consumer for this topic in our application.
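If your cluster doesn't auto-create topics, the topic can be created manually with the Kafka CLI. The broker address and the partition/replication values are assumptions:

# create the topic the bulk connector will write to
kafka-topics --bootstrap-server localhost:9092 --create --topic sql-bulk-Documents --partitions 1 --replication-factor 1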
It's important to note that once your topic is created, the connector will push all the records of the configured table on every poll interval. This is not a one-time operation; it keeps pushing the records until you pause the connector by sending a PUT request to its pause endpoint:
curl -X PUT "http://localhost:8083/connectors/jdbc-source-connector-bulk/pause"
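When you want the bulk publishing to start again, the same REST API exposes resume and status endpoints. The connector name and localhost address below match the bulk config from this tutorial:

# resume the paused bulk connector
curl -X PUT "http://localhost:8083/connectors/jdbc-source-connector-bulk/resume"

# check whether the connector is currently RUNNING or PAUSED
curl "http://localhost:8083/connectors/jdbc-source-connector-bulk/status"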
Besides incrementing and bulk, the JDBC source connector supports two other modes:
- Timestamp Mode: In timestamp mode, the connector uses a timestamp column to detect new and modified rows. It compares each row's timestamp with the last timestamp value it processed to identify changes. This mode is useful when your database table has a reliable timestamp column that is updated on every data modification.
- Timestamp+Incrementing Mode: This mode combines a timestamp column with an incrementing column. The timestamp column detects new and updated rows, while the incrementing column gives each row a unique ID, so changes are tracked more accurately. Use this mode when you want to capture both INSERT and UPDATE events; a config sketch follows below.
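As a rough sketch of the last mode, a timestamp+incrementing config could look like the JSON below. The ModifiedDate column and the sql-ts- prefix are assumptions for illustration, not part of the setup used earlier:

{
  "name": "jdbc-source-connector-timestamp",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:sqlserver://host.docker.internal;databaseName=kafka-jdbc;user=jdbc;password=123;",
    "connection.user": "jdbc",
    "connection.password": "123",
    "table.whitelist": "Documents",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "ModifiedDate",
    "incrementing.column.name": "Id",
    "topic.prefix": "sql-ts-",
    "poll.interval.ms": 1000
  }
}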