diff --git a/src/ConsistencyEnforcer.ts b/src/ConsistencyEnforcer.ts
index 3f6720a0..afcc017b 100644
--- a/src/ConsistencyEnforcer.ts
+++ b/src/ConsistencyEnforcer.ts
@@ -23,48 +23,38 @@ import Conversion from './Conversion';
 import DBAccess from './DBAccess';
 import DBVendors from './DBVendors';

-/**
- * Updates consistency state.
- */
-async function updateConsistencyState(conversion: Conversion, dataPoolId: number): Promise<void> {
-    const logTitle: string = 'ConsistencyEnforcer::updateConsistencyState';
-    const sql: string = `UPDATE "${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"
-        SET is_started = TRUE WHERE id = ${ dataPoolId };`;
-
-    const dbAccess: DBAccess = new DBAccess(conversion);
-    await dbAccess.query(logTitle, sql, DBVendors.PG, false, false);
-}
-
-/**
- * Retrieves the `is_started` value of current data chunk.
- */
-async function dataMayBeLoaded(conversion: Conversion, dataPoolId: number): Promise<boolean> {
-    const logTitle: string = 'ConsistencyEnforcer::dataMayBeLoaded';
-    const sql: string = `SELECT is_started AS is_started
-        FROM "${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"
-        WHERE id = ${ dataPoolId };`;
-
-    const dbAccess: DBAccess = new DBAccess(conversion);
-    const result: DBAccessQueryResult = await dbAccess.query(logTitle, sql, DBVendors.PG, false, false);
-    return result.error ? false : !!result.data.rows[0].is_started;
-}
-
 /**
  * Enforces consistency before processing a chunk of data.
  * Ensures there are no data duplications.
  * In case of normal execution - it is a good practice.
  * In case of rerunning Nmig after unexpected failure - it is absolutely mandatory.
  */
