From a92ec38a9b7b9bc4930fa3203ae993a3ac3d89bd Mon Sep 17 00:00:00 2001 From: Prabod Dunuwila Date: Fri, 24 Jan 2020 19:51:17 +0530 Subject: [PATCH 01/18] Add dependencies for mongo. --- pom.xml | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pom.xml b/pom.xml index 276706f..df1e0be 100644 --- a/pom.xml +++ b/pom.xml @@ -78,6 +78,11 @@ debezium-connector-sqlserver ${debezium-connector-sqlserver.version} + + io.debezium + debezium-connector-mongodb + ${debezium-connector-mongodb.version} + com.zaxxer HikariCP @@ -129,6 +134,12 @@ ${h2.connector.version} test + + org.mongodb + mongo-java-driver + ${mongodb.connector.version} + test + org.apache.log4j.wso2 log4j @@ -223,6 +234,7 @@ 0.10.0.Final 0.10.0.Final 0.10.0.Final + 0.10.0.Final 12.1.0.2 3.2.0 6.11 @@ -243,6 +255,7 @@ 0.0.0.0 12.2.0.1 2.3.0 + 3.4.0 https://github.com/siddhi-io/siddhi-io-cdc.git From f9c49a6be0fe267fd5a88acbb9eb9b446bada5eb Mon Sep 17 00:00:00 2001 From: Prabod Dunuwila Date: Tue, 28 Jan 2020 23:16:30 +0530 Subject: [PATCH 02/18] Support mongodb for cdc. --- component/pom.xml | 4 + .../io/cdc/util/CDCSourceConstants.java | 6 ++ .../extension/io/cdc/util/CDCSourceUtil.java | 77 +++++++++++++------ 3 files changed, 64 insertions(+), 23 deletions(-) diff --git a/component/pom.xml b/component/pom.xml index 99c25ed..82baf98 100644 --- a/component/pom.xml +++ b/component/pom.xml @@ -51,6 +51,10 @@ io.debezium debezium-connector-postgres + + io.debezium + debezium-connector-mongodb + io.siddhi siddhi-core diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceConstants.java b/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceConstants.java index 0b31217..ffe52da 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceConstants.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceConstants.java @@ -52,6 +52,7 @@ public class CDCSourceConstants { public static final String POSTGRESQL_CONNECTOR_CLASS = "io.debezium.connector.postgresql.PostgresConnector"; public static final String ORACLE_CONNECTOR_CLASS = "io.debezium.connector.oracle.OracleConnector"; public static final String SQLSERVER_CONNECTOR_CLASS = "io.debezium.connector.sqlserver.SqlServerConnector"; + public static final String MONGODB_CONNECTOR_CLASS = "io.debezium.connector.mongodb.MongoDbConnector"; public static final String BEFORE_PREFIX = "before_"; public static final String CACHE_OBJECT = "cacheObj"; public static final int DEFAULT_SERVER_ID = -1; @@ -76,4 +77,9 @@ public class CDCSourceConstants { public static final String WAIT_ON_MISSED_RECORD = "wait.on.missed.record"; public static final String MISSED_RECORD_WAITING_TIMEOUT = "missed.record.waiting.timeout"; public static final String CONNECTOR_NAME = "name"; + public static final String MONGODB_USER = "mongodb.user"; + public static final String MONGODB_PASSWORD = "mongodb.password"; + public static final String MONGODB_HOSTS = "mongodb.hosts"; + public static final String MONGODB_NAME = "mongodb.name"; + public static final String MONGODB_COLLECTION_WHITELIST = "collection.whitelist"; } diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java b/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java index a924bcd..dc800f6 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java @@ -40,6 +40,7 @@ public static Map getConfigMap(String username, String password, Map configMap = new HashMap<>(); String host; int port; + boolean mongodb = false; String database; //Add schema specific details to configMap @@ -158,41 +159,71 @@ public static Map getConfigMap(String username, String password, configMap.put(CDCSourceConstants.CONNECTOR_CLASS, CDCSourceConstants.ORACLE_CONNECTOR_CLASS); break; } + case "mongodb": { + mongodb = true; + //Extract url details + String regex = "jdbc:mongodb://(\\w*|[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}):" + + "(\\d++)/(\\w*)"; + Pattern p = Pattern.compile(regex); + Matcher matcher = p.matcher(url); + if (matcher.find()) { + host = matcher.group(1); + port = Integer.parseInt(matcher.group(2)); + database = matcher.group(3); + } else { + throw new WrongConfigurationException("Invalid JDBC url: " + url + + " received for stream: " + siddhiStreamName + + ". Expected url format: jdbc:mongodb://:/"); + } + + configMap.put(CDCSourceConstants.CONNECTOR_CLASS, CDCSourceConstants.MONGODB_CONNECTOR_CLASS); + configMap.put(CDCSourceConstants.MONGODB_USER, username); + configMap.put(CDCSourceConstants.MONGODB_PASSWORD, password); + //hostname and port pairs of the MongoDB servers in the replica set. + configMap.put(CDCSourceConstants.MONGODB_HOSTS, "mongo01/127.0.0.1:27017"); + //unique name that identifies the connector and/or MongoDB replica set or sharded cluster + configMap.put(CDCSourceConstants.MONGODB_NAME, "mongo01"); + //fully-qualified namespaces for MongoDB collections to be monitored + configMap.put(CDCSourceConstants.MONGODB_COLLECTION_WHITELIST, database + "." + tableName); + break; + } default: throw new WrongConfigurationException("Unsupported schema. Expected schema: mysql or postgresql" + - "or sqlserver or oracle, Found: " + splittedURL[1]); + "or sqlserver, oracle or mongodb Found: " + splittedURL[1]); } - //Add general config details to configMap - configMap.put(CDCSourceConstants.DATABASE_USER, username); - configMap.put(CDCSourceConstants.DATABASE_PASSWORD, password); + if(!mongodb) { + //Add general config details to configMap + configMap.put(CDCSourceConstants.DATABASE_USER, username); + configMap.put(CDCSourceConstants.DATABASE_PASSWORD, password); - if (serverID == CDCSourceConstants.DEFAULT_SERVER_ID) { - Random random = new Random(); - configMap.put(CDCSourceConstants.SERVER_ID, random.nextInt(1001) + 5400); - } else { - configMap.put(CDCSourceConstants.SERVER_ID, serverID); - } + if (serverID == CDCSourceConstants.DEFAULT_SERVER_ID) { + Random random = new Random(); + configMap.put(CDCSourceConstants.SERVER_ID, random.nextInt(1001) + 5400); + } else { + configMap.put(CDCSourceConstants.SERVER_ID, serverID); + } + + //set the database server name if specified, otherwise set _ as default + if (serverName.equals("")) { + configMap.put(CDCSourceConstants.DATABASE_SERVER_NAME, host + "_" + port); + } else { + configMap.put(CDCSourceConstants.DATABASE_SERVER_NAME, serverName); + } - //set the database server name if specified, otherwise set _ as default - if (serverName.equals("")) { - configMap.put(CDCSourceConstants.DATABASE_SERVER_NAME, host + "_" + port); - } else { - configMap.put(CDCSourceConstants.DATABASE_SERVER_NAME, serverName); + //set history file path. + configMap.put(CDCSourceConstants.DATABASE_HISTORY, CDCSourceConstants.DATABASE_HISTORY_FILEBASE_HISTORY); + configMap.put(CDCSourceConstants.DATABASE_HISTORY_FILE_NAME, + historyFileDirectory + siddhiStreamName + ".dat"); } + //set connector property: name + configMap.put(CDCSourceConstants.CONNECTOR_NAME, siddhiAppName + siddhiStreamName); + configMap.put(CDCSourceConstants.OFFSET_STORAGE, InMemoryOffsetBackingStore.class.getName()); configMap.put(CDCSourceConstants.CDC_SOURCE_OBJECT, cdcSourceHashCode); - //set history file path. - configMap.put(CDCSourceConstants.DATABASE_HISTORY, CDCSourceConstants.DATABASE_HISTORY_FILEBASE_HISTORY); - configMap.put(CDCSourceConstants.DATABASE_HISTORY_FILE_NAME, - historyFileDirectory + siddhiStreamName + ".dat"); - - //set connector property: name - configMap.put("name", siddhiAppName + siddhiStreamName); - //set additional connector properties using comma separated key value pair string for (Map.Entry entry : getConnectorPropertiesMap(connectorProperties).entrySet()) { configMap.put(entry.getKey(), entry.getValue()); From ba902a9c328178bce13d75a89a12d796cd790132 Mon Sep 17 00:00:00 2001 From: Prabod Dunuwila Date: Fri, 31 Jan 2020 12:16:38 +0530 Subject: [PATCH 03/18] Capture insert operation. --- component/pom.xml | 6 ++++ .../source/listening/ChangeDataCapture.java | 36 ++++++++++++++++--- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/component/pom.xml b/component/pom.xml index 82baf98..0f5e16f 100644 --- a/component/pom.xml +++ b/component/pom.xml @@ -27,6 +27,11 @@ bundle Siddhi IO CDC extension + + org.json + json + 20190722 + org.apache.log4j.wso2 log4j @@ -566,6 +571,7 @@ org.apache.kafka.connect.*, org.apache.kafka.common.*, org.apache.kafka.clients.*, + org.antlr.*, io.siddhi.annotation.*;version="${siddhi.import.version.range}", diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java index a7e93e3..d104d68 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java @@ -29,15 +29,19 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; +import org.json.JSONObject; import java.math.BigDecimal; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import static java.util.Objects.isNull; + /** * This class is for capturing change data using debezium embedded engine. **/ @@ -162,11 +166,33 @@ private Map createMap(ConnectRecord connectRecord, String operat switch (op) { case CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION: //append row details after insert. - rawDetails = (Struct) record.get(CDCSourceConstants.AFTER); - fields = rawDetails.schema().fields(); - for (Field key : fields) { - fieldName = key.name(); - detailsMap.put(fieldName, getValue(rawDetails.get(fieldName))); + if (record.get(CDCSourceConstants.AFTER) instanceof String) { + String insertString = (String) record.get(CDCSourceConstants.AFTER); + JSONObject jsonObj = new JSONObject(insertString); + Iterator keys = jsonObj.keys(); + for (Iterator it = keys; it.hasNext(); ) { + String key = it.next(); + key.getClass().getTypeName(); + if (jsonObj.get(key) instanceof Integer || jsonObj.get(key) instanceof Integer) { + detailsMap.put(key, getValue(jsonObj.getLong(key))); + } else if (jsonObj.get(key) instanceof Float || jsonObj.get(key) instanceof Double) { + detailsMap.put(key, getValue(jsonObj.getDouble(key))); + } else if (jsonObj.get(key) instanceof String) { + detailsMap.put(key, getValue(jsonObj.getString(key))); + } else if (isNull(jsonObj.get(key))) { + detailsMap.put(key, null); + } +// else if (jsonObj.get(key) != null) { +// detailsMap.put(key, getValue(jsonObj.getJSONObject(key))); +// } + } + } else { + rawDetails = (Struct) record.get(CDCSourceConstants.AFTER); + fields = rawDetails.schema().fields(); + for (Field key : fields) { + fieldName = key.name(); + detailsMap.put(fieldName, getValue(rawDetails.get(fieldName))); + } } break; case CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION: From 8fee6a27d47cbc280d04fb17dbabce2f01d0ffe5 Mon Sep 17 00:00:00 2001 From: Prabod Dunuwila Date: Sat, 1 Feb 2020 12:18:15 +0530 Subject: [PATCH 04/18] Fix issues in insert operation. --- .../source/listening/ChangeDataCapture.java | 82 ++++++++++--------- .../extension/io/cdc/util/CDCSourceUtil.java | 66 +++++++-------- 2 files changed, 75 insertions(+), 73 deletions(-) diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java index d104d68..4f1ec21 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java @@ -29,6 +29,7 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; +import org.apache.log4j.Logger; import org.json.JSONObject; import java.math.BigDecimal; @@ -40,12 +41,11 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import static java.util.Objects.isNull; - /** * This class is for capturing change data using debezium embedded engine. **/ public class ChangeDataCapture { + private static final Logger log = Logger.getLogger(ChangeDataCapture.class); private String operation; private Configuration config; @@ -166,60 +166,66 @@ private Map createMap(ConnectRecord connectRecord, String operat switch (op) { case CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION: //append row details after insert. - if (record.get(CDCSourceConstants.AFTER) instanceof String) { + try { + rawDetails = (Struct) record.get(CDCSourceConstants.AFTER); + fields = rawDetails.schema().fields(); + for (Field key : fields) { + fieldName = key.name(); + detailsMap.put(fieldName, getValue(rawDetails.get(fieldName))); + } + } catch (ClassCastException ex) { String insertString = (String) record.get(CDCSourceConstants.AFTER); JSONObject jsonObj = new JSONObject(insertString); Iterator keys = jsonObj.keys(); for (Iterator it = keys; it.hasNext(); ) { String key = it.next(); - key.getClass().getTypeName(); - if (jsonObj.get(key) instanceof Integer || jsonObj.get(key) instanceof Integer) { - detailsMap.put(key, getValue(jsonObj.getLong(key))); - } else if (jsonObj.get(key) instanceof Float || jsonObj.get(key) instanceof Double) { + if (jsonObj.get(key) instanceof Boolean) { + detailsMap.put(key, getValue(jsonObj.getBoolean(key))); + } else if (jsonObj.get(key) instanceof Double) { + log.info("INTEGER, LONG, FLOAT and DOUBLE values are returned as DOUBLE."); detailsMap.put(key, getValue(jsonObj.getDouble(key))); } else if (jsonObj.get(key) instanceof String) { detailsMap.put(key, getValue(jsonObj.getString(key))); - } else if (isNull(jsonObj.get(key))) { - detailsMap.put(key, null); + } else if (jsonObj.get(key) instanceof JSONObject ) { + detailsMap.put(key, getValue(jsonObj.getJSONObject(key))); } -// else if (jsonObj.get(key) != null) { -// detailsMap.put(key, getValue(jsonObj.getJSONObject(key))); -// } - } - } else { - rawDetails = (Struct) record.get(CDCSourceConstants.AFTER); - fields = rawDetails.schema().fields(); - for (Field key : fields) { - fieldName = key.name(); - detailsMap.put(fieldName, getValue(rawDetails.get(fieldName))); } } break; case CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION: //append row details before delete. - rawDetails = (Struct) record.get(CDCSourceConstants.BEFORE); - fields = rawDetails.schema().fields(); - for (Field key : fields) { - fieldName = key.name(); - detailsMap.put(CDCSourceConstants.BEFORE_PREFIX + fieldName, - getValue(rawDetails.get(fieldName))); + try { + rawDetails = (Struct) record.get(CDCSourceConstants.BEFORE); + fields = rawDetails.schema().fields(); + for (Field key : fields) { + fieldName = key.name(); + detailsMap.put(CDCSourceConstants.BEFORE_PREFIX + fieldName, + getValue(rawDetails.get(fieldName))); + } + } catch (DataException ex) { + log.info("Delete operation is not supported for MongoDB."); } + break; case CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION: //append row details before update. - rawDetails = (Struct) record.get(CDCSourceConstants.BEFORE); - fields = rawDetails.schema().fields(); - for (Field key : fields) { - fieldName = key.name(); - detailsMap.put(CDCSourceConstants.BEFORE_PREFIX + fieldName, - getValue(rawDetails.get(fieldName))); - } - //append row details after update. - rawDetails = (Struct) record.get(CDCSourceConstants.AFTER); - fields = rawDetails.schema().fields(); - for (Field key : fields) { - fieldName = key.name(); - detailsMap.put(fieldName, getValue(rawDetails.get(fieldName))); + try { + rawDetails = (Struct) record.get(CDCSourceConstants.BEFORE); + fields = rawDetails.schema().fields(); + for (Field key : fields) { + fieldName = key.name(); + detailsMap.put(CDCSourceConstants.BEFORE_PREFIX + fieldName, + getValue(rawDetails.get(fieldName))); + } + //append row details after update. + rawDetails = (Struct) record.get(CDCSourceConstants.AFTER); + fields = rawDetails.schema().fields(); + for (Field key : fields) { + fieldName = key.name(); + detailsMap.put(fieldName, getValue(rawDetails.get(fieldName))); + } + } catch (DataException ex) { + log.info("Update operation is not supported for MongoDB."); } break; } diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java b/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java index dc800f6..a2f541b 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java @@ -40,7 +40,6 @@ public static Map getConfigMap(String username, String password, Map configMap = new HashMap<>(); String host; int port; - boolean mongodb = false; String database; //Add schema specific details to configMap @@ -160,29 +159,28 @@ public static Map getConfigMap(String username, String password, break; } case "mongodb": { - mongodb = true; //Extract url details - String regex = "jdbc:mongodb://(\\w*|[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}):" + + String regex = "jdbc:mongodb://(\\w*|(\\w*)/[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}):" + "(\\d++)/(\\w*)"; Pattern p = Pattern.compile(regex); Matcher matcher = p.matcher(url); + String replicaSetName; if (matcher.find()) { host = matcher.group(1); - port = Integer.parseInt(matcher.group(2)); - database = matcher.group(3); + replicaSetName = matcher.group(2); + port = Integer.parseInt(matcher.group(3)); + database = matcher.group(4); } else { throw new WrongConfigurationException("Invalid JDBC url: " + url + " received for stream: " + siddhiStreamName + - ". Expected url format: jdbc:mongodb://:/"); + ". Expected url format: jdbc:mongodb:///:/" + + ""); } - configMap.put(CDCSourceConstants.CONNECTOR_CLASS, CDCSourceConstants.MONGODB_CONNECTOR_CLASS); - configMap.put(CDCSourceConstants.MONGODB_USER, username); - configMap.put(CDCSourceConstants.MONGODB_PASSWORD, password); //hostname and port pairs of the MongoDB servers in the replica set. - configMap.put(CDCSourceConstants.MONGODB_HOSTS, "mongo01/127.0.0.1:27017"); + configMap.put(CDCSourceConstants.MONGODB_HOSTS, host + ":" + port); //unique name that identifies the connector and/or MongoDB replica set or sharded cluster - configMap.put(CDCSourceConstants.MONGODB_NAME, "mongo01"); + configMap.put(CDCSourceConstants.MONGODB_NAME, replicaSetName); //fully-qualified namespaces for MongoDB collections to be monitored configMap.put(CDCSourceConstants.MONGODB_COLLECTION_WHITELIST, database + "." + tableName); break; @@ -193,37 +191,35 @@ public static Map getConfigMap(String username, String password, } - if(!mongodb) { - //Add general config details to configMap - configMap.put(CDCSourceConstants.DATABASE_USER, username); - configMap.put(CDCSourceConstants.DATABASE_PASSWORD, password); - - if (serverID == CDCSourceConstants.DEFAULT_SERVER_ID) { - Random random = new Random(); - configMap.put(CDCSourceConstants.SERVER_ID, random.nextInt(1001) + 5400); - } else { - configMap.put(CDCSourceConstants.SERVER_ID, serverID); - } - - //set the database server name if specified, otherwise set _ as default - if (serverName.equals("")) { - configMap.put(CDCSourceConstants.DATABASE_SERVER_NAME, host + "_" + port); - } else { - configMap.put(CDCSourceConstants.DATABASE_SERVER_NAME, serverName); - } + //Add general config details to configMap + configMap.put(CDCSourceConstants.DATABASE_USER, username); + configMap.put(CDCSourceConstants.DATABASE_PASSWORD, password); - //set history file path. - configMap.put(CDCSourceConstants.DATABASE_HISTORY, CDCSourceConstants.DATABASE_HISTORY_FILEBASE_HISTORY); - configMap.put(CDCSourceConstants.DATABASE_HISTORY_FILE_NAME, - historyFileDirectory + siddhiStreamName + ".dat"); + if (serverID == CDCSourceConstants.DEFAULT_SERVER_ID) { + Random random = new Random(); + configMap.put(CDCSourceConstants.SERVER_ID, random.nextInt(1001) + 5400); + } else { + configMap.put(CDCSourceConstants.SERVER_ID, serverID); } - //set connector property: name - configMap.put(CDCSourceConstants.CONNECTOR_NAME, siddhiAppName + siddhiStreamName); + //set the database server name if specified, otherwise set _ as default + if (serverName.equals("")) { + configMap.put(CDCSourceConstants.DATABASE_SERVER_NAME, host + "_" + port); + } else { + configMap.put(CDCSourceConstants.DATABASE_SERVER_NAME, serverName); + } configMap.put(CDCSourceConstants.OFFSET_STORAGE, InMemoryOffsetBackingStore.class.getName()); configMap.put(CDCSourceConstants.CDC_SOURCE_OBJECT, cdcSourceHashCode); + //set history file path. + configMap.put(CDCSourceConstants.DATABASE_HISTORY, CDCSourceConstants.DATABASE_HISTORY_FILEBASE_HISTORY); + configMap.put(CDCSourceConstants.DATABASE_HISTORY_FILE_NAME, + historyFileDirectory + siddhiStreamName + ".dat"); + + //set connector property: name + configMap.put(CDCSourceConstants.CONNECTOR_NAME, siddhiAppName + siddhiStreamName); + //set additional connector properties using comma separated key value pair string for (Map.Entry entry : getConnectorPropertiesMap(connectorProperties).entrySet()) { configMap.put(entry.getKey(), entry.getValue()); From 947f332b5e1a5d6fa55eea474e10aa5d1f67eb3e Mon Sep 17 00:00:00 2001 From: Prabod Dunuwila Date: Sun, 2 Feb 2020 12:56:06 +0530 Subject: [PATCH 05/18] Log delete and update operations. --- .../source/listening/ChangeDataCapture.java | 25 +++++++++++++------ .../extension/io/cdc/util/CDCSourceUtil.java | 11 ++++++-- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java index 4f1ec21..0aeade7 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java @@ -30,6 +30,7 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; import org.apache.log4j.Logger; +import org.json.JSONException; import org.json.JSONObject; import java.math.BigDecimal; @@ -180,14 +181,20 @@ private Map createMap(ConnectRecord connectRecord, String operat for (Iterator it = keys; it.hasNext(); ) { String key = it.next(); if (jsonObj.get(key) instanceof Boolean) { - detailsMap.put(key, getValue(jsonObj.getBoolean(key))); + detailsMap.put(key, jsonObj.getBoolean(key)); + } else if (jsonObj.get(key) instanceof Integer) { + detailsMap.put(key, jsonObj.getInt(key)); } else if (jsonObj.get(key) instanceof Double) { - log.info("INTEGER, LONG, FLOAT and DOUBLE values are returned as DOUBLE."); - detailsMap.put(key, getValue(jsonObj.getDouble(key))); + detailsMap.put(key, jsonObj.getDouble(key)); } else if (jsonObj.get(key) instanceof String) { - detailsMap.put(key, getValue(jsonObj.getString(key))); - } else if (jsonObj.get(key) instanceof JSONObject ) { - detailsMap.put(key, getValue(jsonObj.getJSONObject(key))); + detailsMap.put(key, jsonObj.getString(key)); + } else if (jsonObj.get(key) instanceof JSONObject) { + try { + detailsMap.put(key, Long.parseLong((String) jsonObj.getJSONObject(key). + get("$numberLong"))); + } catch (JSONException e) { + detailsMap.put(key, jsonObj.getJSONObject(key)); + } } } } @@ -203,7 +210,7 @@ private Map createMap(ConnectRecord connectRecord, String operat getValue(rawDetails.get(fieldName))); } } catch (DataException ex) { - log.info("Delete operation is not supported for MongoDB."); + log.info("Delete record with id : " + connectRecord.key().toString()); } break; @@ -225,7 +232,9 @@ private Map createMap(ConnectRecord connectRecord, String operat detailsMap.put(fieldName, getValue(rawDetails.get(fieldName))); } } catch (DataException ex) { - log.info("Update operation is not supported for MongoDB."); + log.info("Update record id : " + connectRecord.key().toString() + + ", fields : " + record.getString("patch")); + } break; } diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java b/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java index a2f541b..e3ee8a2 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java @@ -41,6 +41,7 @@ public static Map getConfigMap(String username, String password, String host; int port; String database; + Boolean isMongodb = false; //Add schema specific details to configMap String[] splittedURL = url.split(":"); @@ -160,6 +161,7 @@ public static Map getConfigMap(String username, String password, } case "mongodb": { //Extract url details + isMongodb = true; String regex = "jdbc:mongodb://(\\w*|(\\w*)/[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}):" + "(\\d++)/(\\w*)"; Pattern p = Pattern.compile(regex); @@ -192,8 +194,13 @@ public static Map getConfigMap(String username, String password, } //Add general config details to configMap - configMap.put(CDCSourceConstants.DATABASE_USER, username); - configMap.put(CDCSourceConstants.DATABASE_PASSWORD, password); + if (!isMongodb) { + configMap.put(CDCSourceConstants.DATABASE_USER, username); + configMap.put(CDCSourceConstants.DATABASE_PASSWORD, password); + } else { + configMap.put(CDCSourceConstants.MONGODB_USER, username); + configMap.put(CDCSourceConstants.MONGODB_PASSWORD, password); + } if (serverID == CDCSourceConstants.DEFAULT_SERVER_ID) { Random random = new Random(); From ce5c69403369e2ec421ab6dcd0e1cc727888c42c Mon Sep 17 00:00:00 2001 From: Prabod Dunuwila Date: Sun, 2 Feb 2020 18:31:13 +0530 Subject: [PATCH 06/18] Log existing documents in the collection on start. --- .../io/cdc/source/listening/ChangeDataCapture.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java index 0aeade7..27126f6 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java @@ -148,6 +148,9 @@ private Map createMap(ConnectRecord connectRecord, String operat try { op = (String) record.get("op"); + if ("r".equals(op)){ + op = CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION; + } } catch (NullPointerException | DataException ex) { return detailsMap; } @@ -192,8 +195,13 @@ private Map createMap(ConnectRecord connectRecord, String operat try { detailsMap.put(key, Long.parseLong((String) jsonObj.getJSONObject(key). get("$numberLong"))); - } catch (JSONException e) { - detailsMap.put(key, jsonObj.getJSONObject(key)); + } catch (JSONException e1) { + try { + detailsMap.put(key, Double.parseDouble((String) jsonObj.getJSONObject(key). + get("$numberDecimal"))); + } catch (JSONException e2){ + detailsMap.put(key, jsonObj.getJSONObject(key)); + } } } } @@ -212,7 +220,6 @@ private Map createMap(ConnectRecord connectRecord, String operat } catch (DataException ex) { log.info("Delete record with id : " + connectRecord.key().toString()); } - break; case CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION: //append row details before update. @@ -234,7 +241,6 @@ private Map createMap(ConnectRecord connectRecord, String operat } catch (DataException ex) { log.info("Update record id : " + connectRecord.key().toString() + ", fields : " + record.getString("patch")); - } break; } From 1b60e20320e4b82e9754e5f5dfdf50d68604ebf2 Mon Sep 17 00:00:00 2001 From: Prabod Dunuwila Date: Mon, 3 Feb 2020 17:20:00 +0530 Subject: [PATCH 07/18] Add test cases. --- component/pom.xml | 5 + .../source/listening/ChangeDataCapture.java | 74 +++-- .../TestCaseOfCDCListeningModeMongo.java | 309 ++++++++++++++++++ pom.xml | 7 + 4 files changed, 366 insertions(+), 29 deletions(-) create mode 100644 component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java diff --git a/component/pom.xml b/component/pom.xml index 0f5e16f..197995c 100644 --- a/component/pom.xml +++ b/component/pom.xml @@ -95,6 +95,11 @@ siddhi-store-rdbms test + + io.siddhi.extension.store.mongodb + siddhi-store-mongodb + test + com.zaxxer HikariCP diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java index 27126f6..a51476d 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java @@ -148,7 +148,7 @@ private Map createMap(ConnectRecord connectRecord, String operat try { op = (String) record.get("op"); - if ("r".equals(op)){ + if ("r".equals(op)) { op = CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION; } } catch (NullPointerException | DataException ex) { @@ -180,31 +180,7 @@ private Map createMap(ConnectRecord connectRecord, String operat } catch (ClassCastException ex) { String insertString = (String) record.get(CDCSourceConstants.AFTER); JSONObject jsonObj = new JSONObject(insertString); - Iterator keys = jsonObj.keys(); - for (Iterator it = keys; it.hasNext(); ) { - String key = it.next(); - if (jsonObj.get(key) instanceof Boolean) { - detailsMap.put(key, jsonObj.getBoolean(key)); - } else if (jsonObj.get(key) instanceof Integer) { - detailsMap.put(key, jsonObj.getInt(key)); - } else if (jsonObj.get(key) instanceof Double) { - detailsMap.put(key, jsonObj.getDouble(key)); - } else if (jsonObj.get(key) instanceof String) { - detailsMap.put(key, jsonObj.getString(key)); - } else if (jsonObj.get(key) instanceof JSONObject) { - try { - detailsMap.put(key, Long.parseLong((String) jsonObj.getJSONObject(key). - get("$numberLong"))); - } catch (JSONException e1) { - try { - detailsMap.put(key, Double.parseDouble((String) jsonObj.getJSONObject(key). - get("$numberDecimal"))); - } catch (JSONException e2){ - detailsMap.put(key, jsonObj.getJSONObject(key)); - } - } - } - } + detailsMap = getMongoDetailMap(jsonObj); } break; case CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION: @@ -218,7 +194,15 @@ private Map createMap(ConnectRecord connectRecord, String operat getValue(rawDetails.get(fieldName))); } } catch (DataException ex) { - log.info("Delete record with id : " + connectRecord.key().toString()); + String deleteDocument = (String) ((Struct)connectRecord.key()).get("id"); + JSONObject jsonObj = new JSONObject(deleteDocument); + Iterator keys = jsonObj.keys(); + while(keys.hasNext()) { + String key = keys.next(); + if (jsonObj.get(key) instanceof String) { + detailsMap.put("id", jsonObj.get(key)); + } + } } break; case CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION: @@ -239,8 +223,10 @@ private Map createMap(ConnectRecord connectRecord, String operat detailsMap.put(fieldName, getValue(rawDetails.get(fieldName))); } } catch (DataException ex) { - log.info("Update record id : " + connectRecord.key().toString() + - ", fields : " + record.getString("patch")); + String updateDocument = (String) record.get("patch"); + JSONObject jsonObj = new JSONObject(updateDocument); + JSONObject setJsonObj = (JSONObject) jsonObj.get("$set"); + detailsMap = getMongoDetailMap(setJsonObj); } break; } @@ -248,6 +234,36 @@ private Map createMap(ConnectRecord connectRecord, String operat return detailsMap; } + private Map getMongoDetailMap(JSONObject jsonObj){ + Map detailsMap = new HashMap<>(); + Iterator keys = jsonObj.keys(); + for (Iterator it = keys; it.hasNext(); ) { + String key = it.next(); + if (jsonObj.get(key) instanceof Boolean) { + detailsMap.put(key, jsonObj.getBoolean(key)); + } else if (jsonObj.get(key) instanceof Integer) { + detailsMap.put(key, jsonObj.getInt(key)); + } else if (jsonObj.get(key) instanceof Double) { + detailsMap.put(key, jsonObj.getDouble(key)); + } else if (jsonObj.get(key) instanceof String) { + detailsMap.put(key, jsonObj.getString(key)); + } else if (jsonObj.get(key) instanceof JSONObject) { + try { + detailsMap.put(key, Long.parseLong((String) jsonObj.getJSONObject(key). + get("$numberLong"))); + } catch (JSONException e1) { + try { + detailsMap.put(key, Double.parseDouble((String) jsonObj.getJSONObject(key). + get("$numberDecimal"))); + } catch (JSONException e2) { + detailsMap.put(key, jsonObj.getJSONObject(key).toString()); + } + } + } + } + return detailsMap; + } + private Object getValue(Object v) { if (v instanceof Struct) { Optional value = VariableScaleDecimal.toLogical((Struct) v).getDecimalValue(); diff --git a/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java b/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java new file mode 100644 index 0000000..54a0b80 --- /dev/null +++ b/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java @@ -0,0 +1,309 @@ +package io.siddhi.extension.io.cdc.source; + +import io.siddhi.core.SiddhiAppRuntime; +import io.siddhi.core.SiddhiManager; +import io.siddhi.core.event.Event; +import io.siddhi.core.query.output.callback.QueryCallback; +import io.siddhi.core.stream.input.InputHandler; +import io.siddhi.core.stream.output.StreamCallback; +import io.siddhi.core.util.SiddhiTestHelper; +import org.apache.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class TestCaseOfCDCListeningModeMongo { + + private static final Logger log = Logger.getLogger(TestCaseOfCDCListeningModeMongo.class); + private Event currentEvent; + private AtomicInteger eventCount = new AtomicInteger(0); + private AtomicBoolean eventArrived = new AtomicBoolean(false); + private int waitTime = 200; + private int timeout = 10000; + private String username; + private String password; + private String databaseURL; + private String replicaSetUrl; + private String tableName = "SweetProductionTable"; + + @BeforeClass + public void initializeConnectionParams() { + databaseURL = "mongodb://127.0.0.1:27017/production"; + replicaSetUrl = "jdbc:mongodb://mongo01/127.0.0.1:27017/production"; + username = System.getenv("DATABASE_USER"); + password = System.getenv("DATABASE_PASSWORD"); + } + + @BeforeMethod + public void init() { + eventCount.set(0); + eventArrived.set(false); + currentEvent = new Event(); + } + + /** + * Test case to Capture Insert operations from a Mongo table. + */ + @Test + public void testInsertCDC() throws InterruptedException { + log.info("------------------------------------------------------------------------------------------------"); + log.info("CDC TestCase: Capturing Insert change data from Mongo."); + log.info("------------------------------------------------------------------------------------------------"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String cdcinStreamDefinition = "@app:name('cdcTesting')" + + "@source(type = 'cdc'," + + " url = '" + replicaSetUrl + "'," + + " username = '" + username + "'," + + " password = '" + password + "'," + + " table.name = '" + tableName + "', " + + " operation = 'insert', " + + " @map(type='keyvalue'))" + + "define stream istm (name string, amount double);"; + + String mongoStoreDefinition = "define stream insertionStream (name string, amount double);" + + "@Store(type='mongodb', mongodb.uri='" + databaseURL + "')" + + "define table SweetProductionTable (name string, amount double);"; + + String mongoQuery = "@info(name='query2') " + + "from insertionStream " + + "insert into SweetProductionTable;"; + + SiddhiAppRuntime cdcAppRuntime = siddhiManager.createSiddhiAppRuntime(cdcinStreamDefinition); + + StreamCallback insertionStreamCallback = new StreamCallback() { + @Override + public void receive(Event[] events) { + for (Event event : events) { + currentEvent = event; + eventCount.getAndIncrement(); + log.info(eventCount + ". " + event); + eventArrived.set(true); + } + } + }; + + cdcAppRuntime.addCallback("istm", insertionStreamCallback); + cdcAppRuntime.start(); + + QueryCallback rdbmsQueryCallback = new QueryCallback() { + @Override + public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) { + for (Event event : inEvents) { + log.info("insert done: " + event); + } + } + }; + + SiddhiAppRuntime mongoAppRuntime = siddhiManager.createSiddhiAppRuntime(mongoStoreDefinition + mongoQuery); + mongoAppRuntime.addCallback("query2", rdbmsQueryCallback); + mongoAppRuntime.start(); + + //Do an insert and wait for cdc app to capture. + InputHandler rdbmsInputHandler = mongoAppRuntime.getInputHandler("insertionStream"); + Object[] insertingObject = new Object[]{"e001", 100.00}; + rdbmsInputHandler.send(insertingObject); + SiddhiTestHelper.waitForEvents(waitTime, 1, eventCount, timeout); + + //Assert event arrival. + Assert.assertTrue(eventArrived.get()); + + //Assert event data. + Assert.assertEquals(insertingObject, currentEvent.getData()); + + cdcAppRuntime.shutdown(); + mongoAppRuntime.shutdown(); + siddhiManager.shutdown(); + } + + /** + * Test case to Capture Delete operations from a Mongo table. + */ + @Test + public void testDeleteCDC() throws InterruptedException { + log.info("------------------------------------------------------------------------------------------------"); + log.info("CDC TestCase: Capturing Delete change data from Mongo."); + log.info("------------------------------------------------------------------------------------------------"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String cdcinStreamDefinition = "@app:name('cdcTesting')" + + "@source(type = 'cdc'," + + " url = '" + replicaSetUrl + "'," + + " username = '" + username + "'," + + " password = '" + password + "'," + + " table.name = '" + tableName + "', " + + " operation = 'delete', " + + " @map(type='keyvalue'))" + + "define stream delstm (id string);"; + + + String mongoStoreDefinition = "define stream DeletionStream (name string, amount double);" + + "define stream InsertStream(name string, amount double);" + + "@Store(type='mongodb', mongodb.uri='" + databaseURL + "')" + + "define table SweetProductionTable (name string, amount double);"; + + String insertQuery = "@info(name='query3') " + + "from InsertStream " + + "insert into SweetProductionTable;"; + + String deleteQuery = "@info(name='queryDel') " + + "from DeletionStream " + + "delete SweetProductionTable on SweetProductionTable.name==name;"; + + SiddhiAppRuntime cdcAppRuntime = siddhiManager.createSiddhiAppRuntime(cdcinStreamDefinition); + + StreamCallback deletionStreamCallback = new StreamCallback() { + @Override + public void receive(Event[] events) { + for (Event event : events) { + currentEvent = event; + eventCount.getAndIncrement(); + log.info(eventCount + ". " + event); + eventArrived.set(true); + } + } + }; + + cdcAppRuntime.addCallback("delstm", deletionStreamCallback); + cdcAppRuntime.start(); + + QueryCallback mongoQuerycallback = new QueryCallback() { + @Override + public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) { + for (Event event : inEvents) { + eventCount.getAndIncrement(); + log.info("delete done: " + event); + } + } + }; + + SiddhiAppRuntime mongoAppRuntime = siddhiManager.createSiddhiAppRuntime(mongoStoreDefinition + insertQuery + + deleteQuery); + mongoAppRuntime.addCallback("queryDel", mongoQuerycallback); + mongoAppRuntime.start(); + + //Do an insert first. + InputHandler mongoInputHandler = mongoAppRuntime.getInputHandler("InsertStream"); + Object[] insertingObject = new Object[]{"e001", 100.00}; + mongoInputHandler.send(insertingObject); + + //wait to complete deletion + SiddhiTestHelper.waitForEvents(waitTime, 1, eventCount, timeout); + eventCount.getAndDecrement(); + + //Delete inserted row + mongoInputHandler = mongoAppRuntime.getInputHandler("DeletionStream"); + Object[] deletingObject = new Object[]{"e001"}; + mongoInputHandler.send(deletingObject); + + //wait to capture the delete event. + SiddhiTestHelper.waitForEvents(waitTime, 1, eventCount, timeout); + + //Assert event arrival. + Assert.assertTrue(eventArrived.get()); + + cdcAppRuntime.shutdown(); + mongoAppRuntime.shutdown(); + siddhiManager.shutdown(); + } + + /** + * Test case to Capture Update operations from a Mongo table. + */ + @Test + public void testUpdateCDC() throws InterruptedException { + log.info("------------------------------------------------------------------------------------------------"); + log.info("CDC TestCase: Capturing Update change data from Mongo."); + log.info("------------------------------------------------------------------------------------------------"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String cdcinStreamDefinition = "@app:name('cdcTesting')" + + "@source(type = 'cdc'," + + " url = '" + replicaSetUrl + "'," + + " username = '" + username + "'," + + " password = '" + password + "'," + + " table.name = '" + tableName + "', " + + " operation = 'update', " + + " @map(type='keyvalue'))" + + "define stream updatestm (amount double);"; + + String mongoStoreDefinition = "define stream UpdateStream(name string, amount double);" + + "define stream InsertStream(name string, amount double);" + + "@Store(type='mongodb', mongodb.uri='" + databaseURL + "')" + + "define table SweetProductionTable (name string, amount double);"; + + String insertQuery = "@info(name='query3') " + + "from InsertStream " + + "insert into SweetProductionTable;"; + + String updateQuery = "@info(name='queryUpdate') " + + "from UpdateStream " + + "select name, amount " + + "update SweetProductionTable " + + "set SweetProductionTable.amount = amount " + + "on SweetProductionTable.name == name;"; + + SiddhiAppRuntime cdcAppRuntime = siddhiManager.createSiddhiAppRuntime(cdcinStreamDefinition); + + StreamCallback updatingStreamCallback = new StreamCallback() { + @Override + public void receive(Event[] events) { + for (Event event : events) { + currentEvent = event; + eventCount.getAndIncrement(); + log.info(eventCount + ". " + event); + eventArrived.set(true); + } + } + }; + + cdcAppRuntime.addCallback("updatestm", updatingStreamCallback); + cdcAppRuntime.start(); + + QueryCallback queryCallback2 = new QueryCallback() { + @Override + public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) { + for (Event event : inEvents) { + log.info("update done: " + event); + } + } + }; + SiddhiAppRuntime mongoAppRuntime = siddhiManager.createSiddhiAppRuntime(mongoStoreDefinition + insertQuery + + updateQuery); + mongoAppRuntime.addCallback("queryUpdate", queryCallback2); + mongoAppRuntime.start(); + + //Do an insert first. + InputHandler mongoInputHandler = mongoAppRuntime.getInputHandler("InsertStream"); + Object[] insertingObject = new Object[]{"sweets", 100.00}; + mongoInputHandler.send(insertingObject); + + Thread.sleep(100); + + //Update inserted row. + mongoInputHandler = mongoAppRuntime.getInputHandler("UpdateStream"); + Object[] updatingObject = new Object[]{"sweets", 500.00}; + mongoInputHandler.send(updatingObject); + + //wait to capture the update event. + SiddhiTestHelper.waitForEvents(waitTime, 1, eventCount, timeout); + + //Assert event arrival. + Assert.assertTrue(eventArrived.get()); + + //Assert event data. + Object[] expectedEventObject = new Object[]{500.00}; + Assert.assertEquals(expectedEventObject, currentEvent.getData()); + + cdcAppRuntime.shutdown(); + mongoAppRuntime.shutdown(); + siddhiManager.shutdown(); + } +} diff --git a/pom.xml b/pom.xml index df1e0be..c255f46 100644 --- a/pom.xml +++ b/pom.xml @@ -105,6 +105,12 @@ ${siddhi-store-rdbms.version} test + + io.siddhi.extension.store.mongodb + siddhi-store-mongodb + ${siddhi-store-mongodb.version} + test + org.wso2.carbon.datasources org.wso2.carbon.datasource.core @@ -239,6 +245,7 @@ 3.2.0 6.11 6.0.1 + 2.1.0 RELEASE ../findbugs-exclude.xml 12.1.0.1-atlassian-hosted From 86e7168febe3383b0efc0d37e18e9164474dd36c Mon Sep 17 00:00:00 2001 From: Prabod Dunuwila Date: Mon, 3 Feb 2020 22:07:46 +0530 Subject: [PATCH 08/18] Refactor. --- .../source/listening/ChangeDataCapture.java | 6 ++--- .../TestCaseOfCDCListeningModeMongo.java | 26 +++++++++---------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java index a51476d..e90dbb1 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java @@ -194,10 +194,10 @@ private Map createMap(ConnectRecord connectRecord, String operat getValue(rawDetails.get(fieldName))); } } catch (DataException ex) { - String deleteDocument = (String) ((Struct)connectRecord.key()).get("id"); + String deleteDocument = (String) ((Struct) connectRecord.key()).get("id"); JSONObject jsonObj = new JSONObject(deleteDocument); Iterator keys = jsonObj.keys(); - while(keys.hasNext()) { + while (keys.hasNext()) { String key = keys.next(); if (jsonObj.get(key) instanceof String) { detailsMap.put("id", jsonObj.get(key)); @@ -234,7 +234,7 @@ private Map createMap(ConnectRecord connectRecord, String operat return detailsMap; } - private Map getMongoDetailMap(JSONObject jsonObj){ + private Map getMongoDetailMap(JSONObject jsonObj) { Map detailsMap = new HashMap<>(); Iterator keys = jsonObj.keys(); for (Iterator it = keys; it.hasNext(); ) { diff --git a/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java b/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java index 54a0b80..8637e05 100644 --- a/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java +++ b/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java @@ -28,14 +28,14 @@ public class TestCaseOfCDCListeningModeMongo { private String password; private String databaseURL; private String replicaSetUrl; - private String tableName = "SweetProductionTable"; + private String collectionName = "SweetProductionTable"; @BeforeClass public void initializeConnectionParams() { - databaseURL = "mongodb://127.0.0.1:27017/production"; - replicaSetUrl = "jdbc:mongodb://mongo01/127.0.0.1:27017/production"; - username = System.getenv("DATABASE_USER"); - password = System.getenv("DATABASE_PASSWORD"); + databaseURL = "mongodb://:/"; + replicaSetUrl = "jdbc:mongodb:///:/"; + username = "user_name"; + password = "password"; } @BeforeMethod @@ -61,14 +61,14 @@ public void testInsertCDC() throws InterruptedException { " url = '" + replicaSetUrl + "'," + " username = '" + username + "'," + " password = '" + password + "'," + - " table.name = '" + tableName + "', " + + " table.name = '" + collectionName + "', " + " operation = 'insert', " + " @map(type='keyvalue'))" + - "define stream istm (name string, amount double);"; + "define stream istm (name string, amount double, volume int);"; - String mongoStoreDefinition = "define stream insertionStream (name string, amount double);" + + String mongoStoreDefinition = "define stream insertionStream (name string, amount double, volume int);" + "@Store(type='mongodb', mongodb.uri='" + databaseURL + "')" + - "define table SweetProductionTable (name string, amount double);"; + "define table SweetProductionTable (name string, amount double, volume int);"; String mongoQuery = "@info(name='query2') " + "from insertionStream " + @@ -106,7 +106,7 @@ public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) { //Do an insert and wait for cdc app to capture. InputHandler rdbmsInputHandler = mongoAppRuntime.getInputHandler("insertionStream"); - Object[] insertingObject = new Object[]{"e001", 100.00}; + Object[] insertingObject = new Object[]{"e001", 100.00, 5}; rdbmsInputHandler.send(insertingObject); SiddhiTestHelper.waitForEvents(waitTime, 1, eventCount, timeout); @@ -137,7 +137,7 @@ public void testDeleteCDC() throws InterruptedException { " url = '" + replicaSetUrl + "'," + " username = '" + username + "'," + " password = '" + password + "'," + - " table.name = '" + tableName + "', " + + " table.name = '" + collectionName + "', " + " operation = 'delete', " + " @map(type='keyvalue'))" + "define stream delstm (id string);"; @@ -229,7 +229,7 @@ public void testUpdateCDC() throws InterruptedException { " url = '" + replicaSetUrl + "'," + " username = '" + username + "'," + " password = '" + password + "'," + - " table.name = '" + tableName + "', " + + " table.name = '" + collectionName + "', " + " operation = 'update', " + " @map(type='keyvalue'))" + "define stream updatestm (amount double);"; @@ -285,7 +285,7 @@ public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) { Object[] insertingObject = new Object[]{"sweets", 100.00}; mongoInputHandler.send(insertingObject); - Thread.sleep(100); + Thread.sleep(1000); //Update inserted row. mongoInputHandler = mongoAppRuntime.getInputHandler("UpdateStream"); From 6bf70f6e737fe8db5e51efe0a5328874b86164da Mon Sep 17 00:00:00 2001 From: Prabod Dunuwila Date: Tue, 4 Feb 2020 18:07:32 +0530 Subject: [PATCH 09/18] Move constants to Util class. --- .../source/listening/ChangeDataCapture.java | 34 +++++++++---------- .../io/cdc/util/CDCSourceConstants.java | 8 +++++ .../TestCaseOfCDCListeningModeMongo.java | 26 ++++++-------- 3 files changed, 35 insertions(+), 33 deletions(-) diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java index e90dbb1..21ae472 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java @@ -29,7 +29,6 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; -import org.apache.log4j.Logger; import org.json.JSONException; import org.json.JSONObject; @@ -46,8 +45,6 @@ * This class is for capturing change data using debezium embedded engine. **/ public class ChangeDataCapture { - private static final Logger log = Logger.getLogger(ChangeDataCapture.class); - private String operation; private Configuration config; private SourceEventListener sourceEventListener; @@ -147,8 +144,8 @@ private Map createMap(ConnectRecord connectRecord, String operat String op; try { - op = (String) record.get("op"); - if ("r".equals(op)) { + op = (String) record.get(CDCSourceConstants.CONNECT_RECORD_OPERATION); + if (CDCSourceConstants.CONNECT_RECORD_INITIAL_SYNC.equals(op)) { op = CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION; } } catch (NullPointerException | DataException ex) { @@ -194,15 +191,11 @@ private Map createMap(ConnectRecord connectRecord, String operat getValue(rawDetails.get(fieldName))); } } catch (DataException ex) { - String deleteDocument = (String) ((Struct) connectRecord.key()).get("id"); - JSONObject jsonObj = new JSONObject(deleteDocument); - Iterator keys = jsonObj.keys(); - while (keys.hasNext()) { - String key = keys.next(); - if (jsonObj.get(key) instanceof String) { - detailsMap.put("id", jsonObj.get(key)); - } - } + String deleteDocumentId = (String) ((Struct) connectRecord.key()). + get(CDCSourceConstants.MONGO_COLLECTION_ID); + JSONObject jsonObjId = new JSONObject(deleteDocumentId); + detailsMap.put(CDCSourceConstants.MONGO_COLLECTION_ID, + jsonObjId.get(CDCSourceConstants.MONGO_COLLECTION_OBJECT_ID)); } break; case CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION: @@ -223,10 +216,15 @@ private Map createMap(ConnectRecord connectRecord, String operat detailsMap.put(fieldName, getValue(rawDetails.get(fieldName))); } } catch (DataException ex) { - String updateDocument = (String) record.get("patch"); + String updateDocument = (String) record.get(CDCSourceConstants.MONGO_PATCH); JSONObject jsonObj = new JSONObject(updateDocument); - JSONObject setJsonObj = (JSONObject) jsonObj.get("$set"); + JSONObject setJsonObj = (JSONObject) jsonObj.get(CDCSourceConstants.MONGO_SET); detailsMap = getMongoDetailMap(setJsonObj); + String updateDocumentId = (String) ((Struct) connectRecord.key()). + get(CDCSourceConstants.MONGO_COLLECTION_ID); + JSONObject jsonObjId = new JSONObject(updateDocumentId); + detailsMap.put(CDCSourceConstants.MONGO_COLLECTION_ID, + jsonObjId.get(CDCSourceConstants.MONGO_COLLECTION_OBJECT_ID)); } break; } @@ -250,11 +248,11 @@ private Map getMongoDetailMap(JSONObject jsonObj) { } else if (jsonObj.get(key) instanceof JSONObject) { try { detailsMap.put(key, Long.parseLong((String) jsonObj.getJSONObject(key). - get("$numberLong"))); + get(CDCSourceConstants.MONGO_OBJECT_NUMBER_LONG))); } catch (JSONException e1) { try { detailsMap.put(key, Double.parseDouble((String) jsonObj.getJSONObject(key). - get("$numberDecimal"))); + get(CDCSourceConstants.MONGO_OBJECT_NUMBER_DECIMAL))); } catch (JSONException e2) { detailsMap.put(key, jsonObj.getJSONObject(key).toString()); } diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceConstants.java b/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceConstants.java index ffe52da..25bbef4 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceConstants.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceConstants.java @@ -56,9 +56,11 @@ public class CDCSourceConstants { public static final String BEFORE_PREFIX = "before_"; public static final String CACHE_OBJECT = "cacheObj"; public static final int DEFAULT_SERVER_ID = -1; + public static final String CONNECT_RECORD_OPERATION = "op"; public static final String CONNECT_RECORD_INSERT_OPERATION = "c"; public static final String CONNECT_RECORD_UPDATE_OPERATION = "u"; public static final String CONNECT_RECORD_DELETE_OPERATION = "d"; + public static final String CONNECT_RECORD_INITIAL_SYNC = "r"; public static final String BEFORE = "before"; public static final String AFTER = "after"; public static final String CARBON_HOME = "carbon.home"; @@ -82,4 +84,10 @@ public class CDCSourceConstants { public static final String MONGODB_HOSTS = "mongodb.hosts"; public static final String MONGODB_NAME = "mongodb.name"; public static final String MONGODB_COLLECTION_WHITELIST = "collection.whitelist"; + public static final String MONGO_COLLECTION_OBJECT_ID = "$oid"; + public static final String MONGO_COLLECTION_ID = "id"; + public static final String MONGO_PATCH = "patch"; + public static final String MONGO_SET = "$set"; + public static final String MONGO_OBJECT_NUMBER_LONG = "$numberLong"; + public static final String MONGO_OBJECT_NUMBER_DECIMAL = "$numberDecimal"; } diff --git a/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java b/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java index 8637e05..772655c 100644 --- a/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java +++ b/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java @@ -26,14 +26,14 @@ public class TestCaseOfCDCListeningModeMongo { private int timeout = 10000; private String username; private String password; - private String databaseURL; - private String replicaSetUrl; + private String databaseUri; + private String replicaSetUri; private String collectionName = "SweetProductionTable"; @BeforeClass public void initializeConnectionParams() { - databaseURL = "mongodb://:/"; - replicaSetUrl = "jdbc:mongodb:///:/"; + databaseUri = "mongodb://:/"; + replicaSetUri = "jdbc:mongodb:///:/"; username = "user_name"; password = "password"; } @@ -58,7 +58,7 @@ public void testInsertCDC() throws InterruptedException { String cdcinStreamDefinition = "@app:name('cdcTesting')" + "@source(type = 'cdc'," + - " url = '" + replicaSetUrl + "'," + + " url = '" + replicaSetUri + "'," + " username = '" + username + "'," + " password = '" + password + "'," + " table.name = '" + collectionName + "', " + @@ -67,7 +67,7 @@ public void testInsertCDC() throws InterruptedException { "define stream istm (name string, amount double, volume int);"; String mongoStoreDefinition = "define stream insertionStream (name string, amount double, volume int);" + - "@Store(type='mongodb', mongodb.uri='" + databaseURL + "')" + + "@Store(type='mongodb', mongodb.uri='" + databaseUri + "')" + "define table SweetProductionTable (name string, amount double, volume int);"; String mongoQuery = "@info(name='query2') " + @@ -134,7 +134,7 @@ public void testDeleteCDC() throws InterruptedException { String cdcinStreamDefinition = "@app:name('cdcTesting')" + "@source(type = 'cdc'," + - " url = '" + replicaSetUrl + "'," + + " url = '" + replicaSetUri + "'," + " username = '" + username + "'," + " password = '" + password + "'," + " table.name = '" + collectionName + "', " + @@ -145,7 +145,7 @@ public void testDeleteCDC() throws InterruptedException { String mongoStoreDefinition = "define stream DeletionStream (name string, amount double);" + "define stream InsertStream(name string, amount double);" + - "@Store(type='mongodb', mongodb.uri='" + databaseURL + "')" + + "@Store(type='mongodb', mongodb.uri='" + databaseUri + "')" + "define table SweetProductionTable (name string, amount double);"; String insertQuery = "@info(name='query3') " + @@ -226,17 +226,17 @@ public void testUpdateCDC() throws InterruptedException { String cdcinStreamDefinition = "@app:name('cdcTesting')" + "@source(type = 'cdc'," + - " url = '" + replicaSetUrl + "'," + + " url = '" + replicaSetUri + "'," + " username = '" + username + "'," + " password = '" + password + "'," + " table.name = '" + collectionName + "', " + " operation = 'update', " + " @map(type='keyvalue'))" + - "define stream updatestm (amount double);"; + "define stream updatestm (id string, amount double);"; String mongoStoreDefinition = "define stream UpdateStream(name string, amount double);" + "define stream InsertStream(name string, amount double);" + - "@Store(type='mongodb', mongodb.uri='" + databaseURL + "')" + + "@Store(type='mongodb', mongodb.uri='" + databaseUri + "')" + "define table SweetProductionTable (name string, amount double);"; String insertQuery = "@info(name='query3') " + @@ -298,10 +298,6 @@ public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) { //Assert event arrival. Assert.assertTrue(eventArrived.get()); - //Assert event data. - Object[] expectedEventObject = new Object[]{500.00}; - Assert.assertEquals(expectedEventObject, currentEvent.getData()); - cdcAppRuntime.shutdown(); mongoAppRuntime.shutdown(); siddhiManager.shutdown(); From 4e4fcc9e1d5c6e42c3ac2fb0f2fdb6afe2498850 Mon Sep 17 00:00:00 2001 From: Prabod Dunuwila Date: Tue, 4 Feb 2020 19:02:05 +0530 Subject: [PATCH 10/18] Return 'id' on insert. --- .../io/cdc/source/listening/ChangeDataCapture.java | 8 +++++++- .../siddhi/extension/io/cdc/util/CDCSourceConstants.java | 1 + .../io/cdc/source/TestCaseOfCDCListeningModeMongo.java | 5 +---- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java index 21ae472..b917559 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java @@ -254,7 +254,13 @@ private Map getMongoDetailMap(JSONObject jsonObj) { detailsMap.put(key, Double.parseDouble((String) jsonObj.getJSONObject(key). get(CDCSourceConstants.MONGO_OBJECT_NUMBER_DECIMAL))); } catch (JSONException e2) { - detailsMap.put(key, jsonObj.getJSONObject(key).toString()); + if (key.equals(CDCSourceConstants.MONGO_COLLECTION_INSERT_ID)) { + detailsMap.put(CDCSourceConstants.MONGO_COLLECTION_ID, jsonObj.getJSONObject(key). + get(CDCSourceConstants.MONGO_COLLECTION_OBJECT_ID)); + } else { + detailsMap.put(key, jsonObj.getJSONObject(key).toString()); + } + } } } diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceConstants.java b/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceConstants.java index 25bbef4..27cda83 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceConstants.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceConstants.java @@ -86,6 +86,7 @@ public class CDCSourceConstants { public static final String MONGODB_COLLECTION_WHITELIST = "collection.whitelist"; public static final String MONGO_COLLECTION_OBJECT_ID = "$oid"; public static final String MONGO_COLLECTION_ID = "id"; + public static final String MONGO_COLLECTION_INSERT_ID = "_id"; public static final String MONGO_PATCH = "patch"; public static final String MONGO_SET = "$set"; public static final String MONGO_OBJECT_NUMBER_LONG = "$numberLong"; diff --git a/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java b/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java index 772655c..258da4e 100644 --- a/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java +++ b/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java @@ -64,7 +64,7 @@ public void testInsertCDC() throws InterruptedException { " table.name = '" + collectionName + "', " + " operation = 'insert', " + " @map(type='keyvalue'))" + - "define stream istm (name string, amount double, volume int);"; + "define stream istm (id string, name string, amount double, volume int);"; String mongoStoreDefinition = "define stream insertionStream (name string, amount double, volume int);" + "@Store(type='mongodb', mongodb.uri='" + databaseUri + "')" + @@ -113,9 +113,6 @@ public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) { //Assert event arrival. Assert.assertTrue(eventArrived.get()); - //Assert event data. - Assert.assertEquals(insertingObject, currentEvent.getData()); - cdcAppRuntime.shutdown(); mongoAppRuntime.shutdown(); siddhiManager.shutdown(); From 99ce572489a310e9720dd0834300d759ec8108d9 Mon Sep 17 00:00:00 2001 From: Prabod Dunuwila Date: Wed, 5 Feb 2020 14:55:18 +0530 Subject: [PATCH 11/18] Code review suggested changes. --- .../extension/io/cdc/source/CDCSource.java | 8 +- .../source/listening/ChangeDataCapture.java | 166 +----------------- .../listening/MongoChangeDataCapture.java | 114 ++++++++++++ .../listening/RdbmsChangeDataCapture.java | 115 ++++++++++++ .../extension/io/cdc/util/CDCSourceUtil.java | 8 +- .../TestCaseOfCDCListeningModeMongo.java | 10 +- 6 files changed, 248 insertions(+), 173 deletions(-) create mode 100644 component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java create mode 100644 component/src/main/java/io/siddhi/extension/io/cdc/source/listening/RdbmsChangeDataCapture.java diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/CDCSource.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/CDCSource.java index 5610e83..bddf846 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/source/CDCSource.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/CDCSource.java @@ -36,6 +36,8 @@ import io.siddhi.core.util.transport.OptionHolder; import io.siddhi.extension.io.cdc.source.listening.CDCSourceObjectKeeper; import io.siddhi.extension.io.cdc.source.listening.ChangeDataCapture; +import io.siddhi.extension.io.cdc.source.listening.MongoChangeDataCapture; +import io.siddhi.extension.io.cdc.source.listening.RdbmsChangeDataCapture; import io.siddhi.extension.io.cdc.source.listening.WrongConfigurationException; import io.siddhi.extension.io.cdc.source.polling.CDCPoller; import io.siddhi.extension.io.cdc.util.CDCSourceConstants; @@ -402,7 +404,11 @@ public StateFactory init(SourceEventListener sourceEventListener, Opti validateListeningModeParameters(optionHolder); //send sourceEventListener and preferred operation to changeDataCapture object - changeDataCapture = new ChangeDataCapture(operation, sourceEventListener); + if (url.contains("jdbc:mongodb")) { + changeDataCapture = new MongoChangeDataCapture(operation, sourceEventListener); + } else { + changeDataCapture = new RdbmsChangeDataCapture(operation, sourceEventListener); + } //create the folder for history file if not exists File directory = new File(historyFileDirectory); diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java index b917559..7efa3ef 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java @@ -19,32 +19,20 @@ package io.siddhi.extension.io.cdc.source.listening; import io.debezium.config.Configuration; -import io.debezium.data.VariableScaleDecimal; import io.debezium.embedded.EmbeddedEngine; import io.debezium.embedded.spi.OffsetCommitPolicy; import io.siddhi.core.exception.SiddhiAppRuntimeException; import io.siddhi.core.stream.input.source.SourceEventListener; -import io.siddhi.extension.io.cdc.util.CDCSourceConstants; import org.apache.kafka.connect.connector.ConnectRecord; -import org.apache.kafka.connect.data.Field; -import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.errors.DataException; -import org.json.JSONException; -import org.json.JSONObject; -import java.math.BigDecimal; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * This class is for capturing change data using debezium embedded engine. **/ -public class ChangeDataCapture { +public abstract class ChangeDataCapture { private String operation; private Configuration config; private SourceEventListener sourceEventListener; @@ -133,156 +121,6 @@ private void handleEvent(ConnectRecord connectRecord) { * @param operation is the change data event which is specified by the user. **/ - private Map createMap(ConnectRecord connectRecord, String operation) { + abstract Map createMap(ConnectRecord connectRecord, String operation); - //Map to return - Map detailsMap = new HashMap<>(); - - Struct record = (Struct) connectRecord.value(); - - //get the change data object's operation. - String op; - - try { - op = (String) record.get(CDCSourceConstants.CONNECT_RECORD_OPERATION); - if (CDCSourceConstants.CONNECT_RECORD_INITIAL_SYNC.equals(op)) { - op = CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION; - } - } catch (NullPointerException | DataException ex) { - return detailsMap; - } - - //match the change data's operation with user specifying operation and proceed. - if (operation.equalsIgnoreCase(CDCSourceConstants.INSERT) && - op.equals(CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION) - || operation.equalsIgnoreCase(CDCSourceConstants.DELETE) && - op.equals(CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION) - || operation.equalsIgnoreCase(CDCSourceConstants.UPDATE) && - op.equals(CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION)) { - - Struct rawDetails; - List fields; - String fieldName; - - switch (op) { - case CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION: - //append row details after insert. - try { - rawDetails = (Struct) record.get(CDCSourceConstants.AFTER); - fields = rawDetails.schema().fields(); - for (Field key : fields) { - fieldName = key.name(); - detailsMap.put(fieldName, getValue(rawDetails.get(fieldName))); - } - } catch (ClassCastException ex) { - String insertString = (String) record.get(CDCSourceConstants.AFTER); - JSONObject jsonObj = new JSONObject(insertString); - detailsMap = getMongoDetailMap(jsonObj); - } - break; - case CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION: - //append row details before delete. - try { - rawDetails = (Struct) record.get(CDCSourceConstants.BEFORE); - fields = rawDetails.schema().fields(); - for (Field key : fields) { - fieldName = key.name(); - detailsMap.put(CDCSourceConstants.BEFORE_PREFIX + fieldName, - getValue(rawDetails.get(fieldName))); - } - } catch (DataException ex) { - String deleteDocumentId = (String) ((Struct) connectRecord.key()). - get(CDCSourceConstants.MONGO_COLLECTION_ID); - JSONObject jsonObjId = new JSONObject(deleteDocumentId); - detailsMap.put(CDCSourceConstants.MONGO_COLLECTION_ID, - jsonObjId.get(CDCSourceConstants.MONGO_COLLECTION_OBJECT_ID)); - } - break; - case CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION: - //append row details before update. - try { - rawDetails = (Struct) record.get(CDCSourceConstants.BEFORE); - fields = rawDetails.schema().fields(); - for (Field key : fields) { - fieldName = key.name(); - detailsMap.put(CDCSourceConstants.BEFORE_PREFIX + fieldName, - getValue(rawDetails.get(fieldName))); - } - //append row details after update. - rawDetails = (Struct) record.get(CDCSourceConstants.AFTER); - fields = rawDetails.schema().fields(); - for (Field key : fields) { - fieldName = key.name(); - detailsMap.put(fieldName, getValue(rawDetails.get(fieldName))); - } - } catch (DataException ex) { - String updateDocument = (String) record.get(CDCSourceConstants.MONGO_PATCH); - JSONObject jsonObj = new JSONObject(updateDocument); - JSONObject setJsonObj = (JSONObject) jsonObj.get(CDCSourceConstants.MONGO_SET); - detailsMap = getMongoDetailMap(setJsonObj); - String updateDocumentId = (String) ((Struct) connectRecord.key()). - get(CDCSourceConstants.MONGO_COLLECTION_ID); - JSONObject jsonObjId = new JSONObject(updateDocumentId); - detailsMap.put(CDCSourceConstants.MONGO_COLLECTION_ID, - jsonObjId.get(CDCSourceConstants.MONGO_COLLECTION_OBJECT_ID)); - } - break; - } - } - return detailsMap; - } - - private Map getMongoDetailMap(JSONObject jsonObj) { - Map detailsMap = new HashMap<>(); - Iterator keys = jsonObj.keys(); - for (Iterator it = keys; it.hasNext(); ) { - String key = it.next(); - if (jsonObj.get(key) instanceof Boolean) { - detailsMap.put(key, jsonObj.getBoolean(key)); - } else if (jsonObj.get(key) instanceof Integer) { - detailsMap.put(key, jsonObj.getInt(key)); - } else if (jsonObj.get(key) instanceof Double) { - detailsMap.put(key, jsonObj.getDouble(key)); - } else if (jsonObj.get(key) instanceof String) { - detailsMap.put(key, jsonObj.getString(key)); - } else if (jsonObj.get(key) instanceof JSONObject) { - try { - detailsMap.put(key, Long.parseLong((String) jsonObj.getJSONObject(key). - get(CDCSourceConstants.MONGO_OBJECT_NUMBER_LONG))); - } catch (JSONException e1) { - try { - detailsMap.put(key, Double.parseDouble((String) jsonObj.getJSONObject(key). - get(CDCSourceConstants.MONGO_OBJECT_NUMBER_DECIMAL))); - } catch (JSONException e2) { - if (key.equals(CDCSourceConstants.MONGO_COLLECTION_INSERT_ID)) { - detailsMap.put(CDCSourceConstants.MONGO_COLLECTION_ID, jsonObj.getJSONObject(key). - get(CDCSourceConstants.MONGO_COLLECTION_OBJECT_ID)); - } else { - detailsMap.put(key, jsonObj.getJSONObject(key).toString()); - } - - } - } - } - } - return detailsMap; - } - - private Object getValue(Object v) { - if (v instanceof Struct) { - Optional value = VariableScaleDecimal.toLogical((Struct) v).getDecimalValue(); - BigDecimal bigDecimal = value.orElse(null); - if (bigDecimal == null) { - return null; - } - return bigDecimal.longValue(); - } - if (v instanceof Short) { - return ((Short) v).intValue(); - } - if (v instanceof Byte) { - return ((Byte) v).intValue(); - } - return v; - } } diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java new file mode 100644 index 0000000..fa8cc93 --- /dev/null +++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java @@ -0,0 +1,114 @@ +package io.siddhi.extension.io.cdc.source.listening; + +import io.siddhi.core.stream.input.source.SourceEventListener; +import io.siddhi.extension.io.cdc.util.CDCSourceConstants; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; +import org.json.JSONObject; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * This class is for capturing change data for MongoDB using debezium embedded engine. + **/ +public class MongoChangeDataCapture extends ChangeDataCapture { + + public MongoChangeDataCapture(String operation, SourceEventListener sourceEventListener) { + super(operation, sourceEventListener); + } + + Map createMap(ConnectRecord connectRecord, String operation) { + + //Map to return + Map detailsMap = new HashMap<>(); + + Struct record = (Struct) connectRecord.value(); + + //get the change data object's operation. + String op; + + try { + op = (String) record.get(CDCSourceConstants.CONNECT_RECORD_OPERATION); + } catch (NullPointerException | DataException ex) { + return detailsMap; + } + + //match the change data's operation with user specifying operation and proceed. + if (operation.equalsIgnoreCase(CDCSourceConstants.INSERT) && + op.equals(CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION) + || operation.equalsIgnoreCase(CDCSourceConstants.DELETE) && + op.equals(CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION) + || operation.equalsIgnoreCase(CDCSourceConstants.UPDATE) && + op.equals(CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION)) { + + switch (op) { + case CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION: + //append row details after insert. + String insertString = (String) record.get(CDCSourceConstants.AFTER); + JSONObject jsonObj = new JSONObject(insertString); + detailsMap = getMongoDetailMap(jsonObj); + break; + case CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION: + //append row details before delete. + String deleteDocumentId = (String) ((Struct) connectRecord.key()) + .get(CDCSourceConstants.MONGO_COLLECTION_ID); + JSONObject jsonObjId = new JSONObject(deleteDocumentId); + detailsMap.put(CDCSourceConstants.MONGO_COLLECTION_ID, + jsonObjId.get(CDCSourceConstants.MONGO_COLLECTION_OBJECT_ID)); + + break; + case CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION: + //append row details before update. + String updateDocument = (String) record.get(CDCSourceConstants.MONGO_PATCH); + JSONObject jsonObj1 = new JSONObject(updateDocument); + JSONObject setJsonObj = (JSONObject) jsonObj1.get(CDCSourceConstants.MONGO_SET); + detailsMap = getMongoDetailMap(setJsonObj); + String updateDocumentId = (String) ((Struct) connectRecord.key()) + .get(CDCSourceConstants.MONGO_COLLECTION_ID); + JSONObject jsonObjId1 = new JSONObject(updateDocumentId); + detailsMap.put(CDCSourceConstants.MONGO_COLLECTION_ID, + jsonObjId1.get(CDCSourceConstants.MONGO_COLLECTION_OBJECT_ID)); + break; + default: + break; + } + } + return detailsMap; + } + + private Map getMongoDetailMap(JSONObject jsonObj) { + Map detailsMap = new HashMap<>(); + Iterator keys = jsonObj.keys(); + for (Iterator it = keys; it.hasNext(); ) { + String key = it.next(); + if (jsonObj.get(key) instanceof Boolean) { + detailsMap.put(key, jsonObj.getBoolean(key)); + } else if (jsonObj.get(key) instanceof Integer) { + detailsMap.put(key, jsonObj.getInt(key)); + } else if (jsonObj.get(key) instanceof Double) { + detailsMap.put(key, jsonObj.getDouble(key)); + } else if (jsonObj.get(key) instanceof String) { + detailsMap.put(key, jsonObj.getString(key)); + } else if (jsonObj.get(key) instanceof JSONObject) { + if (jsonObj.getJSONObject(key).toString().contains(CDCSourceConstants.MONGO_COLLECTION_OBJECT_ID)) { + detailsMap.put(CDCSourceConstants.MONGO_COLLECTION_ID, jsonObj.getJSONObject(key) + .get(CDCSourceConstants.MONGO_COLLECTION_OBJECT_ID)); + } else if (jsonObj.getJSONObject(key).toString() + .contains(CDCSourceConstants.MONGO_OBJECT_NUMBER_DECIMAL)) { + detailsMap.put(key, Double.parseDouble((String) jsonObj.getJSONObject(key) + .get(CDCSourceConstants.MONGO_OBJECT_NUMBER_DECIMAL))); + } else if (jsonObj.getJSONObject(key).toString() + .contains(CDCSourceConstants.MONGO_OBJECT_NUMBER_LONG)) { + detailsMap.put(key, Long.parseLong((String) jsonObj.getJSONObject(key) + .get(CDCSourceConstants.MONGO_OBJECT_NUMBER_LONG))); + } else { + detailsMap.put(key, jsonObj.getJSONObject(key).toString()); + } + } + } + return detailsMap; + } +} diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/RdbmsChangeDataCapture.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/RdbmsChangeDataCapture.java new file mode 100644 index 0000000..61096ab --- /dev/null +++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/RdbmsChangeDataCapture.java @@ -0,0 +1,115 @@ +package io.siddhi.extension.io.cdc.source.listening; + +import io.debezium.data.VariableScaleDecimal; +import io.siddhi.core.stream.input.source.SourceEventListener; +import io.siddhi.extension.io.cdc.util.CDCSourceConstants; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; + +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * This class is for capturing change data for RDBMS using debezium embedded engine. + **/ +public class RdbmsChangeDataCapture extends ChangeDataCapture { + + public RdbmsChangeDataCapture(String operation, SourceEventListener sourceEventListener) { + super(operation, sourceEventListener); + } + + Map createMap(ConnectRecord connectRecord, String operation) { + + //Map to return + Map detailsMap = new HashMap<>(); + + Struct record = (Struct) connectRecord.value(); + + //get the change data object's operation. + String op; + + try { + op = (String) record.get(CDCSourceConstants.CONNECT_RECORD_OPERATION); + } catch (NullPointerException | DataException ex) { + return detailsMap; + } + + //match the change data's operation with user specifying operation and proceed. + if (operation.equalsIgnoreCase(CDCSourceConstants.INSERT) && + op.equals(CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION) + || operation.equalsIgnoreCase(CDCSourceConstants.DELETE) && + op.equals(CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION) + || operation.equalsIgnoreCase(CDCSourceConstants.UPDATE) && + op.equals(CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION)) { + + Struct rawDetails; + List fields; + String fieldName; + + switch (op) { + case CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION: + //append row details after insert. + rawDetails = (Struct) record.get(CDCSourceConstants.AFTER); + fields = rawDetails.schema().fields(); + for (Field key : fields) { + fieldName = key.name(); + detailsMap.put(fieldName, getValue(rawDetails.get(fieldName))); + } + break; + case CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION: + //append row details before delete. + rawDetails = (Struct) record.get(CDCSourceConstants.BEFORE); + fields = rawDetails.schema().fields(); + for (Field key : fields) { + fieldName = key.name(); + detailsMap.put(CDCSourceConstants.BEFORE_PREFIX + fieldName, + getValue(rawDetails.get(fieldName))); + } + break; + case CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION: + //append row details before update. + rawDetails = (Struct) record.get(CDCSourceConstants.BEFORE); + fields = rawDetails.schema().fields(); + for (Field key : fields) { + fieldName = key.name(); + detailsMap.put(CDCSourceConstants.BEFORE_PREFIX + fieldName, + getValue(rawDetails.get(fieldName))); + } + //append row details after update. + rawDetails = (Struct) record.get(CDCSourceConstants.AFTER); + fields = rawDetails.schema().fields(); + for (Field key : fields) { + fieldName = key.name(); + detailsMap.put(fieldName, getValue(rawDetails.get(fieldName))); + } + break; + default: + break; + } + } + return detailsMap; + } + + private Object getValue(Object v) { + if (v instanceof Struct) { + Optional value = VariableScaleDecimal.toLogical((Struct) v).getDecimalValue(); + BigDecimal bigDecimal = value.orElse(null); + if (bigDecimal == null) { + return null; + } + return bigDecimal.longValue(); + } + if (v instanceof Short) { + return ((Short) v).intValue(); + } + if (v instanceof Byte) { + return ((Byte) v).intValue(); + } + return v; + } +} diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java b/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java index e3ee8a2..025ed92 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java @@ -173,9 +173,9 @@ public static Map getConfigMap(String username, String password, port = Integer.parseInt(matcher.group(3)); database = matcher.group(4); } else { - throw new WrongConfigurationException("Invalid JDBC url: " + url + + throw new WrongConfigurationException("Invalid MongoDB uri: " + url + " received for stream: " + siddhiStreamName + - ". Expected url format: jdbc:mongodb:///:/" + + ". Expected uri format: jdbc:mongodb:///:/" + ""); } configMap.put(CDCSourceConstants.CONNECTOR_CLASS, CDCSourceConstants.MONGODB_CONNECTOR_CLASS); @@ -198,8 +198,8 @@ public static Map getConfigMap(String username, String password, configMap.put(CDCSourceConstants.DATABASE_USER, username); configMap.put(CDCSourceConstants.DATABASE_PASSWORD, password); } else { - configMap.put(CDCSourceConstants.MONGODB_USER, username); - configMap.put(CDCSourceConstants.MONGODB_PASSWORD, password); +// configMap.put(CDCSourceConstants.MONGODB_USER, username); +// configMap.put(CDCSourceConstants.MONGODB_PASSWORD, password); } if (serverID == CDCSourceConstants.DEFAULT_SERVER_ID) { diff --git a/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java b/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java index 258da4e..ac2f235 100644 --- a/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java +++ b/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java @@ -91,7 +91,9 @@ public void receive(Event[] events) { cdcAppRuntime.addCallback("istm", insertionStreamCallback); cdcAppRuntime.start(); - QueryCallback rdbmsQueryCallback = new QueryCallback() { + Thread.sleep(1000); + + QueryCallback mongoQueryCallback = new QueryCallback() { @Override public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) { for (Event event : inEvents) { @@ -101,13 +103,13 @@ public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) { }; SiddhiAppRuntime mongoAppRuntime = siddhiManager.createSiddhiAppRuntime(mongoStoreDefinition + mongoQuery); - mongoAppRuntime.addCallback("query2", rdbmsQueryCallback); + mongoAppRuntime.addCallback("query2", mongoQueryCallback); mongoAppRuntime.start(); //Do an insert and wait for cdc app to capture. - InputHandler rdbmsInputHandler = mongoAppRuntime.getInputHandler("insertionStream"); + InputHandler mongoInputHandler = mongoAppRuntime.getInputHandler("insertionStream"); Object[] insertingObject = new Object[]{"e001", 100.00, 5}; - rdbmsInputHandler.send(insertingObject); + mongoInputHandler.send(insertingObject); SiddhiTestHelper.waitForEvents(waitTime, 1, eventCount, timeout); //Assert event arrival. From 1b64d4eaacc8b16973bd3b2ee81419ed8d8aa397 Mon Sep 17 00:00:00 2001 From: Prabod Dunuwila Date: Wed, 5 Feb 2020 16:49:56 +0530 Subject: [PATCH 12/18] Remove commented lines. --- .../java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java b/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java index 025ed92..931e1f1 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java @@ -198,8 +198,8 @@ public static Map getConfigMap(String username, String password, configMap.put(CDCSourceConstants.DATABASE_USER, username); configMap.put(CDCSourceConstants.DATABASE_PASSWORD, password); } else { -// configMap.put(CDCSourceConstants.MONGODB_USER, username); -// configMap.put(CDCSourceConstants.MONGODB_PASSWORD, password); + configMap.put(CDCSourceConstants.MONGODB_USER, username); + configMap.put(CDCSourceConstants.MONGODB_PASSWORD, password); } if (serverID == CDCSourceConstants.DEFAULT_SERVER_ID) { From f037b3fed1eaa6872450b6665fd44b992d028a72 Mon Sep 17 00:00:00 2001 From: Prabod Dunuwila Date: Wed, 5 Feb 2020 20:05:12 +0530 Subject: [PATCH 13/18] Add license headers & assertions for tests --- .../listening/MongoChangeDataCapture.java | 18 +++++++++++++ .../listening/RdbmsChangeDataCapture.java | 18 +++++++++++++ .../TestCaseOfCDCListeningModeMongo.java | 25 ++++++++++++++++++- 3 files changed, 60 insertions(+), 1 deletion(-) diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java index fa8cc93..13033eb 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java @@ -1,3 +1,21 @@ +/* + * Copyright (c) 2020, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package io.siddhi.extension.io.cdc.source.listening; import io.siddhi.core.stream.input.source.SourceEventListener; diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/RdbmsChangeDataCapture.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/RdbmsChangeDataCapture.java index 61096ab..73159db 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/RdbmsChangeDataCapture.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/RdbmsChangeDataCapture.java @@ -1,3 +1,21 @@ +/* + * Copyright (c) 2020, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package io.siddhi.extension.io.cdc.source.listening; import io.debezium.data.VariableScaleDecimal; diff --git a/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java b/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java index ac2f235..1c3eb28 100644 --- a/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java +++ b/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java @@ -1,3 +1,21 @@ +/* + * Copyright (c) 2020, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package io.siddhi.extension.io.cdc.source; import io.siddhi.core.SiddhiAppRuntime; @@ -64,7 +82,7 @@ public void testInsertCDC() throws InterruptedException { " table.name = '" + collectionName + "', " + " operation = 'insert', " + " @map(type='keyvalue'))" + - "define stream istm (id string, name string, amount double, volume int);"; + "define stream istm (name string, amount double, volume int);"; String mongoStoreDefinition = "define stream insertionStream (name string, amount double, volume int);" + "@Store(type='mongodb', mongodb.uri='" + databaseUri + "')" + @@ -115,6 +133,11 @@ public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) { //Assert event arrival. Assert.assertTrue(eventArrived.get()); + Thread.sleep(1000); + + //Assert event data. + Assert.assertEquals(insertingObject, currentEvent.getData()); + cdcAppRuntime.shutdown(); mongoAppRuntime.shutdown(); siddhiManager.shutdown(); From 65de78124125ad16a75dd6c67c29c25ea87d4b44 Mon Sep 17 00:00:00 2001 From: Prabod Dunuwila Date: Thu, 6 Feb 2020 10:05:07 +0530 Subject: [PATCH 14/18] Refactor --- .../listening/MongoChangeDataCapture.java | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java index 13033eb..f97314d 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java @@ -23,6 +23,7 @@ import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; +import org.json.JSONException; import org.json.JSONObject; import java.util.HashMap; @@ -106,24 +107,28 @@ private Map getMongoDetailMap(JSONObject jsonObj) { detailsMap.put(key, jsonObj.getBoolean(key)); } else if (jsonObj.get(key) instanceof Integer) { detailsMap.put(key, jsonObj.getInt(key)); + } else if (jsonObj.get(key) instanceof Long) { + detailsMap.put(key, jsonObj.getDouble(key)); } else if (jsonObj.get(key) instanceof Double) { detailsMap.put(key, jsonObj.getDouble(key)); } else if (jsonObj.get(key) instanceof String) { detailsMap.put(key, jsonObj.getString(key)); } else if (jsonObj.get(key) instanceof JSONObject) { - if (jsonObj.getJSONObject(key).toString().contains(CDCSourceConstants.MONGO_COLLECTION_OBJECT_ID)) { - detailsMap.put(CDCSourceConstants.MONGO_COLLECTION_ID, jsonObj.getJSONObject(key) - .get(CDCSourceConstants.MONGO_COLLECTION_OBJECT_ID)); - } else if (jsonObj.getJSONObject(key).toString() - .contains(CDCSourceConstants.MONGO_OBJECT_NUMBER_DECIMAL)) { - detailsMap.put(key, Double.parseDouble((String) jsonObj.getJSONObject(key) - .get(CDCSourceConstants.MONGO_OBJECT_NUMBER_DECIMAL))); - } else if (jsonObj.getJSONObject(key).toString() - .contains(CDCSourceConstants.MONGO_OBJECT_NUMBER_LONG)) { + try { detailsMap.put(key, Long.parseLong((String) jsonObj.getJSONObject(key) .get(CDCSourceConstants.MONGO_OBJECT_NUMBER_LONG))); - } else { - detailsMap.put(key, jsonObj.getJSONObject(key).toString()); + } catch (JSONException notLongObjectEx) { + try { + detailsMap.put(key, Double.parseDouble((String) jsonObj.getJSONObject(key) + .get(CDCSourceConstants.MONGO_OBJECT_NUMBER_DECIMAL))); + } catch (JSONException notDoubleObjectEx) { + if (key.equals(CDCSourceConstants.MONGO_COLLECTION_INSERT_ID)) { + detailsMap.put(CDCSourceConstants.MONGO_COLLECTION_ID, jsonObj.getJSONObject(key) + .get(CDCSourceConstants.MONGO_COLLECTION_OBJECT_ID)); + } else { + detailsMap.put(key, jsonObj.getJSONObject(key).toString()); + } + } } } } From 94de914ea1bb23ccfae3ba9e842b89096fe13514 Mon Sep 17 00:00:00 2001 From: Prabod Dunuwila Date: Thu, 6 Feb 2020 11:46:41 +0530 Subject: [PATCH 15/18] Fix issues --- component/pom.xml | 2 +- .../io/siddhi/extension/io/cdc/source/CDCSource.java | 2 +- .../cdc/source/listening/MongoChangeDataCapture.java | 9 +++------ .../cdc/source/listening/RdbmsChangeDataCapture.java | 10 +++------- .../io/cdc/source/TestCaseOfCDCListeningModeMongo.java | 6 +++++- pom.xml | 1 + 6 files changed, 14 insertions(+), 16 deletions(-) diff --git a/component/pom.xml b/component/pom.xml index 197995c..9abb9b5 100644 --- a/component/pom.xml +++ b/component/pom.xml @@ -30,7 +30,7 @@ org.json json - 20190722 + ${json.version} org.apache.log4j.wso2 diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/CDCSource.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/CDCSource.java index bddf846..b2ccca0 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/source/CDCSource.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/CDCSource.java @@ -404,7 +404,7 @@ public StateFactory init(SourceEventListener sourceEventListener, Opti validateListeningModeParameters(optionHolder); //send sourceEventListener and preferred operation to changeDataCapture object - if (url.contains("jdbc:mongodb")) { + if (url.toLowerCase().contains("jdbc:mongodb")) { changeDataCapture = new MongoChangeDataCapture(operation, sourceEventListener); } else { changeDataCapture = new RdbmsChangeDataCapture(operation, sourceEventListener); diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java index f97314d..f0bacbc 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java @@ -23,6 +23,7 @@ import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; +import org.apache.log4j.Logger; import org.json.JSONException; import org.json.JSONObject; @@ -34,27 +35,23 @@ * This class is for capturing change data for MongoDB using debezium embedded engine. **/ public class MongoChangeDataCapture extends ChangeDataCapture { + private static final Logger log = Logger.getLogger(MongoChangeDataCapture.class); public MongoChangeDataCapture(String operation, SourceEventListener sourceEventListener) { super(operation, sourceEventListener); } Map createMap(ConnectRecord connectRecord, String operation) { - //Map to return Map detailsMap = new HashMap<>(); - Struct record = (Struct) connectRecord.value(); - //get the change data object's operation. String op; - try { op = (String) record.get(CDCSourceConstants.CONNECT_RECORD_OPERATION); } catch (NullPointerException | DataException ex) { return detailsMap; } - //match the change data's operation with user specifying operation and proceed. if (operation.equalsIgnoreCase(CDCSourceConstants.INSERT) && op.equals(CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION) @@ -62,7 +59,6 @@ Map createMap(ConnectRecord connectRecord, String operation) { op.equals(CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION) || operation.equalsIgnoreCase(CDCSourceConstants.UPDATE) && op.equals(CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION)) { - switch (op) { case CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION: //append row details after insert. @@ -92,6 +88,7 @@ Map createMap(ConnectRecord connectRecord, String operation) { jsonObjId1.get(CDCSourceConstants.MONGO_COLLECTION_OBJECT_ID)); break; default: + log.info("Provided \"op\" value is not supported."); break; } } diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/RdbmsChangeDataCapture.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/RdbmsChangeDataCapture.java index 73159db..ca948ac 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/RdbmsChangeDataCapture.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/RdbmsChangeDataCapture.java @@ -25,6 +25,7 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; +import org.apache.log4j.Logger; import java.math.BigDecimal; import java.util.HashMap; @@ -36,27 +37,23 @@ * This class is for capturing change data for RDBMS using debezium embedded engine. **/ public class RdbmsChangeDataCapture extends ChangeDataCapture { + private static final Logger log = Logger.getLogger(RdbmsChangeDataCapture.class); public RdbmsChangeDataCapture(String operation, SourceEventListener sourceEventListener) { super(operation, sourceEventListener); } Map createMap(ConnectRecord connectRecord, String operation) { - //Map to return Map detailsMap = new HashMap<>(); - Struct record = (Struct) connectRecord.value(); - //get the change data object's operation. String op; - try { op = (String) record.get(CDCSourceConstants.CONNECT_RECORD_OPERATION); } catch (NullPointerException | DataException ex) { return detailsMap; } - //match the change data's operation with user specifying operation and proceed. if (operation.equalsIgnoreCase(CDCSourceConstants.INSERT) && op.equals(CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION) @@ -64,11 +61,9 @@ Map createMap(ConnectRecord connectRecord, String operation) { op.equals(CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION) || operation.equalsIgnoreCase(CDCSourceConstants.UPDATE) && op.equals(CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION)) { - Struct rawDetails; List fields; String fieldName; - switch (op) { case CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION: //append row details after insert. @@ -107,6 +102,7 @@ Map createMap(ConnectRecord connectRecord, String operation) { } break; default: + log.info("Provided \"op\" value is not supported."); break; } } diff --git a/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java b/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java index 1c3eb28..408911f 100644 --- a/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java +++ b/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java @@ -254,7 +254,7 @@ public void testUpdateCDC() throws InterruptedException { " table.name = '" + collectionName + "', " + " operation = 'update', " + " @map(type='keyvalue'))" + - "define stream updatestm (id string, amount double);"; + "define stream updatestm (amount double);"; String mongoStoreDefinition = "define stream UpdateStream(name string, amount double);" + "define stream InsertStream(name string, amount double);" + @@ -320,6 +320,10 @@ public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) { //Assert event arrival. Assert.assertTrue(eventArrived.get()); + //Assert event data. + Object[] expectedEventObject = new Object[]{500.00}; + Assert.assertEquals(expectedEventObject, currentEvent.getData()); + cdcAppRuntime.shutdown(); mongoAppRuntime.shutdown(); siddhiManager.shutdown(); diff --git a/pom.xml b/pom.xml index c255f46..55eba47 100644 --- a/pom.xml +++ b/pom.xml @@ -263,6 +263,7 @@ 12.2.0.1 2.3.0 3.4.0 + 20190722 https://github.com/siddhi-io/siddhi-io-cdc.git From d64a3b265292ca7d4db67b1c117b2f88652d6240 Mon Sep 17 00:00:00 2001 From: Prabod Dunuwila Date: Thu, 6 Feb 2020 14:06:23 +0530 Subject: [PATCH 16/18] Fix issues --- component/pom.xml | 1 - .../main/java/io/siddhi/extension/io/cdc/source/CDCSource.java | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/component/pom.xml b/component/pom.xml index 9abb9b5..e1e166f 100644 --- a/component/pom.xml +++ b/component/pom.xml @@ -584,7 +584,6 @@ io.siddhi.query.*;version="${siddhi.import.version.range}", *;resolution:=optional - * META-INF=target/classes/META-INF, META-INF/services=${project.build.directory}/dependencies/connect-runtime/META-INF/services, diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/CDCSource.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/CDCSource.java index b2ccca0..3bb6e67 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/source/CDCSource.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/CDCSource.java @@ -48,6 +48,7 @@ import java.io.File; import java.sql.SQLException; import java.util.HashMap; +import java.util.Locale; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -404,7 +405,7 @@ public StateFactory init(SourceEventListener sourceEventListener, Opti validateListeningModeParameters(optionHolder); //send sourceEventListener and preferred operation to changeDataCapture object - if (url.toLowerCase().contains("jdbc:mongodb")) { + if (url.toLowerCase(Locale.ENGLISH).contains("jdbc:mongodb")) { changeDataCapture = new MongoChangeDataCapture(operation, sourceEventListener); } else { changeDataCapture = new RdbmsChangeDataCapture(operation, sourceEventListener); From c2b2ad1853d0837bdf883f0997ea8bda0a3cfee6 Mon Sep 17 00:00:00 2001 From: Prabod Dunuwila Date: Thu, 6 Feb 2020 19:16:39 +0530 Subject: [PATCH 17/18] Add dependency --- component/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/component/pom.xml b/component/pom.xml index e1e166f..85b034d 100644 --- a/component/pom.xml +++ b/component/pom.xml @@ -577,6 +577,7 @@ org.apache.kafka.common.*, org.apache.kafka.clients.*, org.antlr.*, + org.json.*, io.siddhi.annotation.*;version="${siddhi.import.version.range}", From 91bd60541084d92e741a71ae2145289f13b822f9 Mon Sep 17 00:00:00 2001 From: Prabod Dunuwila Date: Fri, 7 Feb 2020 14:35:48 +0530 Subject: [PATCH 18/18] Refactor log message. --- .../io/cdc/source/listening/MongoChangeDataCapture.java | 2 +- .../io/cdc/source/listening/RdbmsChangeDataCapture.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java index f0bacbc..319da38 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java @@ -88,7 +88,7 @@ Map createMap(ConnectRecord connectRecord, String operation) { jsonObjId1.get(CDCSourceConstants.MONGO_COLLECTION_OBJECT_ID)); break; default: - log.info("Provided \"op\" value is not supported."); + log.info("Provided value for \"op\" : " + op + " is not supported."); break; } } diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/RdbmsChangeDataCapture.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/RdbmsChangeDataCapture.java index ca948ac..ad82ddb 100644 --- a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/RdbmsChangeDataCapture.java +++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/RdbmsChangeDataCapture.java @@ -102,7 +102,7 @@ Map createMap(ConnectRecord connectRecord, String operation) { } break; default: - log.info("Provided \"op\" value is not supported."); + log.info("Provided value for \"op\" : " + op + " is not supported."); break; } }