diff --git a/component/pom.xml b/component/pom.xml
index 99c25ed..85b034d 100644
--- a/component/pom.xml
+++ b/component/pom.xml
@@ -27,6 +27,11 @@
bundle
Siddhi IO CDC extension
+
+ org.json
+ json
+ ${json.version}
+
org.apache.log4j.wso2
log4j
@@ -51,6 +56,10 @@
io.debezium
debezium-connector-postgres
+
+ io.debezium
+ debezium-connector-mongodb
+
io.siddhi
siddhi-core
@@ -86,6 +95,11 @@
siddhi-store-rdbms
test
+
+ io.siddhi.extension.store.mongodb
+ siddhi-store-mongodb
+ test
+
com.zaxxer
HikariCP
@@ -562,6 +576,8 @@
org.apache.kafka.connect.*,
org.apache.kafka.common.*,
org.apache.kafka.clients.*,
+ org.antlr.*,
+ org.json.*,
io.siddhi.annotation.*;version="${siddhi.import.version.range}",
@@ -569,7 +585,6 @@
io.siddhi.query.*;version="${siddhi.import.version.range}",
*;resolution:=optional
- *
META-INF=target/classes/META-INF,
META-INF/services=${project.build.directory}/dependencies/connect-runtime/META-INF/services,
diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/CDCSource.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/CDCSource.java
index 5610e83..3bb6e67 100644
--- a/component/src/main/java/io/siddhi/extension/io/cdc/source/CDCSource.java
+++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/CDCSource.java
@@ -36,6 +36,8 @@
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.cdc.source.listening.CDCSourceObjectKeeper;
import io.siddhi.extension.io.cdc.source.listening.ChangeDataCapture;
+import io.siddhi.extension.io.cdc.source.listening.MongoChangeDataCapture;
+import io.siddhi.extension.io.cdc.source.listening.RdbmsChangeDataCapture;
import io.siddhi.extension.io.cdc.source.listening.WrongConfigurationException;
import io.siddhi.extension.io.cdc.source.polling.CDCPoller;
import io.siddhi.extension.io.cdc.util.CDCSourceConstants;
@@ -46,6 +48,7 @@
import java.io.File;
import java.sql.SQLException;
import java.util.HashMap;
+import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -402,7 +405,11 @@ public StateFactory init(SourceEventListener sourceEventListener, Opti
validateListeningModeParameters(optionHolder);
//send sourceEventListener and preferred operation to changeDataCapture object
- changeDataCapture = new ChangeDataCapture(operation, sourceEventListener);
+ if (url.toLowerCase(Locale.ENGLISH).contains("jdbc:mongodb")) {
+ changeDataCapture = new MongoChangeDataCapture(operation, sourceEventListener);
+ } else {
+ changeDataCapture = new RdbmsChangeDataCapture(operation, sourceEventListener);
+ }
//create the folder for history file if not exists
File directory = new File(historyFileDirectory);
diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java
index a7e93e3..7efa3ef 100644
--- a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java
+++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.java
@@ -19,30 +19,20 @@
package io.siddhi.extension.io.cdc.source.listening;
import io.debezium.config.Configuration;
-import io.debezium.data.VariableScaleDecimal;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.embedded.spi.OffsetCommitPolicy;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.input.source.SourceEventListener;
-import io.siddhi.extension.io.cdc.util.CDCSourceConstants;
import org.apache.kafka.connect.connector.ConnectRecord;
-import org.apache.kafka.connect.data.Field;
-import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.errors.DataException;
-import java.math.BigDecimal;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* This class is for capturing change data using debezium embedded engine.
**/
-public class ChangeDataCapture {
-
+public abstract class ChangeDataCapture {
private String operation;
private Configuration config;
private SourceEventListener sourceEventListener;
@@ -131,91 +121,6 @@ private void handleEvent(ConnectRecord connectRecord) {
* @param operation is the change data event which is specified by the user.
**/
- private Map createMap(ConnectRecord connectRecord, String operation) {
-
- //Map to return
- Map detailsMap = new HashMap<>();
-
- Struct record = (Struct) connectRecord.value();
-
- //get the change data object's operation.
- String op;
-
- try {
- op = (String) record.get("op");
- } catch (NullPointerException | DataException ex) {
- return detailsMap;
- }
-
- //match the change data's operation with user specifying operation and proceed.
- if (operation.equalsIgnoreCase(CDCSourceConstants.INSERT) &&
- op.equals(CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION)
- || operation.equalsIgnoreCase(CDCSourceConstants.DELETE) &&
- op.equals(CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION)
- || operation.equalsIgnoreCase(CDCSourceConstants.UPDATE) &&
- op.equals(CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION)) {
-
- Struct rawDetails;
- List fields;
- String fieldName;
+ abstract Map createMap(ConnectRecord connectRecord, String operation);
- switch (op) {
- case CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION:
- //append row details after insert.
- rawDetails = (Struct) record.get(CDCSourceConstants.AFTER);
- fields = rawDetails.schema().fields();
- for (Field key : fields) {
- fieldName = key.name();
- detailsMap.put(fieldName, getValue(rawDetails.get(fieldName)));
- }
- break;
- case CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION:
- //append row details before delete.
- rawDetails = (Struct) record.get(CDCSourceConstants.BEFORE);
- fields = rawDetails.schema().fields();
- for (Field key : fields) {
- fieldName = key.name();
- detailsMap.put(CDCSourceConstants.BEFORE_PREFIX + fieldName,
- getValue(rawDetails.get(fieldName)));
- }
- break;
- case CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION:
- //append row details before update.
- rawDetails = (Struct) record.get(CDCSourceConstants.BEFORE);
- fields = rawDetails.schema().fields();
- for (Field key : fields) {
- fieldName = key.name();
- detailsMap.put(CDCSourceConstants.BEFORE_PREFIX + fieldName,
- getValue(rawDetails.get(fieldName)));
- }
- //append row details after update.
- rawDetails = (Struct) record.get(CDCSourceConstants.AFTER);
- fields = rawDetails.schema().fields();
- for (Field key : fields) {
- fieldName = key.name();
- detailsMap.put(fieldName, getValue(rawDetails.get(fieldName)));
- }
- break;
- }
- }
- return detailsMap;
- }
-
- private Object getValue(Object v) {
- if (v instanceof Struct) {
- Optional value = VariableScaleDecimal.toLogical((Struct) v).getDecimalValue();
- BigDecimal bigDecimal = value.orElse(null);
- if (bigDecimal == null) {
- return null;
- }
- return bigDecimal.longValue();
- }
- if (v instanceof Short) {
- return ((Short) v).intValue();
- }
- if (v instanceof Byte) {
- return ((Byte) v).intValue();
- }
- return v;
- }
}
diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java
new file mode 100644
index 0000000..319da38
--- /dev/null
+++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/MongoChangeDataCapture.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright (c) 2020, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.siddhi.extension.io.cdc.source.listening;
+
+import io.siddhi.core.stream.input.source.SourceEventListener;
+import io.siddhi.extension.io.cdc.util.CDCSourceConstants;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.log4j.Logger;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * This class is for capturing change data for MongoDB using debezium embedded engine.
+ **/
+public class MongoChangeDataCapture extends ChangeDataCapture {
+ private static final Logger log = Logger.getLogger(MongoChangeDataCapture.class);
+
+ public MongoChangeDataCapture(String operation, SourceEventListener sourceEventListener) {
+ super(operation, sourceEventListener);
+ }
+
+ Map createMap(ConnectRecord connectRecord, String operation) {
+ //Map to return
+ Map detailsMap = new HashMap<>();
+ Struct record = (Struct) connectRecord.value();
+ //get the change data object's operation.
+ String op;
+ try {
+ op = (String) record.get(CDCSourceConstants.CONNECT_RECORD_OPERATION);
+ } catch (NullPointerException | DataException ex) {
+ return detailsMap;
+ }
+ //match the change data's operation with user specifying operation and proceed.
+ if (operation.equalsIgnoreCase(CDCSourceConstants.INSERT) &&
+ op.equals(CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION)
+ || operation.equalsIgnoreCase(CDCSourceConstants.DELETE) &&
+ op.equals(CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION)
+ || operation.equalsIgnoreCase(CDCSourceConstants.UPDATE) &&
+ op.equals(CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION)) {
+ switch (op) {
+ case CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION:
+ //append row details after insert.
+ String insertString = (String) record.get(CDCSourceConstants.AFTER);
+ JSONObject jsonObj = new JSONObject(insertString);
+ detailsMap = getMongoDetailMap(jsonObj);
+ break;
+ case CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION:
+ //append row details before delete.
+ String deleteDocumentId = (String) ((Struct) connectRecord.key())
+ .get(CDCSourceConstants.MONGO_COLLECTION_ID);
+ JSONObject jsonObjId = new JSONObject(deleteDocumentId);
+ detailsMap.put(CDCSourceConstants.MONGO_COLLECTION_ID,
+ jsonObjId.get(CDCSourceConstants.MONGO_COLLECTION_OBJECT_ID));
+
+ break;
+ case CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION:
+ //append row details before update.
+ String updateDocument = (String) record.get(CDCSourceConstants.MONGO_PATCH);
+ JSONObject jsonObj1 = new JSONObject(updateDocument);
+ JSONObject setJsonObj = (JSONObject) jsonObj1.get(CDCSourceConstants.MONGO_SET);
+ detailsMap = getMongoDetailMap(setJsonObj);
+ String updateDocumentId = (String) ((Struct) connectRecord.key())
+ .get(CDCSourceConstants.MONGO_COLLECTION_ID);
+ JSONObject jsonObjId1 = new JSONObject(updateDocumentId);
+ detailsMap.put(CDCSourceConstants.MONGO_COLLECTION_ID,
+ jsonObjId1.get(CDCSourceConstants.MONGO_COLLECTION_OBJECT_ID));
+ break;
+ default:
+ log.info("Provided value for \"op\" : " + op + " is not supported.");
+ break;
+ }
+ }
+ return detailsMap;
+ }
+
+ private Map getMongoDetailMap(JSONObject jsonObj) {
+ Map detailsMap = new HashMap<>();
+ Iterator keys = jsonObj.keys();
+ for (Iterator it = keys; it.hasNext(); ) {
+ String key = it.next();
+ if (jsonObj.get(key) instanceof Boolean) {
+ detailsMap.put(key, jsonObj.getBoolean(key));
+ } else if (jsonObj.get(key) instanceof Integer) {
+ detailsMap.put(key, jsonObj.getInt(key));
+ } else if (jsonObj.get(key) instanceof Long) {
+ detailsMap.put(key, jsonObj.getDouble(key));
+ } else if (jsonObj.get(key) instanceof Double) {
+ detailsMap.put(key, jsonObj.getDouble(key));
+ } else if (jsonObj.get(key) instanceof String) {
+ detailsMap.put(key, jsonObj.getString(key));
+ } else if (jsonObj.get(key) instanceof JSONObject) {
+ try {
+ detailsMap.put(key, Long.parseLong((String) jsonObj.getJSONObject(key)
+ .get(CDCSourceConstants.MONGO_OBJECT_NUMBER_LONG)));
+ } catch (JSONException notLongObjectEx) {
+ try {
+ detailsMap.put(key, Double.parseDouble((String) jsonObj.getJSONObject(key)
+ .get(CDCSourceConstants.MONGO_OBJECT_NUMBER_DECIMAL)));
+ } catch (JSONException notDoubleObjectEx) {
+ if (key.equals(CDCSourceConstants.MONGO_COLLECTION_INSERT_ID)) {
+ detailsMap.put(CDCSourceConstants.MONGO_COLLECTION_ID, jsonObj.getJSONObject(key)
+ .get(CDCSourceConstants.MONGO_COLLECTION_OBJECT_ID));
+ } else {
+ detailsMap.put(key, jsonObj.getJSONObject(key).toString());
+ }
+ }
+ }
+ }
+ }
+ return detailsMap;
+ }
+}
diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/RdbmsChangeDataCapture.java b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/RdbmsChangeDataCapture.java
new file mode 100644
index 0000000..ad82ddb
--- /dev/null
+++ b/component/src/main/java/io/siddhi/extension/io/cdc/source/listening/RdbmsChangeDataCapture.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright (c) 2020, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.siddhi.extension.io.cdc.source.listening;
+
+import io.debezium.data.VariableScaleDecimal;
+import io.siddhi.core.stream.input.source.SourceEventListener;
+import io.siddhi.extension.io.cdc.util.CDCSourceConstants;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.log4j.Logger;
+
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This class is for capturing change data for RDBMS using debezium embedded engine.
+ **/
+public class RdbmsChangeDataCapture extends ChangeDataCapture {
+ private static final Logger log = Logger.getLogger(RdbmsChangeDataCapture.class);
+
+ public RdbmsChangeDataCapture(String operation, SourceEventListener sourceEventListener) {
+ super(operation, sourceEventListener);
+ }
+
+ Map createMap(ConnectRecord connectRecord, String operation) {
+ //Map to return
+ Map detailsMap = new HashMap<>();
+ Struct record = (Struct) connectRecord.value();
+ //get the change data object's operation.
+ String op;
+ try {
+ op = (String) record.get(CDCSourceConstants.CONNECT_RECORD_OPERATION);
+ } catch (NullPointerException | DataException ex) {
+ return detailsMap;
+ }
+ //match the change data's operation with user specifying operation and proceed.
+ if (operation.equalsIgnoreCase(CDCSourceConstants.INSERT) &&
+ op.equals(CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION)
+ || operation.equalsIgnoreCase(CDCSourceConstants.DELETE) &&
+ op.equals(CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION)
+ || operation.equalsIgnoreCase(CDCSourceConstants.UPDATE) &&
+ op.equals(CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION)) {
+ Struct rawDetails;
+ List fields;
+ String fieldName;
+ switch (op) {
+ case CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION:
+ //append row details after insert.
+ rawDetails = (Struct) record.get(CDCSourceConstants.AFTER);
+ fields = rawDetails.schema().fields();
+ for (Field key : fields) {
+ fieldName = key.name();
+ detailsMap.put(fieldName, getValue(rawDetails.get(fieldName)));
+ }
+ break;
+ case CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION:
+ //append row details before delete.
+ rawDetails = (Struct) record.get(CDCSourceConstants.BEFORE);
+ fields = rawDetails.schema().fields();
+ for (Field key : fields) {
+ fieldName = key.name();
+ detailsMap.put(CDCSourceConstants.BEFORE_PREFIX + fieldName,
+ getValue(rawDetails.get(fieldName)));
+ }
+ break;
+ case CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION:
+ //append row details before update.
+ rawDetails = (Struct) record.get(CDCSourceConstants.BEFORE);
+ fields = rawDetails.schema().fields();
+ for (Field key : fields) {
+ fieldName = key.name();
+ detailsMap.put(CDCSourceConstants.BEFORE_PREFIX + fieldName,
+ getValue(rawDetails.get(fieldName)));
+ }
+ //append row details after update.
+ rawDetails = (Struct) record.get(CDCSourceConstants.AFTER);
+ fields = rawDetails.schema().fields();
+ for (Field key : fields) {
+ fieldName = key.name();
+ detailsMap.put(fieldName, getValue(rawDetails.get(fieldName)));
+ }
+ break;
+ default:
+ log.info("Provided value for \"op\" : " + op + " is not supported.");
+ break;
+ }
+ }
+ return detailsMap;
+ }
+
+ private Object getValue(Object v) {
+ if (v instanceof Struct) {
+ Optional value = VariableScaleDecimal.toLogical((Struct) v).getDecimalValue();
+ BigDecimal bigDecimal = value.orElse(null);
+ if (bigDecimal == null) {
+ return null;
+ }
+ return bigDecimal.longValue();
+ }
+ if (v instanceof Short) {
+ return ((Short) v).intValue();
+ }
+ if (v instanceof Byte) {
+ return ((Byte) v).intValue();
+ }
+ return v;
+ }
+}
diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceConstants.java b/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceConstants.java
index 0b31217..27cda83 100644
--- a/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceConstants.java
+++ b/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceConstants.java
@@ -52,12 +52,15 @@ public class CDCSourceConstants {
public static final String POSTGRESQL_CONNECTOR_CLASS = "io.debezium.connector.postgresql.PostgresConnector";
public static final String ORACLE_CONNECTOR_CLASS = "io.debezium.connector.oracle.OracleConnector";
public static final String SQLSERVER_CONNECTOR_CLASS = "io.debezium.connector.sqlserver.SqlServerConnector";
+ public static final String MONGODB_CONNECTOR_CLASS = "io.debezium.connector.mongodb.MongoDbConnector";
public static final String BEFORE_PREFIX = "before_";
public static final String CACHE_OBJECT = "cacheObj";
public static final int DEFAULT_SERVER_ID = -1;
+ public static final String CONNECT_RECORD_OPERATION = "op";
public static final String CONNECT_RECORD_INSERT_OPERATION = "c";
public static final String CONNECT_RECORD_UPDATE_OPERATION = "u";
public static final String CONNECT_RECORD_DELETE_OPERATION = "d";
+ public static final String CONNECT_RECORD_INITIAL_SYNC = "r";
public static final String BEFORE = "before";
public static final String AFTER = "after";
public static final String CARBON_HOME = "carbon.home";
@@ -76,4 +79,16 @@ public class CDCSourceConstants {
public static final String WAIT_ON_MISSED_RECORD = "wait.on.missed.record";
public static final String MISSED_RECORD_WAITING_TIMEOUT = "missed.record.waiting.timeout";
public static final String CONNECTOR_NAME = "name";
+ public static final String MONGODB_USER = "mongodb.user";
+ public static final String MONGODB_PASSWORD = "mongodb.password";
+ public static final String MONGODB_HOSTS = "mongodb.hosts";
+ public static final String MONGODB_NAME = "mongodb.name";
+ public static final String MONGODB_COLLECTION_WHITELIST = "collection.whitelist";
+ public static final String MONGO_COLLECTION_OBJECT_ID = "$oid";
+ public static final String MONGO_COLLECTION_ID = "id";
+ public static final String MONGO_COLLECTION_INSERT_ID = "_id";
+ public static final String MONGO_PATCH = "patch";
+ public static final String MONGO_SET = "$set";
+ public static final String MONGO_OBJECT_NUMBER_LONG = "$numberLong";
+ public static final String MONGO_OBJECT_NUMBER_DECIMAL = "$numberDecimal";
}
diff --git a/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java b/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java
index a924bcd..931e1f1 100644
--- a/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java
+++ b/component/src/main/java/io/siddhi/extension/io/cdc/util/CDCSourceUtil.java
@@ -41,6 +41,7 @@ public static Map getConfigMap(String username, String password,
String host;
int port;
String database;
+ Boolean isMongodb = false;
//Add schema specific details to configMap
String[] splittedURL = url.split(":");
@@ -158,15 +159,48 @@ public static Map getConfigMap(String username, String password,
configMap.put(CDCSourceConstants.CONNECTOR_CLASS, CDCSourceConstants.ORACLE_CONNECTOR_CLASS);
break;
}
+ case "mongodb": {
+ //Extract url details
+ isMongodb = true;
+ String regex = "jdbc:mongodb://(\\w*|(\\w*)/[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}):" +
+ "(\\d++)/(\\w*)";
+ Pattern p = Pattern.compile(regex);
+ Matcher matcher = p.matcher(url);
+ String replicaSetName;
+ if (matcher.find()) {
+ host = matcher.group(1);
+ replicaSetName = matcher.group(2);
+ port = Integer.parseInt(matcher.group(3));
+ database = matcher.group(4);
+ } else {
+ throw new WrongConfigurationException("Invalid MongoDB uri: " + url +
+ " received for stream: " + siddhiStreamName +
+ ". Expected uri format: jdbc:mongodb:///:/" +
+ "");
+ }
+ configMap.put(CDCSourceConstants.CONNECTOR_CLASS, CDCSourceConstants.MONGODB_CONNECTOR_CLASS);
+ //hostname and port pairs of the MongoDB servers in the replica set.
+ configMap.put(CDCSourceConstants.MONGODB_HOSTS, host + ":" + port);
+ //unique name that identifies the connector and/or MongoDB replica set or sharded cluster
+ configMap.put(CDCSourceConstants.MONGODB_NAME, replicaSetName);
+ //fully-qualified namespaces for MongoDB collections to be monitored
+ configMap.put(CDCSourceConstants.MONGODB_COLLECTION_WHITELIST, database + "." + tableName);
+ break;
+ }
default:
throw new WrongConfigurationException("Unsupported schema. Expected schema: mysql or postgresql" +
- "or sqlserver or oracle, Found: " + splittedURL[1]);
+ "or sqlserver, oracle or mongodb Found: " + splittedURL[1]);
}
//Add general config details to configMap
- configMap.put(CDCSourceConstants.DATABASE_USER, username);
- configMap.put(CDCSourceConstants.DATABASE_PASSWORD, password);
+ if (!isMongodb) {
+ configMap.put(CDCSourceConstants.DATABASE_USER, username);
+ configMap.put(CDCSourceConstants.DATABASE_PASSWORD, password);
+ } else {
+ configMap.put(CDCSourceConstants.MONGODB_USER, username);
+ configMap.put(CDCSourceConstants.MONGODB_PASSWORD, password);
+ }
if (serverID == CDCSourceConstants.DEFAULT_SERVER_ID) {
Random random = new Random();
@@ -191,7 +225,7 @@ public static Map getConfigMap(String username, String password,
historyFileDirectory + siddhiStreamName + ".dat");
//set connector property: name
- configMap.put("name", siddhiAppName + siddhiStreamName);
+ configMap.put(CDCSourceConstants.CONNECTOR_NAME, siddhiAppName + siddhiStreamName);
//set additional connector properties using comma separated key value pair string
for (Map.Entry entry : getConnectorPropertiesMap(connectorProperties).entrySet()) {
diff --git a/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java b/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java
new file mode 100644
index 0000000..408911f
--- /dev/null
+++ b/component/src/test/java/io/siddhi/extension/io/cdc/source/TestCaseOfCDCListeningModeMongo.java
@@ -0,0 +1,331 @@
+/*
+ * Copyright (c) 2020, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.siddhi.extension.io.cdc.source;
+
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.event.Event;
+import io.siddhi.core.query.output.callback.QueryCallback;
+import io.siddhi.core.stream.input.InputHandler;
+import io.siddhi.core.stream.output.StreamCallback;
+import io.siddhi.core.util.SiddhiTestHelper;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestCaseOfCDCListeningModeMongo {
+
+ private static final Logger log = Logger.getLogger(TestCaseOfCDCListeningModeMongo.class);
+ private Event currentEvent;
+ private AtomicInteger eventCount = new AtomicInteger(0);
+ private AtomicBoolean eventArrived = new AtomicBoolean(false);
+ private int waitTime = 200;
+ private int timeout = 10000;
+ private String username;
+ private String password;
+ private String databaseUri;
+ private String replicaSetUri;
+ private String collectionName = "SweetProductionTable";
+
+ @BeforeClass
+ public void initializeConnectionParams() {
+ databaseUri = "mongodb://:/";
+ replicaSetUri = "jdbc:mongodb:///:/";
+ username = "user_name";
+ password = "password";
+ }
+
+ @BeforeMethod
+ public void init() {
+ eventCount.set(0);
+ eventArrived.set(false);
+ currentEvent = new Event();
+ }
+
+ /**
+ * Test case to Capture Insert operations from a Mongo table.
+ */
+ @Test
+ public void testInsertCDC() throws InterruptedException {
+ log.info("------------------------------------------------------------------------------------------------");
+ log.info("CDC TestCase: Capturing Insert change data from Mongo.");
+ log.info("------------------------------------------------------------------------------------------------");
+
+ SiddhiManager siddhiManager = new SiddhiManager();
+
+ String cdcinStreamDefinition = "@app:name('cdcTesting')" +
+ "@source(type = 'cdc'," +
+ " url = '" + replicaSetUri + "'," +
+ " username = '" + username + "'," +
+ " password = '" + password + "'," +
+ " table.name = '" + collectionName + "', " +
+ " operation = 'insert', " +
+ " @map(type='keyvalue'))" +
+ "define stream istm (name string, amount double, volume int);";
+
+ String mongoStoreDefinition = "define stream insertionStream (name string, amount double, volume int);" +
+ "@Store(type='mongodb', mongodb.uri='" + databaseUri + "')" +
+ "define table SweetProductionTable (name string, amount double, volume int);";
+
+ String mongoQuery = "@info(name='query2') " +
+ "from insertionStream " +
+ "insert into SweetProductionTable;";
+
+ SiddhiAppRuntime cdcAppRuntime = siddhiManager.createSiddhiAppRuntime(cdcinStreamDefinition);
+
+ StreamCallback insertionStreamCallback = new StreamCallback() {
+ @Override
+ public void receive(Event[] events) {
+ for (Event event : events) {
+ currentEvent = event;
+ eventCount.getAndIncrement();
+ log.info(eventCount + ". " + event);
+ eventArrived.set(true);
+ }
+ }
+ };
+
+ cdcAppRuntime.addCallback("istm", insertionStreamCallback);
+ cdcAppRuntime.start();
+
+ Thread.sleep(1000);
+
+ QueryCallback mongoQueryCallback = new QueryCallback() {
+ @Override
+ public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) {
+ for (Event event : inEvents) {
+ log.info("insert done: " + event);
+ }
+ }
+ };
+
+ SiddhiAppRuntime mongoAppRuntime = siddhiManager.createSiddhiAppRuntime(mongoStoreDefinition + mongoQuery);
+ mongoAppRuntime.addCallback("query2", mongoQueryCallback);
+ mongoAppRuntime.start();
+
+ //Do an insert and wait for cdc app to capture.
+ InputHandler mongoInputHandler = mongoAppRuntime.getInputHandler("insertionStream");
+ Object[] insertingObject = new Object[]{"e001", 100.00, 5};
+ mongoInputHandler.send(insertingObject);
+ SiddhiTestHelper.waitForEvents(waitTime, 1, eventCount, timeout);
+
+ //Assert event arrival.
+ Assert.assertTrue(eventArrived.get());
+
+ Thread.sleep(1000);
+
+ //Assert event data.
+ Assert.assertEquals(insertingObject, currentEvent.getData());
+
+ cdcAppRuntime.shutdown();
+ mongoAppRuntime.shutdown();
+ siddhiManager.shutdown();
+ }
+
+ /**
+ * Test case to Capture Delete operations from a Mongo table.
+ */
+ @Test
+ public void testDeleteCDC() throws InterruptedException {
+ log.info("------------------------------------------------------------------------------------------------");
+ log.info("CDC TestCase: Capturing Delete change data from Mongo.");
+ log.info("------------------------------------------------------------------------------------------------");
+
+ SiddhiManager siddhiManager = new SiddhiManager();
+
+ String cdcinStreamDefinition = "@app:name('cdcTesting')" +
+ "@source(type = 'cdc'," +
+ " url = '" + replicaSetUri + "'," +
+ " username = '" + username + "'," +
+ " password = '" + password + "'," +
+ " table.name = '" + collectionName + "', " +
+ " operation = 'delete', " +
+ " @map(type='keyvalue'))" +
+ "define stream delstm (id string);";
+
+
+ String mongoStoreDefinition = "define stream DeletionStream (name string, amount double);" +
+ "define stream InsertStream(name string, amount double);" +
+ "@Store(type='mongodb', mongodb.uri='" + databaseUri + "')" +
+ "define table SweetProductionTable (name string, amount double);";
+
+ String insertQuery = "@info(name='query3') " +
+ "from InsertStream " +
+ "insert into SweetProductionTable;";
+
+ String deleteQuery = "@info(name='queryDel') " +
+ "from DeletionStream " +
+ "delete SweetProductionTable on SweetProductionTable.name==name;";
+
+ SiddhiAppRuntime cdcAppRuntime = siddhiManager.createSiddhiAppRuntime(cdcinStreamDefinition);
+
+ StreamCallback deletionStreamCallback = new StreamCallback() {
+ @Override
+ public void receive(Event[] events) {
+ for (Event event : events) {
+ currentEvent = event;
+ eventCount.getAndIncrement();
+ log.info(eventCount + ". " + event);
+ eventArrived.set(true);
+ }
+ }
+ };
+
+ cdcAppRuntime.addCallback("delstm", deletionStreamCallback);
+ cdcAppRuntime.start();
+
+ QueryCallback mongoQuerycallback = new QueryCallback() {
+ @Override
+ public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) {
+ for (Event event : inEvents) {
+ eventCount.getAndIncrement();
+ log.info("delete done: " + event);
+ }
+ }
+ };
+
+ SiddhiAppRuntime mongoAppRuntime = siddhiManager.createSiddhiAppRuntime(mongoStoreDefinition + insertQuery
+ + deleteQuery);
+ mongoAppRuntime.addCallback("queryDel", mongoQuerycallback);
+ mongoAppRuntime.start();
+
+ //Do an insert first.
+ InputHandler mongoInputHandler = mongoAppRuntime.getInputHandler("InsertStream");
+ Object[] insertingObject = new Object[]{"e001", 100.00};
+ mongoInputHandler.send(insertingObject);
+
+ //wait to complete deletion
+ SiddhiTestHelper.waitForEvents(waitTime, 1, eventCount, timeout);
+ eventCount.getAndDecrement();
+
+ //Delete inserted row
+ mongoInputHandler = mongoAppRuntime.getInputHandler("DeletionStream");
+ Object[] deletingObject = new Object[]{"e001"};
+ mongoInputHandler.send(deletingObject);
+
+ //wait to capture the delete event.
+ SiddhiTestHelper.waitForEvents(waitTime, 1, eventCount, timeout);
+
+ //Assert event arrival.
+ Assert.assertTrue(eventArrived.get());
+
+ cdcAppRuntime.shutdown();
+ mongoAppRuntime.shutdown();
+ siddhiManager.shutdown();
+ }
+
+ /**
+ * Test case to Capture Update operations from a Mongo table.
+ */
+ @Test
+ public void testUpdateCDC() throws InterruptedException {
+ log.info("------------------------------------------------------------------------------------------------");
+ log.info("CDC TestCase: Capturing Update change data from Mongo.");
+ log.info("------------------------------------------------------------------------------------------------");
+
+ SiddhiManager siddhiManager = new SiddhiManager();
+
+ String cdcinStreamDefinition = "@app:name('cdcTesting')" +
+ "@source(type = 'cdc'," +
+ " url = '" + replicaSetUri + "'," +
+ " username = '" + username + "'," +
+ " password = '" + password + "'," +
+ " table.name = '" + collectionName + "', " +
+ " operation = 'update', " +
+ " @map(type='keyvalue'))" +
+ "define stream updatestm (amount double);";
+
+ String mongoStoreDefinition = "define stream UpdateStream(name string, amount double);" +
+ "define stream InsertStream(name string, amount double);" +
+ "@Store(type='mongodb', mongodb.uri='" + databaseUri + "')" +
+ "define table SweetProductionTable (name string, amount double);";
+
+ String insertQuery = "@info(name='query3') " +
+ "from InsertStream " +
+ "insert into SweetProductionTable;";
+
+ String updateQuery = "@info(name='queryUpdate') " +
+ "from UpdateStream " +
+ "select name, amount " +
+ "update SweetProductionTable " +
+ "set SweetProductionTable.amount = amount " +
+ "on SweetProductionTable.name == name;";
+
+ SiddhiAppRuntime cdcAppRuntime = siddhiManager.createSiddhiAppRuntime(cdcinStreamDefinition);
+
+ StreamCallback updatingStreamCallback = new StreamCallback() {
+ @Override
+ public void receive(Event[] events) {
+ for (Event event : events) {
+ currentEvent = event;
+ eventCount.getAndIncrement();
+ log.info(eventCount + ". " + event);
+ eventArrived.set(true);
+ }
+ }
+ };
+
+ cdcAppRuntime.addCallback("updatestm", updatingStreamCallback);
+ cdcAppRuntime.start();
+
+ QueryCallback queryCallback2 = new QueryCallback() {
+ @Override
+ public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) {
+ for (Event event : inEvents) {
+ log.info("update done: " + event);
+ }
+ }
+ };
+ SiddhiAppRuntime mongoAppRuntime = siddhiManager.createSiddhiAppRuntime(mongoStoreDefinition + insertQuery
+ + updateQuery);
+ mongoAppRuntime.addCallback("queryUpdate", queryCallback2);
+ mongoAppRuntime.start();
+
+ //Do an insert first.
+ InputHandler mongoInputHandler = mongoAppRuntime.getInputHandler("InsertStream");
+ Object[] insertingObject = new Object[]{"sweets", 100.00};
+ mongoInputHandler.send(insertingObject);
+
+ Thread.sleep(1000);
+
+ //Update inserted row.
+ mongoInputHandler = mongoAppRuntime.getInputHandler("UpdateStream");
+ Object[] updatingObject = new Object[]{"sweets", 500.00};
+ mongoInputHandler.send(updatingObject);
+
+ //wait to capture the update event.
+ SiddhiTestHelper.waitForEvents(waitTime, 1, eventCount, timeout);
+
+ //Assert event arrival.
+ Assert.assertTrue(eventArrived.get());
+
+ //Assert event data.
+ Object[] expectedEventObject = new Object[]{500.00};
+ Assert.assertEquals(expectedEventObject, currentEvent.getData());
+
+ cdcAppRuntime.shutdown();
+ mongoAppRuntime.shutdown();
+ siddhiManager.shutdown();
+ }
+}
diff --git a/pom.xml b/pom.xml
index 276706f..55eba47 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,6 +78,11 @@
debezium-connector-sqlserver
${debezium-connector-sqlserver.version}
+
+ io.debezium
+ debezium-connector-mongodb
+ ${debezium-connector-mongodb.version}
+
com.zaxxer
HikariCP
@@ -100,6 +105,12 @@
${siddhi-store-rdbms.version}
test
+
+ io.siddhi.extension.store.mongodb
+ siddhi-store-mongodb
+ ${siddhi-store-mongodb.version}
+ test
+
org.wso2.carbon.datasources
org.wso2.carbon.datasource.core
@@ -129,6 +140,12 @@
${h2.connector.version}
test
+
+ org.mongodb
+ mongo-java-driver
+ ${mongodb.connector.version}
+ test
+
org.apache.log4j.wso2
log4j
@@ -223,10 +240,12 @@
0.10.0.Final
0.10.0.Final
0.10.0.Final
+ 0.10.0.Final
12.1.0.2
3.2.0
6.11
6.0.1
+ 2.1.0
RELEASE
../findbugs-exclude.xml
12.1.0.1-atlassian-hosted
@@ -243,6 +262,8 @@
0.0.0.0
12.2.0.1
2.3.0
+ 3.4.0
+ 20190722
https://github.com/siddhi-io/siddhi-io-cdc.git