Confluent + IBM Demo

This repository demonstrates how to integrate Confluent Platform with IBM technologies (IBM MQ and DB2). Two connectors will be started up: a Datagen source, to mock clickstream data, and an IBM MQ source connector. Then we'll use KSQL to join the two sources together. We'll also configure an IBM DB2 source connector to read data from DB2. The result of the ksqlDB join will be sent to IBM MQ using a sink connector.

Download the demo

Using your terminal, download the zip archive of the confluentinc/demo-scene GitHub repository.

wget https://github.com/confluentinc/demo-scene/archive/master.zip
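
Alternatively, if you have git installed, cloning works too (note that the checkout directory is then demo-scene rather than demo-scene-master):

# clone instead of downloading the zip archive
git clone https://github.com/confluentinc/demo-scene.git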

Then unzip the file and change into the demo-scene-master/ibm-demo directory from your terminal. On a Mac or similar system the commands are:

unzip master.zip
cd demo-scene-master/ibm-demo

Make commands

This step will spin up the Confluent Platform cluster and the IBM DB2 and IBM MQ servers.

make build
make cluster
# wait a minute for the cluster to spin up
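
To check that everything came up before continuing, you can list the running containers (a quick sanity check, assuming Docker Compose is managing the demo):

# every demo service should report an Up / healthy status
docker ps --format "table {{.Names}}\t{{.Status}}"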

Make the topics

With this command we create the topics we need:

make topic
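
For reference, a roughly equivalent manual command for one of these topics would be (a sketch, assuming the broker is reachable on localhost:9092):

# create the ibmmq topic by hand instead of via the Make target
kafka-topics --bootstrap-server localhost:9092 --create \
  --topic ibmmq --partitions 1 --replication-factor 1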

Open the IBM MQ Dashboard

Log in with:

UserName=admin
Password=passw0rd
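
The console is typically reachable at https://localhost:9443/ibmmq/console (the IBM MQ container image default). If you're unsure which host port is mapped, ask Docker (assuming the MQ container is named ibmmq):

# show the host ports published by the MQ container
docker port ibmmq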

Show AVRO schema in C3 topics

You need to send a message to IBM MQ before the schema will appear in the topic in C3.

  • Select DEV.QUEUE.1 under "Queues on MQ1"

[screenshot: the DEV.QUEUE.1 queue in the IBM MQ console]

  • Add a message

[screenshots: adding a message to the queue]

Notice that the messages are not consumed yet...
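
If you prefer the command line to the dashboard, the IBM MQ image ships sample programs; a sketch using amqsput (assuming the MQ container is named ibmmq and the queue manager is MQ1, as shown in the dashboard):

# amqsput sends one message per input line; an empty line or Ctrl-D ends it
docker exec -ti ibmmq bash -c \
  'echo "test message" | /opt/mqm/samp/bin/amqsput DEV.QUEUE.1 MQ1'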

Access Confluent Control Center

Open Confluent Control Center (by default at http://localhost:9021). Here you can see your local Confluent cluster and the topics created before.

Make the source connectors

Now we configure the connector so we can read data from IBM MQ:

make connectsource
# wait a minute before moving on to the next step
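
You can also confirm the connector is healthy through the Kafka Connect REST API (assuming Connect listens on its default port 8083; substitute your connector's actual name):

curl -s localhost:8083/connectors/<connector-name>/status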
  • You can now see the schema assigned to the ibmmq topic

[screenshot: the schema on the ibmmq topic in Control Center]

AVRO messages appear in the consumer

Run the ibmmq consumer to see messages coming in from DEV.QUEUE.1 (or check in C3)

make consumer
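
If you prefer to bypass the Make target, a standard Avro console consumer gives the same view (a sketch, assuming the default broker and Schema Registry ports):

kafka-avro-console-consumer --bootstrap-server localhost:9092 \
  --topic ibmmq --from-beginning \
  --property schema.registry.url=http://localhost:8081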

You can also see in IBM MQ that the messages are no longer there.

KSQL

Create the stream from the CLICKSTREAM topic with ksqlDB

In Confluent Control Center, select the cluster tile, click on ksqlDB in the left menu, and select the ksqldb1 cluster.

Using the editor, run the queries below:

CREATE STREAM CLICKSTREAM
  WITH (KAFKA_TOPIC='clickstream',
        VALUE_FORMAT='AVRO');
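
To confirm the stream was created and picked up its Avro schema from the registry, you can describe it:

DESCRIBE CLICKSTREAM;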

Add another message to DEV.QUEUE.1

Send another message to IBM MQ. You can use the user names bobk_43 or akatz1022 to capture clickstreams for those users with a KSQL join.
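
Reusing the earlier CLI sketch (same assumptions about the container and queue manager names), a matching message could be sent like this:

docker exec -ti ibmmq bash -c \
  'echo "bobk_43" | /opt/mqm/samp/bin/amqsput DEV.QUEUE.1 MQ1'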

Create the Stream for the IBMMQ topic

CREATE STREAM ibmmq
  WITH (KAFKA_TOPIC='ibmmq',
        VALUE_FORMAT='AVRO');

Click on Add query properties and select auto.offset.reset = Earliest

SELECT * FROM ibmmq
EMIT CHANGES;
SELECT "TEXT" FROM ibmmq
EMIT CHANGES;

JOIN the 2 streams

Paste the KSQL statement into the KSQL Editor to perform the join.

CREATE STREAM VIP_USERS AS
  SELECT * FROM CLICKSTREAM
  JOIN IBMMQ WITHIN 5 SECONDS
  ON TEXT = USERNAME
  EMIT CHANGES;

[screenshot: the join query in the ksqlDB editor]

SELECT * FROM VIP_USERS
EMIT CHANGES;

This query will return values only if you added messages in IBM MQ that match usernames in the CLICKSTREAM stream/topic (as instructed above).

Configure DB2

# open a shell in the DB2 container as db2inst1
docker exec -ti ibmdb2 bash -c "su - db2inst1"
# connect to the SAMPLE database and list its tables
db2 connect to sample user db2inst1 using passw0rd
db2 LIST TABLES
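
While connected, you can also peek at the rows the connector will pick up; the stock DB2 SAMPLE database includes an EMPLOYEE table, for example (a sketch, assuming the standard sample schema):

db2 "SELECT EMPNO, FIRSTNME, LASTNAME FROM EMPLOYEE FETCH FIRST 5 ROWS ONLY"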

You can now exit the db2 session:

exit

Now you can create the connector to load the data from DB2:

make connectdb2source

You will see that the connector automatically creates data in Confluent. Check in Confluent Control Center, under Topics.

You can also see the connectors created by clicking on the Connect link in the left menu.
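
Alternatively, you can list them from the Kafka Connect REST API (assuming the default port 8083):

curl -s localhost:8083/connectors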

Sink data to IBM MQ

Let's sink the new stream data into IBM MQ, into DEV.QUEUE.2:

make connectsink

You can see the data by logging in:

UserName=admin
Password=passw0rd
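
As a command-line alternative, the amqsget sample can read the sunk messages from DEV.QUEUE.2 (same container and queue manager assumptions as before; note that amqsget consumes the messages it reads):

docker exec -ti ibmmq /opt/mqm/samp/bin/amqsget DEV.QUEUE.2 MQ1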

Bring down the demo

When you are done with the demo, execute the command:

make down

Troubleshooting tips

# open a shell in the DB2 container as db2inst1
docker exec -ti ibmdb2 bash -c "su - db2inst1"
# check which TCP/IP service name the DB2 instance listens on
db2 get dbm cfg | grep "SVCENAME"

# resolve the service name to a port number
grep "db2c_db2inst1" /etc/services

# show the installed DB2 version and fix pack level
db2level