diff --git a/README.md b/README.md
index 2131da0..480676a 100644
--- a/README.md
+++ b/README.md
@@ -51,8 +51,8 @@ ReplicaDB is written in Java and requires a Java Runtime Environment (JRE) Stand
Just download [latest](https://github.com/osalvador/ReplicaDB/releases) release and unzip it.
-$ curl -o ReplicaDB-0.12.1.tar.gz -L "https://github.com/osalvador/ReplicaDB/releases/download/v0.12.1/ReplicaDB-0.12.1.tar.gz"
-$ tar -xvzf ReplicaDB-0.12.1.tar.gz
+$ curl -o ReplicaDB-0.12.2.tar.gz -L "https://github.com/osalvador/ReplicaDB/releases/download/v0.12.2/ReplicaDB-0.12.2.tar.gz"
+$ tar -xvzf ReplicaDB-0.12.2.tar.gz
$ ./bin/replicadb --help
diff --git a/pom.xml b/pom.xml
index 5cfb172..07bbea4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
- 0.12.1
+ 0.12.2
diff --git a/src/main/java/org/replicadb/ReplicaTask.java b/src/main/java/org/replicadb/ReplicaTask.java
index a81198a..f0b81da 100644
--- a/src/main/java/org/replicadb/ReplicaTask.java
+++ b/src/main/java/org/replicadb/ReplicaTask.java
@@ -8,16 +8,16 @@
import org.replicadb.manager.ManagerFactory;
import java.sql.ResultSet;
-import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.Callable;
-final public class ReplicaTask implements Callable {
+public final class ReplicaTask implements Callable {
private static final Logger LOG = LogManager.getLogger(ReplicaTask.class.getName());
- private int taskId;
- private String taskName;
- private ToolOptions options;
+ private final int taskId;
+ private final ToolOptions options;
public ReplicaTask(int id, ToolOptions options) {
@@ -28,17 +28,12 @@ public ReplicaTask(int id, ToolOptions options) {
public Integer call() throws Exception {
- //System.out.println("Task ID :" + this.taskId + " performed by " + Thread.currentThread().getName());
- this.taskName = "TaskId-"+this.taskId;
+ String taskName = "TaskId-" + this.taskId;
- LOG.info("Starting " + Thread.currentThread().getName());
+ LOG.info("Starting {}", Thread.currentThread().getName());
- // Do stuff...
- // Obtener una instancia del DriverManager del Source
- // Obtener una instancia del DriverManager del Sink
- // Mover datos de Source a Sink.
ManagerFactory managerF = new ManagerFactory();
ConnManager sourceDs = managerF.accept(options, DataSourceType.SOURCE);
ConnManager sinkDs = managerF.accept(options, DataSourceType.SINK);
@@ -47,14 +42,14 @@ public Integer call() throws Exception {
try {
} catch (Exception e) {
- LOG.error("ERROR in " + this.taskName+ " getting Source connection: " + e.getMessage());
+ LOG.error("ERROR in {} getting Source connection: {} ", taskName, e.getMessage());
throw e;
try {
} catch (Exception e) {
- LOG.error("ERROR in " + this.taskName + " getting Sink connection: " + e.getMessage());
+ LOG.error("ERROR in {} getting Sink connection:{} ", taskName,e.getMessage());
throw e;
@@ -62,7 +57,7 @@ public Integer call() throws Exception {
try {
rs = sourceDs.readTable(null, null, taskId);
} catch (Exception e) {
- LOG.error("ERROR in " + this.taskName + " reading source table: " + e.getMessage());
+ LOG.error("ERROR in {} reading source table: {}", taskName, e.getMessage());
throw e;
@@ -71,7 +66,7 @@ public Integer call() throws Exception {
// TODO determine the total rows processed in all the managers
LOG.info("A total of {} rows processed by task {}", processedRows, taskId);
} catch (Exception e) {
- LOG.error("ERROR in " + this.taskName + " inserting data to sink table: " + e.getMessage());
+ LOG.error("ERROR in {} inserting data to sink table: {} ", taskName, getExceptionMessageChain(e));
throw e;
@@ -83,5 +78,14 @@ public Integer call() throws Exception {
return this.taskId;
+ public static List getExceptionMessageChain(Throwable throwable) {
+ List result = new ArrayList<>();
+ while (throwable != null) {
+ result.add(throwable.getMessage());
+ throwable = throwable.getCause();
+ }
+ return result;
+ }
diff --git a/src/main/java/org/replicadb/manager/SQLServerManager.java b/src/main/java/org/replicadb/manager/SQLServerManager.java
index 3c54255..24a082b 100644
--- a/src/main/java/org/replicadb/manager/SQLServerManager.java
+++ b/src/main/java/org/replicadb/manager/SQLServerManager.java
@@ -2,12 +2,12 @@
import com.microsoft.sqlserver.jdbc.SQLServerBulkCopy;
import com.microsoft.sqlserver.jdbc.SQLServerBulkCopyOptions;
-import com.microsoft.sqlserver.jdbc.SQLServerException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.replicadb.cli.ReplicationMode;
import org.replicadb.cli.ToolOptions;
+import javax.sql.RowSet;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@@ -16,245 +16,248 @@
public class SQLServerManager extends SqlManager {
- private static final Logger LOG = LogManager.getLogger(SQLServerManager.class.getName());
-// private static Long chunkSize;
- /**
- * Constructs the SqlManager.
- *
- * @param opts the ReplicaDB ToolOptions describing the user's requested action.
- */
- public SQLServerManager(ToolOptions opts, DataSourceType dsType) {
- super(opts);
- this.dsType = dsType;
- }
- @Override
- public String getDriverClass() {
- return JdbcDrivers.SQLSERVER.getDriverClass();
- }
- private void setIdentityInsert(String tableName, Boolean flag) throws SQLException {
- String valueToSetIdentity = "";
- if (flag) valueToSetIdentity = "ON";
- else valueToSetIdentity = "OFF";
- Statement stmt = this.getConnection().createStatement();
- String sqlCommand = "IF OBJECTPROPERTY(OBJECT_ID('" + tableName + "'), 'TableHasIdentity') = 1 " +
- "SET IDENTITY_INSERT " + tableName + " " + valueToSetIdentity;
- LOG.info(sqlCommand);
- stmt.executeUpdate(sqlCommand);
- stmt.close();
- }
- @Override
- public int insertDataToTable(ResultSet resultSet, int taskId) throws SQLException {
- String tableName;
- // Get table name and columns
- if (options.getMode().equals(ReplicationMode.COMPLETE.getModeText())) {
- tableName = getSinkTableName();
- } else {
- tableName = getQualifiedStagingTableName();
- }
- ResultSetMetaData rsmd = resultSet.getMetaData();
- int columnCount = rsmd.getColumnCount();
- SQLServerBulkCopy bulkCopy = null;
- try {
- bulkCopy = new SQLServerBulkCopy(this.getConnection());
- // BulkCopy Options
- SQLServerBulkCopyOptions copyOptions = new SQLServerBulkCopyOptions();
- copyOptions.setBulkCopyTimeout(0);
- bulkCopy.setBulkCopyOptions(copyOptions);
- bulkCopy.setDestinationTableName(tableName);
- // Columns Mapping
- if (this.options.getSinkColumns() != null && !this.options.getSinkColumns().isEmpty()) {
- String sinkColumns = getAllSinkColumns(rsmd);
- // Remove quotes from column names, which are not supported by SQLServerBulkCopy
- String[] sinkColumnsArray = sinkColumns.replace("\"", "").split(",");
- LOG.trace("Mapping columns: source --> sink");
- for (int i = 1; i <= sinkColumnsArray.length; i++) {
- bulkCopy.addColumnMapping(rsmd.getColumnName(i), sinkColumnsArray[i - 1]);
- LOG.trace("{} --> {}", rsmd.getColumnName(i), sinkColumnsArray[i - 1]);
- }
- } else {
- for (int i = 1; i <= columnCount; i++) {
- LOG.trace("source {} - {} sink", rsmd.getColumnName(i), i);
- bulkCopy.addColumnMapping(rsmd.getColumnName(i), i);
- }
- }
- LOG.info("Performing BulkCopy into {} ", tableName);
- // Write from the source to the destination.
- bulkCopy.writeToServer(resultSet);
- } catch (SQLServerException e) {
- throw e;
- }
- bulkCopy.close();
- // TODO: getAllSinkColumns should not update the sinkColumns property. Change it in Oracle and check it in Postgres
- // Set Sink columns
- getAllSinkColumns(rsmd);
- this.getConnection().commit();
- return 0;
- }
- @Override
- protected void createStagingTable() throws SQLException {
- Statement statement = this.getConnection().createStatement();
- String sinkStagingTable = getQualifiedStagingTableName();
- // Get sink columns.
- String allSinkColumns = null;
- if (this.options.getSinkColumns() != null && !this.options.getSinkColumns().isEmpty()) {
- allSinkColumns = this.options.getSinkColumns();
- } else if (this.options.getSourceColumns() != null && !this.options.getSourceColumns().isEmpty()) {
- allSinkColumns = this.options.getSourceColumns();
- } else {
- allSinkColumns = "*";
- }
- String sql = " SELECT " + allSinkColumns + " INTO " + sinkStagingTable + " FROM " + this.getSinkTableName() + " WHERE 0 = 1 ";
- LOG.info("Creating staging table with this command: " + sql);
- statement.executeUpdate(sql);
- statement.close();
- this.getConnection().commit();
- }
- @Override
- protected void mergeStagingTable() throws SQLException {
- this.getConnection().commit();
- Statement statement = this.getConnection().createStatement();
- String[] pks = this.getSinkPrimaryKeys(this.getSinkTableName());
- // Primary key is required
- if (pks == null || pks.length == 0) {
- throw new IllegalArgumentException("Sink table must have at least one primary key column for incremental mode.");
- }
- // options.sinkColumns was set during the insertDataToTable
- String allColls = getAllSinkColumns(null);
- StringBuilder sql = new StringBuilder();
- sql.append("MERGE INTO ")
- .append(this.getSinkTableName())
- .append(" trg USING (SELECT ")
- .append(allColls)
- .append(" FROM ")
- .append(getQualifiedStagingTableName())
- .append(" ) src ON ")
- .append(" (");
- for (int i = 0; i <= pks.length - 1; i++) {
- if (i >= 1) sql.append(" AND ");
- sql.append("src.").append(pks[i]).append("= trg.").append(pks[i]);
- }
- sql.append(" ) WHEN MATCHED THEN UPDATE SET ");
- LOG.trace("allColls: {} \n pks: {}", allColls, pks);
- // Set all columns for UPDATE SET statement
- for (String colName : allColls.split("\\s*,\\s*")) {
- LOG.trace("colName: {}", colName);
- boolean contains = Arrays.asList(pks).contains(colName);
- boolean containsQuoted = Arrays.asList(pks).contains("\"" + colName + "\"");
- if (!contains && !containsQuoted)
- sql.append(" trg.").append(colName).append(" = src.").append(colName).append(" ,");
- }
- // Delete the last comma
- sql.setLength(sql.length() - 1);
- sql.append(" WHEN NOT MATCHED THEN INSERT ( ").append(allColls).
- append(" ) VALUES (");
- // all columns for INSERT VALUES statement
- for (String colName : allColls.split("\\s*,\\s*")) {
- sql.append(" src.").append(colName).append(" ,");
- }
- // Delete the last comma
- sql.setLength(sql.length() - 1);
- sql.append(" ); ");
- LOG.info("Merging staging table and sink table with this command: {}",sql);
- statement.executeUpdate(sql.toString());
- statement.close();
- this.getConnection().commit();
- }
- @Override
- public ResultSet readTable(String tableName, String[] columns, int nThread) throws SQLException {
- // If table name parameter is null get it from options
- tableName = tableName == null ? this.options.getSourceTable() : tableName;
- // If columns parameter is null, get it from options
- String allColumns = this.options.getSourceColumns() == null ? "*" : this.options.getSourceColumns();
- String sqlCmd;
- // Read table with source-query option specified
- if (options.getSourceQuery() != null && !options.getSourceQuery().isEmpty()) {
- if (options.getJobs() == 1)
- sqlCmd = "SELECT * FROM (" +
- options.getSourceQuery() + ") as REPDBQUERY where 0 = ?";
- else
- throw new UnsupportedOperationException("ReplicaDB on SQLServer still not support custom query parallel process. Use properties instead: source.table, source.columns and source.where ");
- }
- // Read table with source-where option specified
- else if (options.getSourceWhere() != null && !options.getSourceWhere().isEmpty()) {
- sqlCmd = "SELECT " +
- allColumns +
- " FROM " +
- escapeTableName(tableName);
- if (options.getJobs() == 1)
- sqlCmd = sqlCmd + " where " + options.getSourceWhere() + " AND 0 = ?";
- else
- sqlCmd = sqlCmd + " where " + options.getSourceWhere() + " ABS(CHECKSUM(%% physloc %%)) % " + (options.getJobs()) + " = ?";
- } else {
- // Full table read. Force NO_IDEX and table scan
- sqlCmd = "SELECT " +
- allColumns +
- " FROM " +
- escapeTableName(tableName);
- if (options.getJobs() == 1)
- sqlCmd = sqlCmd + " where 0 = ?";
- else
- sqlCmd = sqlCmd + " where ABS(CHECKSUM(%% physloc %%)) % " + (options.getJobs()) + " = ?";
- }
- return super.execute(sqlCmd, (Object) nThread);
- }
- @Override
- public void preSourceTasks() {/*Not implemented*/}
- @Override
- public void postSourceTasks() {/*Not implemented*/}
- @Override
- public void postSinkTasks() throws Exception {
- setIdentityInsert(getSinkTableName(), true);
- super.postSinkTasks();
- setIdentityInsert(getSinkTableName(), false);
- }
+ private static final Logger LOG = LogManager.getLogger(SQLServerManager.class.getName());
+ /**
+ * Constructs the SqlManager.
+ *
+ * @param opts the ReplicaDB ToolOptions describing the user's requested action.
+ */
+ public SQLServerManager (ToolOptions opts, DataSourceType dsType) {
+ super(opts);
+ this.dsType = dsType;
+ }
+ @Override
+ public String getDriverClass () {
+ return JdbcDrivers.SQLSERVER.getDriverClass();
+ }
+ private void setIdentityInsert (String tableName, Boolean isSetIdentityOn) throws SQLException {
+ String valueToSetIdentity;
+ if (Boolean.TRUE.equals(isSetIdentityOn)) valueToSetIdentity = "ON";
+ else valueToSetIdentity = "OFF";
+ Statement stmt = this.getConnection().createStatement();
+ String sqlCommand = "IF OBJECTPROPERTY(OBJECT_ID('" +
+ tableName +
+ "'), 'TableHasIdentity') = 1 " +
+ tableName +
+ " " +
+ valueToSetIdentity;
+ LOG.info(sqlCommand);
+ stmt.executeUpdate(sqlCommand);
+ stmt.close();
+ }
+ @Override
+ public int insertDataToTable (ResultSet resultSet, int taskId) throws SQLException {
+ String tableName;
+ // Get table name and columns
+ if (options.getMode().equals(ReplicationMode.COMPLETE.getModeText())) {
+ tableName = getSinkTableName();
+ } else {
+ tableName = getQualifiedStagingTableName();
+ }
+ ResultSetMetaData rsmd = resultSet.getMetaData();
+ int columnCount = rsmd.getColumnCount();
+ SQLServerBulkCopy bulkCopy = new SQLServerBulkCopy(this.getConnection());
+ // BulkCopy Options
+ SQLServerBulkCopyOptions copyOptions = new SQLServerBulkCopyOptions();
+ copyOptions.setBulkCopyTimeout(0);
+ bulkCopy.setBulkCopyOptions(copyOptions);
+ bulkCopy.setDestinationTableName(tableName);
+ // Columns Mapping
+ if (this.options.getSinkColumns() != null && !this.options.getSinkColumns().isEmpty()) {
+ String sinkColumns = getAllSinkColumns(rsmd);
+ // Remove quotes from column names, which are not supported by SQLServerBulkCopy
+ String[] sinkColumnsArray = sinkColumns.replace("\"", "").split(",");
+ LOG.trace("Mapping columns: source --> sink");
+ for (int i = 1; i <= sinkColumnsArray.length; i++) {
+ bulkCopy.addColumnMapping(rsmd.getColumnName(i), sinkColumnsArray[i - 1]);
+ LOG.trace("{} --> {}", rsmd.getColumnName(i), sinkColumnsArray[i - 1]);
+ }
+ } else {
+ for (int i = 1; i <= columnCount; i++) {
+ LOG.trace("source {} - {} sink", rsmd.getColumnName(i), i);
+ bulkCopy.addColumnMapping(rsmd.getColumnName(i), i);
+ }
+ }
+ LOG.info("Performing BulkCopy into {} ", tableName);
+ // Write from the source to the destination.
+ //If the source ResulSet is an implementation of RowSet (e.g. csv file) cast it.
+ if (resultSet.getClass().getPackage().getName().equals("org.replicadb.rowset"))
+ bulkCopy.writeToServer((RowSet) resultSet);
+ else
+ bulkCopy.writeToServer(resultSet);
+ bulkCopy.close();
+ // TODO: getAllSinkColumns should not update the sinkColumns property. Change it in Oracle and check it in Postgres
+ // Set Sink columns
+ getAllSinkColumns(rsmd);
+ this.getConnection().commit();
+ return 0;
+ }
+ @Override
+ protected void createStagingTable () throws SQLException {
+ Statement statement = this.getConnection().createStatement();
+ String sinkStagingTable = getQualifiedStagingTableName();
+ // Get sink columns.
+ String allSinkColumns = null;
+ if (this.options.getSinkColumns() != null && !this.options.getSinkColumns().isEmpty()) {
+ allSinkColumns = this.options.getSinkColumns();
+ } else if (this.options.getSourceColumns() != null && !this.options.getSourceColumns().isEmpty()) {
+ allSinkColumns = this.options.getSourceColumns();
+ } else {
+ allSinkColumns = "*";
+ }
+ String sql = " SELECT " + allSinkColumns + " INTO " + sinkStagingTable + " FROM " + this.getSinkTableName() + " WHERE 0 = 1 ";
+ LOG.info("Creating staging table with this command: {}", sql);
+ statement.executeUpdate(sql);
+ statement.close();
+ this.getConnection().commit();
+ }
+ @Override
+ protected void mergeStagingTable () throws SQLException {
+ this.getConnection().commit();
+ Statement statement = this.getConnection().createStatement();
+ String[] pks = this.getSinkPrimaryKeys(this.getSinkTableName());
+ // Primary key is required
+ if (pks == null || pks.length == 0) {
+ throw new IllegalArgumentException("Sink table must have at least one primary key column for incremental mode.");
+ }
+ // options.sinkColumns was set during the insertDataToTable
+ String allColls = getAllSinkColumns(null);
+ StringBuilder sql = new StringBuilder();
+ sql.append("MERGE INTO ")
+ .append(this.getSinkTableName())
+ .append(" trg USING (SELECT ")
+ .append(allColls)
+ .append(" FROM ")
+ .append(getQualifiedStagingTableName())
+ .append(" ) src ON ")
+ .append(" (");
+ for (int i = 0; i <= pks.length - 1; i++) {
+ if (i >= 1) sql.append(" AND ");
+ sql.append("src.").append(pks[i]).append("= trg.").append(pks[i]);
+ }
+ sql.append(" ) WHEN MATCHED THEN UPDATE SET ");
+ LOG.trace("allColls: {} \n pks: {}", allColls, pks);
+ // Set all columns for UPDATE SET statement
+ for (String colName : allColls.split("\\s*,\\s*")) {
+ LOG.trace("colName: {}", colName);
+ boolean contains = Arrays.asList(pks).contains(colName);
+ boolean containsQuoted = Arrays.asList(pks).contains("\"" + colName + "\"");
+ if (!contains && !containsQuoted)
+ sql.append(" trg.").append(colName).append(" = src.").append(colName).append(" ,");
+ }
+ // Delete the last comma
+ sql.setLength(sql.length() - 1);
+ sql.append(" WHEN NOT MATCHED THEN INSERT ( ").append(allColls).
+ append(" ) VALUES (");
+ // all columns for INSERT VALUES statement
+ for (String colName : allColls.split("\\s*,\\s*")) {
+ sql.append(" src.").append(colName).append(" ,");
+ }
+ // Delete the last comma
+ sql.setLength(sql.length() - 1);
+ sql.append(" ); ");
+ LOG.info("Merging staging table and sink table with this command: {}", sql);
+ statement.executeUpdate(sql.toString());
+ statement.close();
+ this.getConnection().commit();
+ }
+ @Override
+ public ResultSet readTable (String tableName, String[] columns, int nThread) throws SQLException {
+ // If table name parameter is null get it from options
+ tableName = tableName == null ? this.options.getSourceTable() : tableName;
+ // If columns parameter is null, get it from options
+ String allColumns = this.options.getSourceColumns() == null ? "*" : this.options.getSourceColumns();
+ String sqlCmd;
+ // Read table with source-query option specified
+ if (options.getSourceQuery() != null && !options.getSourceQuery().isEmpty()) {
+ if (options.getJobs() == 1)
+ sqlCmd = "SELECT * FROM (" +
+ options.getSourceQuery() + ") as REPDBQUERY where 0 = ?";
+ else
+ throw new UnsupportedOperationException("ReplicaDB on SQLServer still not support custom query parallel process. Use properties instead: source.table, source.columns and source.where ");
+ }
+ // Read table with source-where option specified
+ else if (options.getSourceWhere() != null && !options.getSourceWhere().isEmpty()) {
+ sqlCmd = "SELECT " +
+ allColumns +
+ " FROM " +
+ escapeTableName(tableName);
+ if (options.getJobs() == 1)
+ sqlCmd = sqlCmd + " where " + options.getSourceWhere() + " AND 0 = ?";
+ else
+ sqlCmd = sqlCmd + " where " + options.getSourceWhere() + " ABS(CHECKSUM(%% physloc %%)) % " + (options.getJobs()) + " = ?";
+ } else {
+ // Full table read. Force NO_IDEX and table scan
+ sqlCmd = "SELECT " +
+ allColumns +
+ " FROM " +
+ escapeTableName(tableName);
+ if (options.getJobs() == 1)
+ sqlCmd = sqlCmd + " where 0 = ?";
+ else
+ sqlCmd = sqlCmd + " where ABS(CHECKSUM(%% physloc %%)) % " + (options.getJobs()) + " = ?";
+ }
+ return super.execute(sqlCmd, (Object) nThread);
+ }
+ @Override
+ public void preSourceTasks () {/*Not implemented*/}
+ @Override
+ public void postSourceTasks () {/*Not implemented*/}
+ @Override
+ public void postSinkTasks () throws Exception {
+ setIdentityInsert(getSinkTableName(), true);
+ super.postSinkTasks();
+ setIdentityInsert(getSinkTableName(), false);
+ }
diff --git a/src/main/java/org/replicadb/manager/file/CsvFileManager.java b/src/main/java/org/replicadb/manager/file/CsvFileManager.java
index 1c27699..dac9d1a 100644
--- a/src/main/java/org/replicadb/manager/file/CsvFileManager.java
+++ b/src/main/java/org/replicadb/manager/file/CsvFileManager.java
@@ -217,6 +217,7 @@ public void init() throws SQLException {
throw new IllegalArgumentException("Parameter 'source.connect.parameter.columns.types' cannot be null");
+ csvResultset.setColumnsNames(options.getSourceColumns());
diff --git a/src/main/java/org/replicadb/rowset/CsvCachedRowSetImpl.java b/src/main/java/org/replicadb/rowset/CsvCachedRowSetImpl.java
index c8ee01f..cee39cc 100644
--- a/src/main/java/org/replicadb/rowset/CsvCachedRowSetImpl.java
+++ b/src/main/java/org/replicadb/rowset/CsvCachedRowSetImpl.java
@@ -14,258 +14,273 @@
import java.sql.*;
public class CsvCachedRowSetImpl extends CachedRowSetImpl {
- private static final Logger LOG = LogManager.getLogger(CsvCachedRowSetImpl.class.getName());
+ private static final Logger LOG = LogManager.getLogger(CsvCachedRowSetImpl.class.getName());
- private File sourceFile;
- private Iterable records;
- private String[] columnsTypes;
- private CSVFormat csvFormat;
- private static int lineNumer = 0;
+ private File sourceFile;
+ private Iterable records;
+ private String[] columnsTypes;
+ private String[] columnName;
+ private CSVFormat csvFormat;
+ private static int lineNumer = 0;
- public void setCsvFormat(CSVFormat csvFormat) {
- this.csvFormat = csvFormat;
- }
+ public void setCsvFormat (CSVFormat csvFormat) {
+ this.csvFormat = csvFormat;
+ }
- public CsvCachedRowSetImpl() throws SQLException {}
+ public CsvCachedRowSetImpl () throws SQLException {
+ }
- public void setSourceFile(File sourceFile) {
- this.sourceFile = sourceFile;
- }
+ public void setSourceFile (File sourceFile) {
+ this.sourceFile = sourceFile;
+ }
- public void setColumnsTypes(String columnsTypes) {
- this.columnsTypes = columnsTypes.trim().replace(" ", "").toUpperCase().split(",");
- }
+ public void setColumnsTypes (String columnsTypes) {
+ this.columnsTypes = columnsTypes.trim().replace(" ", "").toUpperCase().split(",");
+ }
- @Override
- public void execute() throws SQLException {
+ public void setColumnsNames (String columnsNames) {
+ if (columnsNames == null) return;
+ this.columnName = columnsNames.trim().replace(" ", "").toUpperCase().split(",");
+ }
- RowSetMetaData rsmd = new RowSetMetaDataImpl();
- rsmd.setColumnCount(this.columnsTypes.length);
- for (int i = 0; i <= this.columnsTypes.length - 1; i++) {
- switch (this.columnsTypes[i]) {
- case "VARCHAR":
- rsmd.setColumnType(i + 1, Types.VARCHAR);
- break;
- case "CHAR":
- rsmd.setColumnType(i + 1, Types.CHAR);
- break;
- rsmd.setColumnType(i + 1, Types.LONGVARCHAR);
- break;
- case "INTEGER":
- rsmd.setColumnType(i + 1, Types.INTEGER);
- break;
- case "BIGINT":
- rsmd.setColumnType(i + 1, Types.BIGINT);
- break;
- case "TINYINT":
- rsmd.setColumnType(i + 1, Types.TINYINT);
- break;
- case "SMALLINT":
- rsmd.setColumnType(i + 1, Types.SMALLINT);
- break;
- case "NUMERIC":
- rsmd.setColumnType(i + 1, Types.NUMERIC);
- break;
- case "DECIMAL":
- rsmd.setColumnType(i + 1, Types.DECIMAL);
- break;
- case "DOUBLE":
- rsmd.setColumnType(i + 1, Types.DOUBLE);
- break;
- case "FLOAT":
- rsmd.setColumnType(i + 1, Types.FLOAT);
- break;
- case "DATE":
- rsmd.setColumnType(i + 1, Types.DATE);
- break;
- case "TIMESTAMP":
- rsmd.setColumnType(i + 1, Types.TIMESTAMP);
- break;
- case "TIME":
- rsmd.setColumnType(i + 1, Types.TIME);
- break;
- case "BOOLEAN":
- rsmd.setColumnType(i + 1, Types.BOOLEAN);
- break;
- default:
- rsmd.setColumnType(i + 1, Types.VARCHAR);
- break;
+ @Override
+ public void execute () throws SQLException {
+ RowSetMetaData rsmd = new RowSetMetaDataImpl();
+ rsmd.setColumnCount(this.columnsTypes.length);
+ for (int i = 0; i <= this.columnsTypes.length - 1; i++) {
+ if (columnName != null) rsmd.setColumnName(i + 1, columnName[i]);
+ switch (this.columnsTypes[i]) {
+ case "VARCHAR":
+ rsmd.setColumnType(i + 1, Types.VARCHAR);
+ break;
+ case "CHAR":
+ rsmd.setColumnType(i + 1, Types.CHAR);
+ break;
+ rsmd.setColumnType(i + 1, Types.LONGVARCHAR);
+ break;
+ case "INTEGER":
+ rsmd.setColumnType(i + 1, Types.INTEGER);
+ break;
+ case "BIGINT":
+ rsmd.setColumnType(i + 1, Types.BIGINT);
+ break;
+ case "TINYINT":
+ rsmd.setColumnType(i + 1, Types.TINYINT);
+ break;
+ case "SMALLINT":
+ rsmd.setColumnType(i + 1, Types.SMALLINT);
+ break;
+ case "NUMERIC":
+ rsmd.setColumnType(i + 1, Types.NUMERIC);
+ rsmd.setPrecision(i + 1, 10);
+ break;
+ case "DECIMAL":
+ rsmd.setColumnType(i + 1, Types.DECIMAL);
+ rsmd.setPrecision(i + 1, 10);
+ break;
+ case "DOUBLE":
+ rsmd.setColumnType(i + 1, Types.DOUBLE);
+ rsmd.setPrecision(i + 1, 10);
+ break;
+ case "FLOAT":
+ rsmd.setColumnType(i + 1, Types.FLOAT);
+ rsmd.setPrecision(i + 1, 10);
+ break;
+ case "DATE":
+ rsmd.setColumnType(i + 1, Types.DATE);
+ break;
+ case "TIMESTAMP":
+ rsmd.setColumnType(i + 1, Types.TIMESTAMP);
+ break;
+ case "TIME":
+ rsmd.setColumnType(i + 1, Types.TIME);
+ break;
+ case "BOOLEAN":
+ rsmd.setColumnType(i + 1, Types.BOOLEAN);
+ break;
+ default:
+ rsmd.setColumnType(i + 1, Types.VARCHAR);
+ break;
+ }
- }
- setMetaData(rsmd);
+ setMetaData(rsmd);
- BufferedReader reader = null;
- try {
- reader = Files.newBufferedReader(sourceFile.toPath());
- this.records = csvFormat.parse(reader);
- } catch (IOException e) {
- throw new SQLException(e);
- }
- }
+ BufferedReader reader = null;
+ try {
+ reader = Files.newBufferedReader(sourceFile.toPath());
+ this.records = csvFormat.parse(reader);
+ } catch (IOException e) {
+ throw new SQLException(e);
+ }
+ }
- @Override
- public boolean next() throws SQLException {
- /*
- * make sure things look sane. The cursor must be
- * positioned in the rowset or before first (0) or
- * after last (numRows + 1)
- */
+ @Override
+ public boolean next () throws SQLException {
+ /*
+ * make sure things look sane. The cursor must be
+ * positioned in the rowset or before first (0) or
+ * after last (numRows + 1)
+ */
/*if (this.cursorPos < 0 || cursorPos >= numRows + 1) {
throw new SQLException(resBundle.handleGetObject("cachedrowsetimpl.invalidcp").toString());
- // now move and notify
- boolean ret = this.internalNext();
- notifyCursorMoved();
+ // now move and notify
+ boolean ret = this.internalNext();
+ notifyCursorMoved();
- if (!ret) {
- ret = this.records.iterator().hasNext();
- if (ret) {
- readData();
- internalFirst();
+ if (!ret) {
+ ret = this.records.iterator().hasNext();
+ if (ret) {
+ readData();
+ internalFirst();
+ }
- }
- return ret;
- }
+ return ret;
+ }
- private void readData() throws SQLException {
+ private void readData () throws SQLException {
- // Close current cursor and reaopen.
- int currentFetchSize = getFetchSize();
- setFetchSize(0);
- close();
- setFetchSize(currentFetchSize);
- moveToInsertRow();
+ // Close current cursor and reaopen.
+ int currentFetchSize = getFetchSize();
+ setFetchSize(0);
+ close();
+ setFetchSize(currentFetchSize);
+ moveToInsertRow();
- CSVRecord record;
+ CSVRecord record;
- for (int i = 1; i <= getFetchSize(); i++) {
- lineNumer++;
- try {
+ for (int i = 1; i <= getFetchSize(); i++) {
+ lineNumer++;
+ try {
- if (this.records.iterator().hasNext()) {
- record = this.records.iterator().next();
+ if (this.records.iterator().hasNext()) {
+ record = this.records.iterator().next();
- for (int j = 0; j <= this.columnsTypes.length - 1; j++) {
+ for (int j = 0; j <= this.columnsTypes.length - 1; j++) {
- switch (this.columnsTypes[j]) {
- case "VARCHAR":
- case "CHAR":
- if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
- else updateString(j + 1, record.get(j));
- break;
- case "INTEGER":
- if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
- else updateInt(j + 1, Integer.parseInt(record.get(j)));
- break;
- case "TINYINT":
- if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
- else updateByte(j + 1, Byte.parseByte(record.get(j)));
- break;
- case "SMALLINT":
- if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
- else updateShort(j + 1, Short.parseShort(record.get(j)));
- break;
- case "BIGINT":
- if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
- else updateLong(j + 1, Long.parseLong(record.get(j)));
- break;
- case "NUMERIC":
- case "DECIMAL":
- /*
- * "0" [0,0]
- * "0.00" [0,2]
- * "123" [123,0]
- * "-123" [-123,0]
- * "1.23E3" [123,-1]
- * "1.23E+3" [123,-1]
- * "12.3E+7" [123,-6]
- * "12.0" [120,1]
- * "12.3" [123,1]
- * "0.00123" [123,5]
- * "-1.23E-12" [-123,14]
- * "1234.5E-4" [12345,5]
- * "0E+7" [0,-7]
- * "-0" [0,0]
- */
- if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
- else updateBigDecimal(j + 1, new BigDecimal(record.get(j)));
- break;
- case "DOUBLE":
- if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
- else updateDouble(j + 1, Double.parseDouble(record.get(j)));
- break;
- case "FLOAT":
- if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
- else updateFloat(j + 1, Float.parseFloat(record.get(j)));
- break;
- case "DATE":
- // yyyy-[m]m-[d]d
- if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
- else updateDate(j + 1, Date.valueOf(record.get(j)));
- break;
- case "TIMESTAMP":
- // yyyy-[m]m-[d]d hh:mm:ss[.f...]
- if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
- else updateTimestamp(j + 1, Timestamp.valueOf(record.get(j)));
- break;
- case "TIME":
- // hh:mm:ss
- if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
- else updateTime(j + 1, Time.valueOf(record.get(j)));
- break;
- case "BOOLEAN":
- if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
- else updateBoolean(j + 1, convertToBoolean(record.get(j)));
- break;
- default:
- if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
- else updateString(j + 1, record.get(j));
- break;
- }
- }
+ switch (this.columnsTypes[j]) {
+ case "VARCHAR":
+ case "CHAR":
+ if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
+ else updateString(j + 1, record.get(j));
+ break;
+ case "INTEGER":
+ if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
+ else updateInt(j + 1, Integer.parseInt(record.get(j)));
+ break;
+ case "TINYINT":
+ if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
+ else updateByte(j + 1, Byte.parseByte(record.get(j)));
+ break;
+ case "SMALLINT":
+ if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
+ else updateShort(j + 1, Short.parseShort(record.get(j)));
+ break;
+ case "BIGINT":
+ if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
+ else updateLong(j + 1, Long.parseLong(record.get(j)));
+ break;
+ case "NUMERIC":
+ case "DECIMAL":
+ /*
+ * "0" [0,0]
+ * "0.00" [0,2]
+ * "123" [123,0]
+ * "-123" [-123,0]
+ * "1.23E3" [123,-1]
+ * "1.23E+3" [123,-1]
+ * "12.3E+7" [123,-6]
+ * "12.0" [120,1]
+ * "12.3" [123,1]
+ * "0.00123" [123,5]
+ * "-1.23E-12" [-123,14]
+ * "1234.5E-4" [12345,5]
+ * "0E+7" [0,-7]
+ * "-0" [0,0]
+ */
+ if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
+ else updateBigDecimal(j + 1, new BigDecimal(record.get(j)));
+ break;
+ case "DOUBLE":
+ if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
+ else updateDouble(j + 1, Double.parseDouble(record.get(j)));
+ break;
+ case "FLOAT":
+ if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
+ else updateFloat(j + 1, Float.parseFloat(record.get(j)));
+ break;
+ case "DATE":
+ // yyyy-[m]m-[d]d
+ if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
+ else updateDate(j + 1, Date.valueOf(record.get(j)));
+ break;
+ case "TIMESTAMP":
+ // yyyy-[m]m-[d]d hh:mm:ss[.f...]
+ if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
+ else updateTimestamp(j + 1, Timestamp.valueOf(record.get(j)));
+ break;
+ case "TIME":
+ // hh:mm:ss
+ if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
+ else updateTime(j + 1, Time.valueOf(record.get(j)));
+ break;
+ case "BOOLEAN":
+ if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
+ else updateBoolean(j + 1, convertToBoolean(record.get(j)));
+ break;
+ default:
+ if (getStringOrNull(record.get(j)) == null) updateNull(j + 1);
+ else updateString(j + 1, record.get(j));
+ break;
+ }
+ }
- insertRow();
- }
- } catch (Exception e) {
- LOG.error("An error has occurred reading line number " + lineNumer + " of the CSV file", e);
- throw e;
+ insertRow();
+ }
+ } catch (Exception e) {
+ LOG.error("An error has occurred reading line number {} of the CSV file", lineNumer, e);
+ throw e;
+ }
- }
- moveToCurrentRow();
- beforeFirst();
- }
+ moveToCurrentRow();
+ beforeFirst();
+ }
- /**
- * Parses the string argument as a boolean. The boolean returned represents the value true if the
- * string argument is not null and is equal, ignoring case, to the string "true", "yes", "on",
- * "1", "t", "y".
- *
- * @param s the String containing the booleanvalue
- * @return representation to be parsed
- */
- private boolean convertToBoolean(String s) {
- return ("1".equalsIgnoreCase(s)
- || "yes".equalsIgnoreCase(s)
- || "true".equalsIgnoreCase(s)
- || "on".equalsIgnoreCase(s)
- || "y".equalsIgnoreCase(s)
- || "t".equalsIgnoreCase(s));
- }
+ /**
+ * Parses the string argument as a boolean. The boolean returned represents the value true if the
+ * string argument is not null and is equal, ignoring case, to the string "true", "yes", "on",
+ * "1", "t", "y".
+ *
+ * @param s the String containing the booleanvalue
+ * @return representation to be parsed
+ */
+ private boolean convertToBoolean (String s) {
+ return ("1".equalsIgnoreCase(s)
+ || "yes".equalsIgnoreCase(s)
+ || "true".equalsIgnoreCase(s)
+ || "on".equalsIgnoreCase(s)
+ || "y".equalsIgnoreCase(s)
+ || "t".equalsIgnoreCase(s));
+ }
- /**
- * Checks if the value is empty or null and return a null object
- *
- * @param value
- * @return
- */
- private String getStringOrNull(String value) {
- if (value == null || value.isEmpty()) value = null;
- return value;
- }
+ /**
+ * Checks if the value is empty or null and return a null object
+ *
+ * @param value
+ * @return
+ */
+ private String getStringOrNull (String value) {
+ if (value == null || value.isEmpty()) value = null;
+ return value;
+ }
diff --git a/src/test/java/org/replicadb/file/Csv2SqlserverTest.java b/src/test/java/org/replicadb/file/Csv2SqlserverTest.java
new file mode 100644
index 0000000..7584910
--- /dev/null
+++ b/src/test/java/org/replicadb/file/Csv2SqlserverTest.java
@@ -0,0 +1,98 @@
+package org.replicadb.file;
+import org.apache.commons.cli.ParseException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Rule;
+import org.junit.jupiter.api.*;
+import org.replicadb.ReplicaDB;
+import org.replicadb.cli.ToolOptions;
+import org.replicadb.config.ReplicadbSqlserverContainer;
+import org.replicadb.manager.file.FileFormats;
+import org.testcontainers.containers.MSSQLServerContainer;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.sql.*;
+import java.util.Properties;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+class Csv2SqlserverTest {
+ private static final Logger LOG = LogManager.getLogger(Csv2SqlserverTest.class);
+ private static final String RESOURCE_DIR = Paths.get("src", "test", "resources").toFile().getAbsolutePath();
+ private static final String REPLICADB_CONF_FILE = "/replicadb.conf";
+ private static final int EXPECTED_ROWS = 1024;
+ private static final String CSV_SOURCE_FILE = "/csv/source.csv";
+ private static final String SINK_COLUMNS = "c_character_var,c_character,c_character_lob,c_integer,c_bigint,c_smallint,c_real,c_numeric,c_decimal,c_double_precision,c_float,c_date,c_timestamp_without_timezone,c_time_without_timezone";
+ private Connection sqlserverConn;
+ @Rule
+ public static MSSQLServerContainer sqlserver = ReplicadbSqlserverContainer.getInstance();
+ @BeforeAll
+ static void setUp () {
+ // Start the container is not necessary
+ }
+ @BeforeEach
+ void before () throws SQLException {
+ this.sqlserverConn = DriverManager.getConnection(sqlserver.getJdbcUrl(), sqlserver.getUsername(), sqlserver.getPassword());
+ }
+ @AfterEach
+ void tearDown () throws SQLException {
+ // Truncate sink table and close connections
+ sqlserverConn.createStatement().execute("TRUNCATE TABLE t_sink");
+ this.sqlserverConn.close();
+ }
+ public int countSinkRows () throws SQLException {
+ Statement stmt = sqlserverConn.createStatement();
+ ResultSet rs = stmt.executeQuery("select count(*) from t_sink");
+ rs.next();
+ int count = rs.getInt(1);
+ LOG.info("Total rows in the sink table: {}", count);
+ return count;
+ }
+ @Test
+ void testSqlserverVersion2017 () throws SQLException {
+ Statement stmt = sqlserverConn.createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT @@VERSION");
+ rs.next();
+ String version = rs.getString(1);
+ LOG.info(version);
+ assertTrue(version.contains("2017"));
+ }
+ @Test
+ void testCsv2PostgresComplete () throws ParseException, IOException, SQLException {
+ String[] args = {
+ "--source-connect", "file://" + RESOURCE_DIR + CSV_SOURCE_FILE,
+ "--source-file-format", FileFormats.CSV.getType(),
+ "--source-columns", SOURCE_COLUMNS,
+ "--sink-connect", sqlserver.getJdbcUrl(),
+ "--sink-user", sqlserver.getUsername(),
+ "--sink-password", sqlserver.getPassword(),
+ "--sink-columns", SINK_COLUMNS
+ };
+ ToolOptions options = new ToolOptions(args);
+ Properties sourceConnectionParams = new Properties();
+ sourceConnectionParams.setProperty("columns.types", SOURCE_COLUMNS_TYPES);
+ sourceConnectionParams.setProperty("format.firstRecordAsHeader", "true");
+ options.setSourceConnectionParams(sourceConnectionParams);
+ Assertions.assertEquals(0, ReplicaDB.processReplica(options));
+ assertEquals(EXPECTED_ROWS, countSinkRows());
+ }
\ No newline at end of file