From 86d872ea836736461434bb97117c1e483b83b8b4 Mon Sep 17 00:00:00 2001
From: AnatolyUss
Date: Sun, 29 Jan 2017 01:10:01 +0200
Subject: [PATCH] improved disaster recovery mechanism
---
README.md | 2 +-
migration/fmtp/ConsistencyEnforcer.js | 116 +++++++++++++++++++++++--
migration/fmtp/ConstraintsProcessor.js | 12 ++-
migration/fmtp/DataLoader.js | 26 ++----
migration/fmtp/TableProcessor.js | 4 +-
package.json | 2 +-
6 files changed, 130 insertions(+), 32 deletions(-)
diff --git a/README.md b/README.md
index 5e6bbe70..f8e8a7ca 100644
--- a/README.md
+++ b/README.md
@@ -61,7 +61,7 @@ from MySQL to PostgreSQL as easy and smooth as possible.
anatolyuss@gmail.com
VERSION
-Current version is 2.4.0
+Current version is 2.5.0
(major version . improvements . bug fixes)
diff --git a/migration/fmtp/ConsistencyEnforcer.js b/migration/fmtp/ConsistencyEnforcer.js
index 5cda3bb9..8fb999f1 100644
--- a/migration/fmtp/ConsistencyEnforcer.js
+++ b/migration/fmtp/ConsistencyEnforcer.js
@@ -20,6 +20,9 @@
*/
'use strict';
+const generateError = require('./ErrorGenerator');
+const extraConfigProcessor = require('./ExtraConfigProcessor');
+
/**
* Update consistency state.
*
@@ -53,14 +56,15 @@ const updateConsistencyState = (self, dataPoolId) => {
}
/**
- * Get consistency state.
+ * Get the `is_started` value of the current chunk.
*
* @param {Conversion} self
* @param {Number} dataPoolId
*
* @returns {Promise}
*/
-const getConsistencyState = (self, dataPoolId) => {
+
+const getIsStarted = (self, dataPoolId) => {
return new Promise(resolve => {
self._pg.connect((error, client, done) => {
if (error) {
@@ -85,6 +89,63 @@ const getConsistencyState = (self, dataPoolId) => {
});
}
+/**
+ * The current data chunk runs after a disaster recovery.
+ * Determine whether this chunk has already been loaded.
+ * This check prevents possible data duplication.
+ *
+ * @param {Conversion} self
+ * @param {Object} chunk
+ *
+ * @returns {Promise}
+ */
+const hasCurrentChunkLoaded = (self, chunk) => {
+ return new Promise(resolve => {
+ self._pg.connect((pgError, client, done) => {
+ if (pgError) {
+ generateError(self, '\t--[ConsistencyEnforcer::hasCurrentChunkLoaded] Cannot connect to PostgreSQL server...\n' + pgError);
+ resolve(true);
+ } else {
+ const originalTableName = extraConfigProcessor.getTableName(self, chunk._tableName, true);
+ const sql = 'SELECT EXISTS(SELECT 1 FROM "' + self._schema + '"."' + chunk._tableName
+ + '" WHERE "' + self._schema + '_' + originalTableName + '_data_chunk_id_temp" = ' + chunk._id + ');';
+
+ client.query(sql, (err, result) => {
+ done();
+
+ if (err) {
+ generateError(self, '\t--[ConsistencyEnforcer::hasCurrentChunkLoaded] ' + err, sql);
+ resolve(true);
+ } else {
+ resolve(!!result.rows[0].exists);
+ }
+ });
+ }
+ });
+ });
+}
+
+/**
+ * Get consistency state.
+ *
+ * @param {Conversion} self
+ * @param {Object} chunk
+ *
+ * @returns {Promise}
+ */
+const getConsistencyState = (self, chunk) => {
+ return new Promise(resolve => {
+ getIsStarted(self, chunk._id).then(isStarted => {
+ if (isStarted) {
+ hasCurrentChunkLoaded(self, chunk).then(result => resolve(result));
+ } else {
+ // Normal migration flow.
+ resolve(false);
+ }
+ });
+ });
+}
+
/**
* Enforce consistency before processing a chunk of data.
* Ensure there are no data duplications.
@@ -92,20 +153,59 @@ const getConsistencyState = (self, dataPoolId) => {
* When rerunning nmig after an unexpected failure, this is absolutely mandatory.
*
* @param {Conversion} self
- * @param {Number} chunkId
+ * @param {Object} chunk
*
* @returns {Promise}
*/
-module.exports = (self, chunkId) => {
+module.exports.enforceConsistency = (self, chunk) => {
return new Promise(resolve => {
- getConsistencyState(self, chunkId).then(isStarted => {
- if (isStarted) {
- // Current data chunk runs after a disaster recovery.
+ getConsistencyState(self, chunk).then(hasAlreadyBeenLoaded => {
+ if (hasAlreadyBeenLoaded) {
+ /*
+ * Current data chunk runs after a disaster recovery.
+ * It has already been loaded.
+ */
resolve(false);
} else {
// Normal migration flow.
- updateConsistencyState(self, chunkId).then(() => resolve(true));
+ updateConsistencyState(self, chunk._id).then(() => resolve(true));
}
})
});
};
+
+/**
+ * Drop the {self._schema + '_' + originalTableName + '_data_chunk_id_temp'} column from current table.
+ *
+ * @param {Conversion} self
+ * @param {String} tableName
+ *
+ * @returns {Promise}
+ */
+module.exports.dropDataChunkIdColumn = (self, tableName) => {
+ return new Promise(resolve => {
+ self._pg.connect((pgError, client, done) => {
+ if (pgError) {
+ generateError(self, '\t--[ConsistencyEnforcer::dropDataChunkIdColumn] Cannot connect to PostgreSQL server...\n' + pgError);
+ resolve();
+ } else {
+ const originalTableName = extraConfigProcessor.getTableName(self, tableName, true);
+ const columnToDrop = self._schema + '_' + originalTableName + '_data_chunk_id_temp';
+ const sql = 'ALTER TABLE "' + self._schema + '"."' + tableName + '" DROP COLUMN "' + columnToDrop + '";';
+
+ client.query(sql, (err, result) => {
+ done();
+
+ if (err) {
+ const errMsg = '\t--[ConsistencyEnforcer::dropDataChunkIdColumn] Failed to drop column "' + columnToDrop + '"\n'
+ + '\t--[ConsistencyEnforcer::dropDataChunkIdColumn] '+ err;
+
+ generateError(self, errMsg, sql);
+ }
+
+ resolve();
+ });
+ }
+ });
+ });
+};
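
Note: the module now exposes two named functions instead of a single default export. A minimal usage sketch from a loader worker's perspective (loadChunk and deleteChunk here are hypothetical placeholders, not part of this patch):

    // Sketch only: how a worker might call the new exports.
    // enforceConsistency resolves to true for the normal flow and to false
    // when the chunk is detected as already loaded after a crash.
    const consistencyEnforcer = require('./ConsistencyEnforcer');

    const processChunk = (self, chunk) => {
        return consistencyEnforcer.enforceConsistency(self, chunk).then(isNormalFlow => {
            if (isNormalFlow) {
                return loadChunk(self, chunk); // hypothetical loader
            }

            // The chunk already reached PostgreSQL - only remove it from the data pool.
            return deleteChunk(self, chunk._id); // hypothetical cleanup
        });
    };
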
diff --git a/migration/fmtp/ConstraintsProcessor.js b/migration/fmtp/ConstraintsProcessor.js
index 8f4c1d1c..fe45cbf4 100644
--- a/migration/fmtp/ConstraintsProcessor.js
+++ b/migration/fmtp/ConstraintsProcessor.js
@@ -33,6 +33,8 @@ const processIndexAndKey = require('./IndexAndKeyProcessor');
const processComments = require('./CommentsProcessor');
const processForeignKey = require('./ForeignKeyProcessor');
const processViews = require('./ViewGenerator');
+const consistencyEnforcer = require('./ConsistencyEnforcer');
+const dropDataChunkIdColumn = consistencyEnforcer.dropDataChunkIdColumn;
/**
* Continues migration process after data loading, when migrate_only_data is true.
@@ -46,7 +48,11 @@ const continueProcessAfterDataLoadingShort = self => {
for (let i = 0; i < self._tablesToMigrate.length; ++i) {
const tableName = self._tablesToMigrate[i];
- promises.push(sequencesProcessor.setSequenceValue(self, tableName));
+ promises.push(
+ dropDataChunkIdColumn(self, tableName).then(() => {
+ return sequencesProcessor.setSequenceValue(self, tableName);
+ })
+ );
}
Promise.all(promises).then(() => {
@@ -77,7 +83,9 @@ const continueProcessAfterDataLoadingLong = self => {
for (let i = 0; i < self._tablesToMigrate.length; ++i) {
const tableName = self._tablesToMigrate[i];
promises.push(
- processEnum(self, tableName).then(() => {
+ dropDataChunkIdColumn(self, tableName).then(() => {
+ return processEnum(self, tableName);
+ }).then(() => {
return processNull(self, tableName);
}).then(() => {
return processDefault(self, tableName);
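
The point of chaining dropDataChunkIdColumn first is that the temporary marker column is removed before any further per-table processing. For reference, the statement it builds for a hypothetical table "actor" in schema "public" (names are examples only, assuming no renaming via the extra config) would be roughly:

    // Illustration only: the ALTER TABLE statement dropDataChunkIdColumn issues.
    const columnToDrop = 'public' + '_' + 'actor' + '_data_chunk_id_temp';
    const sql = 'ALTER TABLE "public"."actor" DROP COLUMN "' + columnToDrop + '";';
    // => ALTER TABLE "public"."actor" DROP COLUMN "public_actor_data_chunk_id_temp";
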
diff --git a/migration/fmtp/DataLoader.js b/migration/fmtp/DataLoader.js
index ed7fa331..8f3555b2 100644
--- a/migration/fmtp/DataLoader.js
+++ b/migration/fmtp/DataLoader.js
@@ -29,7 +29,8 @@ const generateError = require('./ErrorGenerator');
const connect = require('./Connector');
const Conversion = require('./Conversion');
const MessageToMaster = require('./MessageToMaster');
-const enforceConsistency = require('./ConsistencyEnforcer');
+const consistencyEnforcer = require('./ConsistencyEnforcer');
+const enforceConsistency = consistencyEnforcer.enforceConsistency;
const extraConfigProcessor = require('./ExtraConfigProcessor');
const copyFrom = pgCopyStreams.from;
const getBuffer = +process.version.split('.')[0].slice(1) < 6
@@ -44,7 +45,7 @@ process.on('message', signal => {
for (let i = 0; i < signal.chunks.length; ++i) {
promises.push(
connect(self).then(() => {
- return enforceConsistency(self, signal.chunks[i]._id);
+ return enforceConsistency(self, signal.chunks[i]);
}).then(isNormalFlow => {
if (isNormalFlow) {
return populateTableWorker(
@@ -58,19 +59,6 @@ process.on('message', signal => {
);
}
- const sql = buildChunkQuery(
- extraConfigProcessor.getTableName(self, signal.chunks[i]._tableName, true),
- signal.chunks[i]._selectFieldList,
- signal.chunks[i]._offset,
- signal.chunks[i]._rowsInChunk
- );
-
- const strTwelveSpaces = '            ';
- const rejectedData = '\n\t--[loadData] Possible data duplication alert!\n\t ' + strTwelveSpaces
- + 'Data, retrievable by following MySQL query:\n' + sql + '\n\t ' + strTwelveSpaces
- + 'may already be migrated.\n\t' + strTwelveSpaces + ' Please, check it.';
-
- log(self, rejectedData, path.join(self._logsDirPath, signal.chunks[i]._tableName + '.log'));
return deleteChunk(self, signal.chunks[i]._id);
})
);
@@ -225,8 +213,9 @@ const populateTableWorker = (self, tableName, strSelectFieldList, offset, rowsIn
generateError(self, '\t--[populateTableWorker] Cannot connect to MySQL server...\n\t' + error);
resolvePopulateTableWorker();
} else {
- const csvAddr = path.join(self._tempDirPath, tableName + offset + '.csv');
- const sql = buildChunkQuery(extraConfigProcessor.getTableName(self, tableName, true), strSelectFieldList, offset, rowsInChunk);
+ const csvAddr = path.join(self._tempDirPath, tableName + offset + '.csv');
+ const originalTableName = extraConfigProcessor.getTableName(self, tableName, true);
+ const sql = buildChunkQuery(originalTableName, strSelectFieldList, offset, rowsInChunk);
connection.query(sql, (err, rows) => {
connection.release();
@@ -235,7 +224,8 @@ const populateTableWorker = (self, tableName, strSelectFieldList, offset, rowsIn
generateError(self, '\t--[populateTableWorker] ' + err, sql);
resolvePopulateTableWorker();
} else {
- rowsInChunk = rows.length;
+ rowsInChunk = rows.length;
+ rows[0][self._schema + '_' + originalTableName + '_data_chunk_id_temp'] = dataPoolId;
csvStringify(rows, (csvError, csvString) => {
rows = null;
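
The recovery marker itself is written here: the data-pool id is stamped onto the first row of every chunk before the rows are turned into CSV and copied, which is what ConsistencyEnforcer::hasCurrentChunkLoaded later checks for. A simplified sketch (variable names follow the hunk above; the COPY step is elided):

    // Simplified sketch of the marker logic added above.
    const markerColumn = self._schema + '_' + originalTableName + '_data_chunk_id_temp';
    rows[0][markerColumn] = dataPoolId; // only the first row carries the chunk id

    csvStringify(rows, (csvError, csvString) => {
        // ... the CSV is streamed into PostgreSQL via COPY, as before ...
        // After a crash, hasCurrentChunkLoaded() runs
        // SELECT EXISTS(SELECT 1 FROM "<schema>"."<table>"
        //               WHERE "<marker column>" = <chunk id>)
        // to decide whether this chunk can be skipped.
    });
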
diff --git a/migration/fmtp/TableProcessor.js b/migration/fmtp/TableProcessor.js
index 383575ef..f85b43b9 100644
--- a/migration/fmtp/TableProcessor.js
+++ b/migration/fmtp/TableProcessor.js
@@ -126,8 +126,8 @@ module.exports.createTable = (self, tableName) => {
+ '" ' + mapDataTypes(self._dataTypesMap, rows[i].Type) + ',';
}
- rows = null;
- sql = sql.slice(0, -1) + ');';
+ sql += '"' + self._schema + '_' + originalTableName + '_data_chunk_id_temp" BIGINT);';
+
client.query(sql, err => {
done();
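
Note that the removed sql.slice(0, -1) is not reintroduced: the trailing comma left by the column loop is now followed by the marker column definition instead of being stripped. For a hypothetical two-column table the generated statement ends up roughly as follows (illustration only; the real column list comes from MySQL metadata):

    // Sketch of the sql built by createTable() after this change, for a
    // hypothetical table "actor" in schema "public".
    let sql = 'CREATE TABLE "public"."actor"("actor_id" smallint,"first_name" character varying(45),';
    sql += '"' + 'public' + '_' + 'actor' + '_data_chunk_id_temp" BIGINT);';
    // => CREATE TABLE "public"."actor"("actor_id" smallint,
    //    "first_name" character varying(45),"public_actor_data_chunk_id_temp" BIGINT);
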
diff --git a/package.json b/package.json
index 8b397e4a..6f897e23 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
{
"name": "nmig",
- "version": "2.4.0",
+ "version": "2.5.0",
"description": "The database migration app",
"author": "Anatoly Khaytovich",
"dependencies": {