Debezium DB2 Connector with Apache Kafka

Sairaj Alve
5 min read · Dec 21, 2020


Objective:

Deploy an end-to-end solution using the Debezium DB2 connector with Apache Kafka. This enables you to record change data capture (CDC) events for tables in DB2 and stream them to a Kafka topic, which can then be consumed by different applications.

Prerequisites:

  1. Apache Kafka installed.
  2. Docker installed.
  3. Basic knowledge of functions and procedures in PL/SQL.
  4. Working knowledge of DB2.

Steps:

Step 1: Clone the repository — https://github.com/debezium/debezium-connector-db2

Step 2: Setup DB2 Database

1. Build a DB2 database image using the Dockerfile provided in the repo under “src/test/docker/db2-cdc-docker”.

2. Run the following command to build a Docker image (e.g. db2test:0.1):

docker build -t db2test:0.1 .

3. After the image is built, start a docker container. It initializes and runs the required scripts to start the asncap service, which captures database changes.

docker run --name testdb2 --privileged=true -p 50000:50000 -e LICENSE=accept -e DB2INST1_PASSWORD=password -e DBNAME=testdb -v <your-dir>/db2inst1:/database db2test:0.1

*Note: the username will be db2inst1 by default.

4. Let the docker run complete and watch the logs for the message “Setup is complete”.

5. Keep the docker run window open, or pass the “-d” option to run the container in the background (detached mode).

6. Open a new terminal and execute the below command:

docker exec -ti testdb2 bash -c "su - ${DB2INSTANCE}"

In our case, DB2INSTANCE will be db2inst1.

7. The above command logs you into the docker container.

8. Once logged in, start the asncap capture service (if it is not already running) with the command below:

asncap capture_server=TESTDB capture_schema=ASNCDC capture_path=/home/db2inst1/capture_files startmode=warmsi

*Note: the capture_path value can be customized. You can create your own path and provide it to the command.
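If the capture path does not exist yet, create it first inside the container (the directory below simply matches the command above and can be any path writable by the instance owner):

mkdir -p /home/db2inst1/capture_files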

If you get any errors related to archival logging, make sure you have set it up using the following command.

db2 update db cfg for <your-db-name> using LOGARCHMETH2 <value>

<value> is the location where the archive logs are written.

Example: DISK:/path/to/folder
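For instance, a minimal setup on our test database could look like this (the archive directory here is an assumption; any path writable by the instance owner works):

db2 update db cfg for testdb using LOGARCHMETH2 DISK:/home/db2inst1/archive_logs

Note that enabling archival logging on a database that previously used circular logging may put it into backup-pending state, in which case a backup is required before you can connect again.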

9. To check whether the service has started, you can use one of the stored procedures installed during the docker setup above. Type “db2” on the CLI to enter the db2 prompt.

[db2inst1@37667f8e53d9 ~]$ db2

Then call the below stored procedure.

db2 => values asncdc.asncdcservices('status','asncdc')

This will return logs for the asncap service

2020-12-15-18.38.22.867933 ASN0600I "AsnCcmd" : "" : "Initial" : Program "capcmd 11.4.0 (Build 11.5.0.0 Level s1906101300, PTF DYN1906101300AMD64)" is starting.

2020-12-15-18.38.24.976207 ASN0520I "AsnCcmd" : "ASNCDC" : "Initial" : The STATUS command response: "HoldLThread" thread is in the "is waiting" state.

2020-12-15-18.38.24.976342 ASN0520I "AsnCcmd" : "ASNCDC" : "Initial" : The STATUS command response: "AdminThread" thread is in the "is resting" state.

2020-12-15-18.38.24.976388 ASN0520I "AsnCcmd" : "ASNCDC" : "Initial" : The STATUS command response: "PruneThread" thread is in the "is resting" state.

2020-12-15-18.38.24.976423 ASN0520I "AsnCcmd" : "ASNCDC" : "Initial" : The STATUS command response: "WorkerThread" thread is in the "is doing work" state.

2020-12-15-18.38.24.976458 ASN0520I "AsnCcmd" : "ASNCDC" : "Initial" : The STATUS command response: "LogrdThread" thread is in the "is doing work" state.

"is doing work" indicates that the service is running fine.

10. Next, select the tables for the service to capture changes on, using the installed stored procedure as follows:

CALL ASNCDC.ADDTABLE('MYSCHEMA', 'MYTABLE');

*Note: make sure the table and schema are already present.
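If you do not have a table yet, a minimal test table matching the connector configs later in this post could look like this (the schema, table, and columns are assumptions used for illustration; note the primary key, which the sink connector relies on):

db2 "CREATE TABLE MYSCHEMA.CDC_TEST_KEY (ID INT NOT NULL PRIMARY KEY, NAME VARCHAR(50))"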

After calling the procedure, execute the below command to re-initialize the service:

VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc');

The service will now capture the table MYSCHEMA.MYTABLE.

You can also remove tables from capture using the following:

CALL ASNCDC.REMOVETABLE('MYSCHEMA', 'MYTABLE');

And re-initialize the service using the command:

VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc');

The next step is to set up the Kafka instance.

Step 3: Setup Kafka

1. Download and install Kafka by following the Quickstart guide.

2. Ensure the following services are up and running (example startup commands are sketched just after this list):

  1. Kafka Bootstrap Server
  2. Kafka Zookeeper
  3. Kafka Connect
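For a plain Apache Kafka download of that era, these can be started roughly as follows (run from the Kafka installation directory; starting Connect in distributed mode is an assumption here, standalone mode works too):

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

bin/connect-distributed.sh config/connect-distributed.properties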

3. Now deploy the Debezium DB2 CDC source connector to pick up updates from the tables being captured in the DB2 docker container that we set up earlier.

Source connector config:

{"name": "db2-connector-test","config": {"connector.class": "io.debezium.connector.db2.Db2Connector","database.hostname": "localhost","database.port": "50000",  ->> sslConnection:true can be passed here."database.user": "","database.password": "","database.dbname": "TESTDB","table.whitelist": "MYSCHEMA.CDC_TEST_KEY","database.server.name": "localhost","snapshot.mode":"schema_only","database.history.kafka.bootstrap.servers": "localhost:9092","database.history.kafka.topic": "schema.localhost","transforms": "route","transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter","transforms.route.regex": "localhost.MYSCHEMA.(.*)","transforms.route.replacement": "TEST.$1"}}

Save the config above as a JSON file and deploy it with the following command:

curl -X POST -H "Content-Type: application/json" -d @<source-connector>.json localhost:8083/connectors

Check that the topics have been generated as required with the command below.

bin/kafka-topics.sh --list --zookeeper localhost:2181

As per our source connector config, the topic will be created with this name:

TEST.CDC_TEST_KEY

You can view the contents of the topic using this command:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TEST.CDC_TEST_KEY --from-beginning
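Each message carries Debezium's change-event envelope. As a rough sketch (the field values are made up for illustration, and the source block is abridged), an insert event on our test table would look something like:

{
  "before": null,
  "after": { "ID": 1, "NAME": "hello" },
  "source": { "connector": "db2", "db": "TESTDB" },
  "op": "c",
  "ts_ms": 1608576000000
}

Here op is "c" for create/insert, "u" for update, and "d" for delete.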

Now deploy a sink connector, which will read from the Kafka topic and write to a DB2 database table.

Sink connector config:

{"name": "db2-sink-connector-test","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max": "1","topics": "TEST.CDC_TEST_KEY","connection.url": "” -> Choose your db"connection.user": "”","connection.password": "","transforms": "unwrap","transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.drop.tombstones": "false","auto.create": "true","insert.mode": "upsert","delete.enabled": "true","pk.fields": "ID", -> Make sure the source table has a PK field."pk.mode": "record_key"}}

Deploy it using the same command as before:

curl -X POST -H "Content-Type: application/json" -d @<sink-connector>.json localhost:8083/connectors

Ensure both connectors are running without errors using the following commands:

curl -X GET localhost:8083/connectors/db2-connector-test/status

curl -X GET localhost:8083/connectors/db2-sink-connector-test/status
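A healthy connector returns a response along these lines (abridged sketch):

{"name": "db2-connector-test", "connector": {"state": "RUNNING"}, "tasks": [{"id": 0, "state": "RUNNING"}]}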

Once both are running fine, perform an insert into the source table being captured. Every change committed to the table will be captured, sent through the Debezium connector to Kafka, and reflected in the destination table by the sink connector.
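For example, using the hypothetical MYSCHEMA.CDC_TEST_KEY table sketched earlier, from inside the container:

db2 "INSERT INTO MYSCHEMA.CDC_TEST_KEY (ID, NAME) VALUES (1, 'hello')"

Within a few seconds the row should show up on the TEST.CDC_TEST_KEY topic and then in the sink table.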

This flow demonstrates the implementation of the Debezium CDC connector for DB2, which provides an excellent way to capture changes from tables in a DB2 database.

References:

  1. https://debezium.io/documentation/reference/connectors/db2.html
  2. https://kafka.apache.org/
  3. https://www.docker.com/
