Skip to content

Commit

Permalink
Merge branch 'feature/data-transfer-optimization-4' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
AnatolyUss committed Feb 1, 2020
2 parents 6c5797d + 4cbe818 commit e33226c
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 14 deletions.
7 changes: 1 addition & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,9 @@ from MySQL to PostgreSQL as easy and smooth as possible.</p>
<b>Note:</b> "logs_directory" will be created during script execution.</p>

<h3>VERSION</h3>
<p>Current version is 5.1.0<br />
<p>Current version is 5.2.0<br />
(major version . improvements . bug fixes)</p>

<h3>KNOWN ISSUES</h3>
<ul>
<li>Empty strings in char/varchar columns may be interpreted as NULL.</li>
</ul>

<h3>LICENSE</h3>
<p>NMIG is available under "GNU GENERAL PUBLIC LICENSE" (v. 3) <br />
<a href="http://www.gnu.org/licenses/gpl.txt">http://www.gnu.org/licenses/gpl.txt.</a></p>
6 changes: 6 additions & 0 deletions config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
],
"loader_max_old_space_size" : "DEFAULT",

"streams_high_water_mark_description": [
"Buffer level when stream.write() starts returning false.",
"This number is a number of JavaScript objects."
],
"streams_high_water_mark": 16384,

"encoding_description" : [
"JavaScript encoding type.",
"If not supplied, then utf8 will be used as a default."
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "nmig",
"version": "5.1.0",
"version": "5.2.0",
"description": "The database migration app",
"author": "Anatoly Khaytovich<[email protected]>",
"license": "GPL-3.0",
Expand Down
8 changes: 8 additions & 0 deletions src/Conversion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ export default class Conversion {
*/
public _dataTypesMap: any;

/**
* Buffer level when stream.write() starts returning false.
* This number is a number of JavaScript objects.
*/
public readonly _streamsHighWaterMark: number;

/**
* Constructor.
*/
Expand All @@ -215,6 +221,8 @@ export default class Conversion {
this._dataPool = [];
this._dicTables = Object.create(null);
this._mySqlDbName = this._sourceConString.database;
this._streamsHighWaterMark = this._config.streams_high_water_mark === undefined ? 16384 : +this._config.streams_high_water_mark;

this._schema = this._config.schema === undefined || this._config.schema === ''
? this._mySqlDbName
: this._config.schema;
Expand Down
12 changes: 5 additions & 7 deletions src/DataLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,15 @@ async function populateTableWorker(
originalSessionReplicationRole
);

const streamsHighWaterMark: number = 16384; // Commonly used as the default, but may vary across different machines.
const json2csvStream = await getJson2csvStream(conv, originalTableName, streamsHighWaterMark, dataPoolId, client, originalSessionReplicationRole);
const json2csvStream = await getJson2csvStream(conv, originalTableName, dataPoolId, client, originalSessionReplicationRole);
const mysqlClientErrorHandler = async (err: string) => {
await processDataError(conv, err, sql, sqlCopy, tableName, dataPoolId, client, originalSessionReplicationRole);
};

mysqlClient
.query(sql)
.on('error', mysqlClientErrorHandler)
.stream({ highWaterMark: streamsHighWaterMark })
.stream({ highWaterMark: conv._streamsHighWaterMark })
.pipe(json2csvStream)
.pipe(copyStream);
}
Expand Down Expand Up @@ -186,7 +185,6 @@ function getCopyStream(
async function getJson2csvStream(
conversion: Conversion,
originalTableName: string,
streamsHighWaterMark: number,
dataPoolId: number,
client: PoolClient,
originalSessionReplicationRole: string | null
Expand All @@ -208,13 +206,13 @@ async function getJson2csvStream(
fields: tableColumnsResult.data.map((column: any) => column.Field)
};

const transformOptions: any = {
highWaterMark: streamsHighWaterMark,
const streamTransformOptions: any = {
highWaterMark: conversion._streamsHighWaterMark,
objectMode: true,
encoding: conversion._encoding
};

const json2CsvTransformStream = new Json2CsvTransform(options, transformOptions);
const json2CsvTransformStream = new Json2CsvTransform(options, streamTransformOptions);

json2CsvTransformStream.on('error', async (transformError: string) => {
await processDataError(conversion, transformError, '', '', originalTableName, dataPoolId, client, originalSessionReplicationRole);
Expand Down

0 comments on commit e33226c

Please sign in to comment.