-export async function enforceConsistency(conversion: Conversion, chunk: any): Promise<boolean> {
-    const hasAlreadyBeenLoaded: boolean = await dataMayBeLoaded(conversion, chunk._id);
+export async function dataTransferred(conversion: Conversion, dataPoolId: number): Promise<boolean> {
+    const logTitle: string = 'ConsistencyEnforcer::dataTransferred';
+    const dataPoolTable: string = `"${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"`;
+    const sqlGetMetadata: string = `SELECT metadata AS metadata FROM ${ dataPoolTable } WHERE id = ${ dataPoolId };`;
+    const dbAccess: DBAccess = new DBAccess(conversion);
+
+    const result: DBAccessQueryResult = await dbAccess.query(
+        logTitle,
+        sqlGetMetadata,
+        DBVendors.PG,
+        true,
+        true
+    );
+
+    const metadata: any = JSON.parse(result.data.rows[0].metadata);
+    const targetTableName: string = `"${ conversion._schema }"."${ metadata._tableName }"`;
+    const sqlGetFirstRow: string = `SELECT * FROM ${ targetTableName } LIMIT 1 OFFSET 0;`;

-    if (hasAlreadyBeenLoaded) {
-        // Current data chunk runs after a disaster recovery.
-        // It may already been loaded.
-        return false;
-    }
+    const probe: DBAccessQueryResult = await dbAccess.query(
+        logTitle,
+        sqlGetFirstRow,
+        DBVendors.PG,
+        true,
+        false,
+        result.client
+    );

-    // Normal migration flow.
-    await updateConsistencyState(conversion, chunk._id);
-    return true;
+    return probe.data.rows.length !== 0;
 }
diff --git a/src/DataChunksProcessor.ts b/src/DataChunksProcessor.ts
index d729c209..a7350b9a 100644
--- a/src/DataChunksProcessor.ts
+++ b/src/DataChunksProcessor.ts
@@ -43,16 +43,15 @@ export default async (conversion: Conversion, tableName: string, haveDataChunksP
         logTitle,
         sqlRowsCnt,
         DBVendors.MYSQL,
-        true,
+        false,
         false
     );

     const rowsCnt: number = countResult.data[0].rows_count;
     const msg: string = `\t--[prepareDataChunks] Total rows to insert into "${ conversion._schema }"."${ tableName }": ${ rowsCnt }`;
     log(conversion, msg, conversion._dicTables[tableName].tableLogPath);
-    const strJson: string = `{"_tableName":"${ tableName }","_selectFieldList":"${ strSelectFieldList }","_rowsCnt":${ rowsCnt }}`;
-    const sql: string = `INSERT INTO "${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"`
-        + `("is_started", "json") VALUES (FALSE, $1);`;
+    const metadata: string = `{"_tableName":"${ tableName }","_selectFieldList":"${ strSelectFieldList }","_rowsCnt":${ rowsCnt }}`;
+    const sql: string = `INSERT INTO "${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"("metadata") VALUES ($1);`;

     await dbAccess.query(
         logTitle,
@@ -61,6 +60,6 @@ export default async (conversion: Conversion, tableName: string, haveDataChunksP
         false,
         false,
         undefined,
-        [strJson]
+        [metadata]
     );
 }
diff --git a/src/DataLoader.ts b/src/DataLoader.ts
index 5c5bfcc4..b541e171 100644
--- a/src/DataLoader.ts
+++ b/src/DataLoader.ts
@@ -25,7 +25,7 @@ import DBAccessQueryResult from './DBAccessQueryResult';
 import DBVendors from './DBVendors';
 import MessageToMaster from './MessageToMaster';
 import MessageToDataLoader from './MessageToDataLoader';
-import { enforceConsistency } from './ConsistencyEnforcer';
+import { dataTransferred } from './ConsistencyEnforcer';
 import * as extraConfigProcessor from './ExtraConfigProcessor';
 import * as path from 'path';
 import { PoolClient, QueryResult } from 'pg';
@@ -36,11 +36,11 @@ const { Transform: Json2CsvTransform } = require('json2csv'); // No declaration
 process.on('message', async (signal: MessageToDataLoader) => {
     const { config, chunk } = signal;
     const conv: Conversion = new Conversion(config);
-    log(conv, '\t--[loadData] Loading the data...');
+    log(conv, `\t--[loadData] Loading the data into "${ conv._schema }"."${ chunk._tableName }" table...`);

-    const isNormalFlow: boolean = await enforceConsistency(conv, chunk);
+    const isRecoveryMode: boolean = await dataTransferred(conv, chunk._id);

-    if (isNormalFlow) {
+    if (!isRecoveryMode) {
         await populateTableWorker(conv, chunk._tableName, chunk._selectFieldList, chunk._rowsCnt, chunk._id);
         return;
     }
diff --git a/src/DataPoolManager.ts b/src/DataPoolManager.ts
index da739646..b4ccc1cf 100644
--- a/src/DataPoolManager.ts
+++ b/src/DataPoolManager.ts
@@ -30,7 +30,7 @@ import Conversion from './Conversion';
 export async function createDataPoolTable(conversion: Conversion): Promise<Conversion> {
     const dbAccess: DBAccess = new DBAccess(conversion);
     const table: string = `"${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"`;
-    const sql: string = `CREATE TABLE IF NOT EXISTS ${ table }("id" BIGSERIAL, "json" TEXT, "is_started" BOOLEAN);`;
+    const sql: string = `CREATE TABLE IF NOT EXISTS ${ table }("id" BIGSERIAL, "metadata" TEXT);`;
     await dbAccess.query('DataPoolManager::createDataPoolTable', sql, DBVendors.PG, true, false);
     log(conversion, `\t--[DataPoolManager.createDataPoolTable] table ${ table } is created...`);
     return conversion;
@@ -53,13 +53,12 @@ export async function dropDataPoolTable(conversion: Conversion): Promise<Conversion> {
 export async function readDataPool(conversion: Conversion): Promise<Conversion> {
     const dbAccess: DBAccess = new DBAccess(conversion);
     const table: string = `"${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"`;
-    const sql: string = `SELECT id AS id, json AS json FROM ${ table };`;
+    const sql: string = `SELECT id AS id, metadata AS metadata FROM ${ table };`;
     const result: DBAccessQueryResult = await dbAccess.query('DataPoolManager::dropDataPoolTable', sql, DBVendors.PG, true, false);
     result.data.rows.forEach((row: any) => {
-        const obj: any = JSON.parse(row.json);
+        const obj: any = JSON.parse(row.metadata);
         obj._id = row.id;
-        obj._processed = false;
         conversion._dataPool.push(obj);
     });