Merge branch 'feature/improve-recovery-functionality' into develop
AnatolyUss committed Oct 10, 2019
2 parents b07bb30 + 41c8d59 commit 0fb61c6
Showing 4 changed files with 37 additions and 49 deletions.
62 changes: 26 additions & 36 deletions src/ConsistencyEnforcer.ts
@@ -23,48 +23,38 @@ import Conversion from './Conversion';
 import DBAccess from './DBAccess';
 import DBVendors from './DBVendors';
 
-/**
- * Updates consistency state.
- */
-async function updateConsistencyState(conversion: Conversion, dataPoolId: number): Promise<void> {
-    const logTitle: string = 'ConsistencyEnforcer::updateConsistencyState';
-    const sql: string = `UPDATE "${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"
-                         SET is_started = TRUE WHERE id = ${ dataPoolId };`;
-
-    const dbAccess: DBAccess = new DBAccess(conversion);
-    await dbAccess.query(logTitle, sql, DBVendors.PG, false, false);
-}
-
-/**
- * Retrieves the `is_started` value of current data chunk.
- */
-async function dataMayBeLoaded(conversion: Conversion, dataPoolId: number): Promise<boolean> {
-    const logTitle: string = 'ConsistencyEnforcer::dataMayBeLoaded';
-    const sql: string = `SELECT is_started AS is_started
-                         FROM "${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"
-                         WHERE id = ${ dataPoolId };`;
-
-    const dbAccess: DBAccess = new DBAccess(conversion);
-    const result: DBAccessQueryResult = await dbAccess.query(logTitle, sql, DBVendors.PG, false, false);
-    return result.error ? false : !!result.data.rows[0].is_started;
-}
-
 /**
  * Enforces consistency before processing a chunk of data.
  * Ensures there are no any data duplications.
  * In case of normal execution - it is a good practice.
  * In case of rerunning Nmig after unexpected failure - it is absolutely mandatory.
  */
-export async function enforceConsistency(conversion: Conversion, chunk: any): Promise<boolean> {
-    const hasAlreadyBeenLoaded: boolean = await dataMayBeLoaded(conversion, chunk._id);
+export async function dataTransferred(conversion: Conversion, dataPoolId: number): Promise<boolean> {
+    const logTitle: string = 'ConsistencyEnforcer::dataTransferred';
+    const dataPoolTable: string = `"${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"`;
+    const sqlGetMetadata: string = `SELECT metadata AS metadata FROM ${ dataPoolTable } WHERE id = ${ dataPoolId };`;
+    const dbAccess: DBAccess = new DBAccess(conversion);
 
+    const result: DBAccessQueryResult = await dbAccess.query(
+        logTitle,
+        sqlGetMetadata,
+        DBVendors.PG,
+        true,
+        true
+    );
+
+    const metadata: any = JSON.parse(result.data.rows[0].metadata);
+    const targetTableName: string = `"${ conversion._schema }"."${ metadata._tableName }"`;
+    const sqlGetFirstRow: string = `SELECT * FROM ${ targetTableName } LIMIT 1 OFFSET 0;`;
 
-    if (hasAlreadyBeenLoaded) {
-        // Current data chunk runs after a disaster recovery.
-        // It may already been loaded.
-        return false;
-    }
+    const probe: DBAccessQueryResult = await dbAccess.query(
+        logTitle,
+        sqlGetFirstRow,
+        DBVendors.PG,
+        true,
+        false,
+        result.client
+    );
 
-    // Normal migration flow.
-    await updateConsistencyState(conversion, chunk._id);
-    return true;
+    return probe.data.rows.length !== 0;
 }
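
The rewritten check decides recovery by probing the target table itself rather than consulting the dropped is_started flag: it reads the chunk's metadata from the data pool, then asks whether the target table already contains at least one row. A minimal standalone sketch of the same two-query idea, written directly against the pg driver (the Pool wiring, function name, and parameters below are illustrative assumptions, not part of the commit):

import { Pool } from 'pg';

// Illustrative sketch only: nmig routes these statements through its DBAccess
// wrapper, but the recovery probe reduces to the two queries shown here.
async function chunkAlreadyTransferred(pool: Pool, schema: string, dataPoolTable: string, dataPoolId: number): Promise<boolean> {
    // 1. Read the chunk's metadata; it records the name of the target table.
    const meta = await pool.query(`SELECT metadata FROM "${ schema }"."${ dataPoolTable }" WHERE id = $1;`, [dataPoolId]);
    const metadata: any = JSON.parse(meta.rows[0].metadata);

    // 2. Probe the target table: any existing row means this chunk's data was already transferred.
    const probe = await pool.query(`SELECT * FROM "${ schema }"."${ metadata._tableName }" LIMIT 1 OFFSET 0;`);
    return probe.rows.length !== 0;
}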
9 changes: 4 additions & 5 deletions src/DataChunksProcessor.ts
@@ -43,16 +43,15 @@ export default async (conversion: Conversion, tableName: string, haveDataChunksP
         logTitle,
         sqlRowsCnt,
         DBVendors.MYSQL,
-        true,
+        false,
         false
     );
 
     const rowsCnt: number = countResult.data[0].rows_count;
     const msg: string = `\t--[prepareDataChunks] Total rows to insert into "${ conversion._schema }"."${ tableName }": ${ rowsCnt }`;
     log(conversion, msg, conversion._dicTables[tableName].tableLogPath);
-    const strJson: string = `{"_tableName":"${ tableName }","_selectFieldList":"${ strSelectFieldList }","_rowsCnt":${ rowsCnt }}`;
-    const sql: string = `INSERT INTO "${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"`
-        + `("is_started", "json") VALUES (FALSE, $1);`;
+    const metadata: string = `{"_tableName":"${ tableName }","_selectFieldList":"${ strSelectFieldList }","_rowsCnt":${ rowsCnt }}`;
+    const sql: string = `INSERT INTO "${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"("metadata") VALUES ($1);`;
 
     await dbAccess.query(
         logTitle,
@@ -61,6 +60,6 @@ export default async (conversion: Conversion, tableName: string, haveDataChunksP
         false,
         false,
         undefined,
-        [strJson]
+        [metadata]
     );
 }
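
For reference, the metadata string built above serializes the three fields the loader needs later. A hypothetical example of one inserted value (the table name, column list, and row count are made up for illustration):

// Hypothetical example of one "metadata" value stored in the data pool.
const exampleMetadata: string = JSON.stringify({
    _tableName: 'employees',                           // target table to populate
    _selectFieldList: '`id`,`first_name`,`last_name`', // MySQL columns to select
    _rowsCnt: 125000                                   // total rows to transfer
});
// => '{"_tableName":"employees","_selectFieldList":"`id`,`first_name`,`last_name`","_rowsCnt":125000}'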
8 changes: 4 additions & 4 deletions src/DataLoader.ts
@@ -25,7 +25,7 @@ import DBAccessQueryResult from './DBAccessQueryResult';
 import DBVendors from './DBVendors';
 import MessageToMaster from './MessageToMaster';
 import MessageToDataLoader from './MessageToDataLoader';
-import { enforceConsistency } from './ConsistencyEnforcer';
+import { dataTransferred } from './ConsistencyEnforcer';
 import * as extraConfigProcessor from './ExtraConfigProcessor';
 import * as path from 'path';
 import { PoolClient, QueryResult } from 'pg';
@@ -36,11 +36,11 @@ const { Transform: Json2CsvTransform } = require('json2csv'); // No declaration
 process.on('message', async (signal: MessageToDataLoader) => {
     const { config, chunk } = signal;
     const conv: Conversion = new Conversion(config);
-    log(conv, '\t--[loadData] Loading the data...');
+    log(conv, `\t--[loadData] Loading the data into "${ conv._schema }"."${ chunk._tableName }" table...`);
 
-    const isNormalFlow: boolean = await enforceConsistency(conv, chunk);
+    const isRecoveryMode: boolean = await dataTransferred(conv, chunk._id);
 
-    if (isNormalFlow) {
+    if (!isRecoveryMode) {
         await populateTableWorker(conv, chunk._tableName, chunk._selectFieldList, chunk._rowsCnt, chunk._id);
         return;
     }
7 changes: 3 additions & 4 deletions src/DataPoolManager.ts
@@ -30,7 +30,7 @@ import Conversion from './Conversion';
 export async function createDataPoolTable(conversion: Conversion): Promise<Conversion> {
     const dbAccess: DBAccess = new DBAccess(conversion);
     const table: string = `"${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"`;
-    const sql: string = `CREATE TABLE IF NOT EXISTS ${ table }("id" BIGSERIAL, "json" TEXT, "is_started" BOOLEAN);`;
+    const sql: string = `CREATE TABLE IF NOT EXISTS ${ table }("id" BIGSERIAL, "metadata" TEXT);`;
     await dbAccess.query('DataPoolManager::createDataPoolTable', sql, DBVendors.PG, true, false);
     log(conversion, `\t--[DataPoolManager.createDataPoolTable] table ${ table } is created...`);
     return conversion;
@@ -53,13 +53,12 @@ export async function dropDataPoolTable(conversion: Conversion): Promise<void> {
 export async function readDataPool(conversion: Conversion): Promise<Conversion> {
     const dbAccess: DBAccess = new DBAccess(conversion);
     const table: string = `"${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"`;
-    const sql: string = `SELECT id AS id, json AS json FROM ${ table };`;
+    const sql: string = `SELECT id AS id, metadata AS metadata FROM ${ table };`;
     const result: DBAccessQueryResult = await dbAccess.query('DataPoolManager::dropDataPoolTable', sql, DBVendors.PG, true, false);
 
     result.data.rows.forEach((row: any) => {
-        const obj: any = JSON.parse(row.json);
+        const obj: any = JSON.parse(row.metadata);
         obj._id = row.id;
-        obj._processed = false;
         conversion._dataPool.push(obj);
     });
 
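
Taken together, the four files now implement a simpler recovery lifecycle: createDataPoolTable keeps only (id, metadata), prepareDataChunks inserts one metadata row per table, and each loader process skips any entry whose target table already holds data. A condensed sketch of that driver loop (types and callbacks reduced to the essentials; this is not nmig's actual master/worker wiring):

// Condensed view of the post-commit recovery flow (simplified, illustrative).
interface DataPoolEntry {
    _id: number;
    _tableName: string;
    _selectFieldList: string;
    _rowsCnt: number;
}

async function processDataPool(
    entries: DataPoolEntry[],
    dataTransferred: (dataPoolId: number) => Promise<boolean>,
    populateTableWorker: (entry: DataPoolEntry) => Promise<void>
): Promise<void> {
    for (const entry of entries) {
        if (await dataTransferred(entry._id)) {
            // Recovery mode: the target table already holds rows; skip it.
            continue;
        }

        // Normal flow: stream this table's data into PostgreSQL.
        await populateTableWorker(entry);
    }
}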
