Skip to content

Commit

Permalink
Merge pull request #52 from PrabodDunuwila/master
Browse files Browse the repository at this point in the history
CDC Listening mode support for MongoDB.
  • Loading branch information
dnwick authored Feb 7, 2020
2 parents 1f41c02 + 91bd605 commit bf4ef2a
Show file tree
Hide file tree
Showing 9 changed files with 694 additions and 103 deletions.
17 changes: 16 additions & 1 deletion component/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
<packaging>bundle</packaging>
<name>Siddhi IO CDC extension</name>
<dependencies>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>${json.version}</version>
</dependency>
<dependency>
<groupId>org.apache.log4j.wso2</groupId>
<artifactId>log4j</artifactId>
Expand All @@ -51,6 +56,10 @@
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
</dependency>
<dependency>
<groupId>io.siddhi</groupId>
<artifactId>siddhi-core</artifactId>
Expand Down Expand Up @@ -86,6 +95,11 @@
<artifactId>siddhi-store-rdbms</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.siddhi.extension.store.mongodb</groupId>
<artifactId>siddhi-store-mongodb</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
Expand Down Expand Up @@ -562,14 +576,15 @@
org.apache.kafka.connect.*,
org.apache.kafka.common.*,
org.apache.kafka.clients.*,
org.antlr.*,
org.json.*,
</Private-Package>
<Import-Package>
io.siddhi.annotation.*;version="${siddhi.import.version.range}",
io.siddhi.core.*;version="${siddhi.import.version.range}",
io.siddhi.query.*;version="${siddhi.import.version.range}",
*;resolution:=optional
</Import-Package>
<DynamicImport-Package>*</DynamicImport-Package>
<Include-Resource>
META-INF=target/classes/META-INF,
META-INF/services=${project.build.directory}/dependencies/connect-runtime/META-INF/services,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.cdc.source.listening.CDCSourceObjectKeeper;
import io.siddhi.extension.io.cdc.source.listening.ChangeDataCapture;
import io.siddhi.extension.io.cdc.source.listening.MongoChangeDataCapture;
import io.siddhi.extension.io.cdc.source.listening.RdbmsChangeDataCapture;
import io.siddhi.extension.io.cdc.source.listening.WrongConfigurationException;
import io.siddhi.extension.io.cdc.source.polling.CDCPoller;
import io.siddhi.extension.io.cdc.util.CDCSourceConstants;
Expand All @@ -46,6 +48,7 @@
import java.io.File;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -402,7 +405,11 @@ public StateFactory<CdcState> init(SourceEventListener sourceEventListener, Opti
validateListeningModeParameters(optionHolder);

//send sourceEventListener and preferred operation to changeDataCapture object
changeDataCapture = new ChangeDataCapture(operation, sourceEventListener);
if (url.toLowerCase(Locale.ENGLISH).contains("jdbc:mongodb")) {
changeDataCapture = new MongoChangeDataCapture(operation, sourceEventListener);
} else {
changeDataCapture = new RdbmsChangeDataCapture(operation, sourceEventListener);
}

//create the folder for history file if not exists
File directory = new File(historyFileDirectory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,20 @@
package io.siddhi.extension.io.cdc.source.listening;

import io.debezium.config.Configuration;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.embedded.spi.OffsetCommitPolicy;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.extension.io.cdc.util.CDCSourceConstants;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* This class is for capturing change data using debezium embedded engine.
**/
public class ChangeDataCapture {

public abstract class ChangeDataCapture {
private String operation;
private Configuration config;
private SourceEventListener sourceEventListener;
Expand Down Expand Up @@ -131,91 +121,6 @@ private void handleEvent(ConnectRecord connectRecord) {
* @param operation is the change data event which is specified by the user.
**/

private Map<String, Object> createMap(ConnectRecord connectRecord, String operation) {

//Map to return
Map<String, Object> detailsMap = new HashMap<>();

Struct record = (Struct) connectRecord.value();

//get the change data object's operation.
String op;

try {
op = (String) record.get("op");
} catch (NullPointerException | DataException ex) {
return detailsMap;
}

//match the change data's operation with user specifying operation and proceed.
if (operation.equalsIgnoreCase(CDCSourceConstants.INSERT) &&
op.equals(CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION)
|| operation.equalsIgnoreCase(CDCSourceConstants.DELETE) &&
op.equals(CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION)
|| operation.equalsIgnoreCase(CDCSourceConstants.UPDATE) &&
op.equals(CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION)) {

Struct rawDetails;
List<Field> fields;
String fieldName;
abstract Map<String, Object> createMap(ConnectRecord connectRecord, String operation);

switch (op) {
case CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION:
//append row details after insert.
rawDetails = (Struct) record.get(CDCSourceConstants.AFTER);
fields = rawDetails.schema().fields();
for (Field key : fields) {
fieldName = key.name();
detailsMap.put(fieldName, getValue(rawDetails.get(fieldName)));
}
break;
case CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION:
//append row details before delete.
rawDetails = (Struct) record.get(CDCSourceConstants.BEFORE);
fields = rawDetails.schema().fields();
for (Field key : fields) {
fieldName = key.name();
detailsMap.put(CDCSourceConstants.BEFORE_PREFIX + fieldName,
getValue(rawDetails.get(fieldName)));
}
break;
case CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION:
//append row details before update.
rawDetails = (Struct) record.get(CDCSourceConstants.BEFORE);
fields = rawDetails.schema().fields();
for (Field key : fields) {
fieldName = key.name();
detailsMap.put(CDCSourceConstants.BEFORE_PREFIX + fieldName,
getValue(rawDetails.get(fieldName)));
}
//append row details after update.
rawDetails = (Struct) record.get(CDCSourceConstants.AFTER);
fields = rawDetails.schema().fields();
for (Field key : fields) {
fieldName = key.name();
detailsMap.put(fieldName, getValue(rawDetails.get(fieldName)));
}
break;
}
}
return detailsMap;
}

private Object getValue(Object v) {
if (v instanceof Struct) {
Optional<BigDecimal> value = VariableScaleDecimal.toLogical((Struct) v).getDecimalValue();
BigDecimal bigDecimal = value.orElse(null);
if (bigDecimal == null) {
return null;
}
return bigDecimal.longValue();
}
if (v instanceof Short) {
return ((Short) v).intValue();
}
if (v instanceof Byte) {
return ((Byte) v).intValue();
}
return v;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright (c) 2020, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.siddhi.extension.io.cdc.source.listening;

import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.extension.io.cdc.util.CDCSourceConstants;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.log4j.Logger;
import org.json.JSONException;
import org.json.JSONObject;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * Captures MongoDB change data via the Debezium embedded engine and converts each
 * change record into a flat field-name -> value map that the Siddhi CDC source emits.
 **/
public class MongoChangeDataCapture extends ChangeDataCapture {
    private static final Logger log = Logger.getLogger(MongoChangeDataCapture.class);

    public MongoChangeDataCapture(String operation, SourceEventListener sourceEventListener) {
        super(operation, sourceEventListener);
    }

    /**
     * Builds the event details map for a MongoDB change record when the record's
     * operation matches the operation the user subscribed to.
     *
     * @param connectRecord change data record received from the Debezium engine
     * @param operation     user-specified operation (insert, update or delete)
     * @return field-name to value map for the event; empty when the record carries no
     * operation field or its operation does not match {@code operation}
     */
    Map<String, Object> createMap(ConnectRecord connectRecord, String operation) {
        //Map to return
        Map<String, Object> detailsMap = new HashMap<>();
        Struct record = (Struct) connectRecord.value();
        //get the change data object's operation.
        String op;
        try {
            op = (String) record.get(CDCSourceConstants.CONNECT_RECORD_OPERATION);
        } catch (NullPointerException | DataException ex) {
            //record has no "op" field (e.g. a tombstone event); nothing to emit.
            return detailsMap;
        }
        //match the change data's operation with user specifying operation and proceed.
        if (operation.equalsIgnoreCase(CDCSourceConstants.INSERT) &&
                op.equals(CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION)
                || operation.equalsIgnoreCase(CDCSourceConstants.DELETE) &&
                op.equals(CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION)
                || operation.equalsIgnoreCase(CDCSourceConstants.UPDATE) &&
                op.equals(CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION)) {
            switch (op) {
                case CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION:
                    //insert events carry the full new document as a JSON string in "after".
                    String insertString = (String) record.get(CDCSourceConstants.AFTER);
                    JSONObject jsonObj = new JSONObject(insertString);
                    detailsMap = getMongoDetailMap(jsonObj);
                    break;
                case CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION:
                    //delete events only expose the document id (from the record key).
                    String deleteDocumentId = (String) ((Struct) connectRecord.key())
                            .get(CDCSourceConstants.MONGO_COLLECTION_ID);
                    JSONObject jsonObjId = new JSONObject(deleteDocumentId);
                    detailsMap.put(CDCSourceConstants.MONGO_COLLECTION_ID,
                            jsonObjId.get(CDCSourceConstants.MONGO_COLLECTION_OBJECT_ID));
                    break;
                case CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION:
                    //update events carry the changed fields in the "$set" clause of "patch",
                    //plus the document id from the record key.
                    String updateDocument = (String) record.get(CDCSourceConstants.MONGO_PATCH);
                    JSONObject jsonObj1 = new JSONObject(updateDocument);
                    JSONObject setJsonObj = (JSONObject) jsonObj1.get(CDCSourceConstants.MONGO_SET);
                    detailsMap = getMongoDetailMap(setJsonObj);
                    String updateDocumentId = (String) ((Struct) connectRecord.key())
                            .get(CDCSourceConstants.MONGO_COLLECTION_ID);
                    JSONObject jsonObjId1 = new JSONObject(updateDocumentId);
                    detailsMap.put(CDCSourceConstants.MONGO_COLLECTION_ID,
                            jsonObjId1.get(CDCSourceConstants.MONGO_COLLECTION_OBJECT_ID));
                    break;
                default:
                    log.info("Provided value for \"op\" : " + op + " is not supported.");
                    break;
            }
        }
        return detailsMap;
    }

    /**
     * Flattens a MongoDB document (parsed as a {@link JSONObject}) into a map, unwrapping
     * MongoDB extended-JSON wrappers such as {@code $numberLong}, {@code $numberDecimal}
     * and {@code $oid}.
     *
     * @param jsonObj parsed document (or "$set" clause) from the change record
     * @return map of field name to unwrapped value
     */
    private Map<String, Object> getMongoDetailMap(JSONObject jsonObj) {
        Map<String, Object> detailsMap = new HashMap<>();
        Iterator<String> keys = jsonObj.keys();
        while (keys.hasNext()) {
            String key = keys.next();
            //hoisted: look the value up once instead of once per instanceof branch.
            Object value = jsonObj.get(key);
            if (value instanceof Boolean) {
                detailsMap.put(key, jsonObj.getBoolean(key));
            } else if (value instanceof Integer) {
                detailsMap.put(key, jsonObj.getInt(key));
            } else if (value instanceof Long) {
                //fix: was getDouble(key), which coerced 64-bit integers to Double and
                //loses integer typing (and precision above 2^53).
                detailsMap.put(key, jsonObj.getLong(key));
            } else if (value instanceof Double) {
                detailsMap.put(key, jsonObj.getDouble(key));
            } else if (value instanceof String) {
                detailsMap.put(key, jsonObj.getString(key));
            } else if (value instanceof JSONObject) {
                //nested object: try the extended-JSON numeric wrappers first, then the
                //object-id wrapper, falling back to the raw JSON text.
                try {
                    detailsMap.put(key, Long.parseLong((String) jsonObj.getJSONObject(key)
                            .get(CDCSourceConstants.MONGO_OBJECT_NUMBER_LONG)));
                } catch (JSONException notLongObjectEx) {
                    try {
                        detailsMap.put(key, Double.parseDouble((String) jsonObj.getJSONObject(key)
                                .get(CDCSourceConstants.MONGO_OBJECT_NUMBER_DECIMAL)));
                    } catch (JSONException notDoubleObjectEx) {
                        if (key.equals(CDCSourceConstants.MONGO_COLLECTION_INSERT_ID)) {
                            detailsMap.put(CDCSourceConstants.MONGO_COLLECTION_ID, jsonObj.getJSONObject(key)
                                    .get(CDCSourceConstants.MONGO_COLLECTION_OBJECT_ID));
                        } else {
                            detailsMap.put(key, jsonObj.getJSONObject(key).toString());
                        }
                    }
                }
            }
        }
        return detailsMap;
    }
}
Loading

0 comments on commit bf4ef2a

Please sign in to comment.