JDBC Kafka Source Connector: Stream Database Changes or ENTIRE Tables πŸš‚πŸ“‘



Confluent has built some useful tools that rely on Kafka, and one of them is Kafka Connect. In a previous post I showed you one of the ways to use Kafka Connect as a consumer of Kafka topics; this time I will show you how to use Kafka Connect as a producer to Kafka topics.

Kafka Connect offers a very powerful feature: connecting to databases through connectors such as JDBC (Java Database Connectivity). The Kafka Connect JDBC source connector allows you to import data from any relational database with a JDBC driver into an Apache Kafka topic, and it supports a wide variety of databases. Data is loaded by periodically executing a SQL query and creating an output record for each row in the result set. The JDBC connector can both pull data from a database into Kafka (source) and push data from a Kafka topic to a database (sink).

In this tutorial we'll be focusing on pulling data from a SQL database. I will be reusing the same setup as my previous Kafka Connect post so make sure to follow the steps in that post to prepare your application and the needed containers.

Running Kafka Connect πŸƒ

As I mentioned, I will be reusing the YAML file I used to run Kafka Connect, Kafka, and ZooKeeper, but I will edit the Dockerfile to install the JDBC connector alongside the Elasticsearch connector. We won't be using the Elasticsearch connector this time, but I will just leave it there.

FROM confluentinc/cp-kafka-connect:latest

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:latest
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest

Great! Now, let's run the same docker-compose.yml file from the previous post using docker compose up.
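If your compose file builds the Connect image from this Dockerfile, as in the previous post's setup, passing a build flag makes sure the image is rebuilt with the newly installed connectors; a minimal sketch:

# Rebuild the Connect image (so the JDBC connector gets installed) and start the stack in the background
docker compose up -d --build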

After the containers are up, we should be able to communicate with Kafka Connect, so let's, for example, get the list of connectors.
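The Kafka Connect REST API exposes endpoints for listing the installed plugins and the configured connectors. Assuming the worker's REST port 8083 is mapped to localhost, as in the previous post's setup, the checks could look like this:

# List the connector plugins installed on the worker (JDBC and Elasticsearch should appear)
curl http://localhost:8083/connector-plugins

# List the configured connectors (empty for now, since we haven't posted any configs yet)
curl http://localhost:8083/connectors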


Perfect! Now that Kafka Connect is running and responding, let's go ahead and configure our JDBC connectors.

Configuring JDBC πŸ”§

The JDBC connector supports several modes, but in this tutorial I will focus on incrementing and bulk.

Incrementing mode means that whenever the specified column increases (for example, when a new row is inserted), a new message is published to a topic named after the table. Streaming database changes like this enables real-time analytics, monitoring, data warehousing, logging, ETL, and more.

Bulk mode, on the other hand, repeatedly publishes all the records in the table as messages. Bulk streaming has the same benefits as incremental streaming, and it is also useful for database replication. So, let's prepare the configs needed for both modes.

I will be connecting to a database named kafka-jdbc, which has a table named Documents. This is the table I will be streaming to a topic named after the table, with a different prefix for each mode.

Let's start with the incrementing mode config.
{
  "name": "jdbc-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:sqlserver://host.docker.internal;databaseName=kafka-jdbc;user=jdbc;password=123;",
    "connection.user": "jdbc",
    "connection.password": "123",
    "table.whitelist": "Documents",
    "mode": "incrementing",
    "incrementing.column.name": "Id",
    "topic.prefix": "sql-",
    "poll.interval.ms": 1000
  }
}
As you can see, I simply set a connection string, the table name, a prefix to prepend to the table name when publishing to the topic, the incrementing column whose increase should trigger a new message, and finally a poll interval.

Let's go ahead and post this config to set up our incrementing connector. Just like in my previous post, I will save the config in a file and then post that file via the Kafka Connect API.
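A sketch of what that request might look like, assuming the config above is saved in a file named incrementing-connector.json (a file name I'm making up here) and Connect is reachable on localhost:8083:

# Create the incrementing source connector from the saved config file
curl -X POST -H "Content-Type: application/json" \
  --data @incrementing-connector.json \
  http://localhost:8083/connectors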


Perfect! Now let's set up our bulk connector. I will use almost the same configuration, but with the mode set to bulk, a longer poll interval, a different topic prefix, and a different connector name.
{
  "name": "jdbc-source-connector-bulk",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:sqlserver://host.docker.internal;databaseName=kafka-jdbc;user=jdbc;password=123;",
    "connection.user": "jdbc",
    "connection.password": "123",
    "table.whitelist": "Documents",
    "mode": "bulk",
    "topic.prefix": "sql-bulk-",
    "poll.interval.ms": 5000
  }
}
Note that I'm using host.docker.internal as the server because my database is running in a SQL Server instance on my local machine, outside the Docker network.

Let's also post this connector using the API.
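Once it's posted, a quick way to confirm that both connectors were created and are running is the status endpoint; for example (again assuming localhost:8083):

# Check that the bulk connector and its task are RUNNING
curl http://localhost:8083/connectors/jdbc-source-connector-bulk/status

# And the same for the incrementing connector
curl http://localhost:8083/connectors/jdbc-source-connector/status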


Excellent! Now that our configs are posted successfully, let's run our application to create these topics and listen to them, because Kafka Connect will not create them automatically.

Streaming Changes πŸ“

Let's first run the application and create the needed topics for Kafka Connect, like we did in the previous post. This time we will also create a topic named sql-Documents so that our incrementing connector can publish messages to it, and I will run a consumer that listens to that topic. If you don't know how to set up a consumer, follow the steps in this post, but don't deserialize your messages; just log them instead.
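If you'd rather create the topic (or inspect it) from the command line instead of from the application, the CLI tools bundled with the Kafka container can do it; a sketch, assuming the broker container is named kafka and listens on port 9092 inside the container:

# Create the topic the incrementing connector will publish to
docker exec kafka kafka-topics --bootstrap-server localhost:9092 \
  --create --topic sql-Documents --partitions 1 --replication-factor 1

# Optionally, peek at the topic with the console consumer instead of the application
docker exec kafka kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic sql-Documents --from-beginning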

After running the application and creating the topics, I will add a new record to my database to see whether the new record gets published as a message.




My Documents table already has three records, so when I add a fourth one, Kafka Connect should publish that change to the topic. So, I will run this command and see what happens.
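The exact statement depends on your table's schema, which isn't shown here; purely as an illustration, assuming Documents has an identity Id column and a Name column (hypothetical), the insert could be run with sqlcmd like this:

# Insert a fourth row; the database generates the Id, which the incrementing connector picks up
# (the Name column is a hypothetical example)
sqlcmd -S localhost -d kafka-jdbc -U jdbc -P 123 \
  -Q "INSERT INTO Documents (Name) VALUES ('Fourth document');"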

Now I will take a look at my application's console to see if the consumer of the topic sql-Documents has logged anything.

Perfect! My new record has been published as a message to the topic. That happened because the column (Id) that I specified in the config was incremented, so the connector detected the change and published a message containing the new record.

Streaming Entire Tables πŸ“¦

Now for bulk streaming, we need to create a topic with the name we configured in our connector, which is sql-bulk-Documents, and set up a consumer for this topic in our application.

It's important to note that once your topic is created, the connector will push all the records of the table specified in the configuration on every poll interval. This is not a one-time operation: it will keep re-publishing the records until you pause the connector with a PUT request to the pause endpoint of the Kafka Connect API.

curl -X PUT "http://your-kafka-connect-host:8083/connectors/my-connector/pause"
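For completeness, pausing is reversible: the corresponding resume endpoint starts the connector again.

curl -X PUT "http://your-kafka-connect-host:8083/connectors/my-connector/resume"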

That being said, let's go ahead and run the application, which will allow the connector to stream the table from the database.


Excellent! All my table records have been streamed. But it will not stop. Every 5 seconds I will receive the records again until I pause the connector.

It seems we were able to try two of the modes that the JDBC connector provides. There are also other modes, such as:
  • Timestamp Mode: In timestamp mode, the connector uses a timestamp column to detect new and modified rows. It compares the timestamp of each row with the last known timestamp value to identify changes. This mode is useful when your database table has a reliable timestamp column that reflects data modifications.

  • Timestamp+Incrementing Mode: This mode combines both a timestamp column and an incrementing column. It provides more accurate tracking of changes by considering both the timestamp and incremental values. Use this mode when you want to capture both INSERT and UPDATE events.
It's worth trying out these modes, and others, to see which one fits your business needs and your technical requirements.
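As a rough sketch of what a timestamp+incrementing config could look like, only the mode-related keys change compared to the configs above; the connector name, topic prefix, and the UpdatedAt column are hypothetical placeholders for whatever timestamp column your table actually maintains:

{
  "name": "jdbc-source-connector-ts",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:sqlserver://host.docker.internal;databaseName=kafka-jdbc;user=jdbc;password=123;",
    "connection.user": "jdbc",
    "connection.password": "123",
    "table.whitelist": "Documents",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "UpdatedAt",
    "incrementing.column.name": "Id",
    "topic.prefix": "sql-ts-",
    "poll.interval.ms": 1000
  }
}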
