Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: export events tsv directly to postgres instance #2048

Merged
merged 11 commits into from
Aug 15, 2024
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ jobs:
- name: Setup integration environment
run: |
sudo ufw disable
mkdir -p src/tests-event-replay/.tmp/local/
sudo chown 999:999 src/tests-event-replay/.tmp/local/
sudo chmod -R 777 src/tests-event-replay/.tmp/local/
docker compose -f docker/docker-compose.dev.postgres.yml up -d
npm run devenv:logs -- --no-color &> docker-compose-logs.txt &

Expand Down
1 change: 0 additions & 1 deletion docker/docker-compose.dev.bitcoind.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: "3"
services:
bitcoind:
image: "blockstack/bitcoind:v0.20.99.0"
Expand Down
5 changes: 3 additions & 2 deletions docker/docker-compose.dev.postgres.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
version: '3.7'
services:
postgres:
image: "postgres:14"
image: "postgres:15"
ports:
- "5490:5432"
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: stacks_blockchain_api
POSTGRES_PORT: 5432
volumes:
- ../src/tests-event-replay/.tmp/local/:/root/
1 change: 0 additions & 1 deletion docker/docker-compose.dev.stacks-blockchain.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: '3.7'
services:
stacks-blockchain:
image: 'hirosystems/stacks-api-e2e:stacks3.0-0a2c0e2'
Expand Down
22 changes: 0 additions & 22 deletions src/event-replay/connection-legacy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,25 +150,3 @@ function getPgClientConfig<TGetPoolConfig extends boolean = false>({
return clientConfig;
}
}

/**
* Creates a postgres pool client connection. If the connection fails due to a transient error, it is retried until successful.
* You'd expect that the pg lib to handle this, but it doesn't, see https://github.com/brianc/node-postgres/issues/1789
*/
export async function connectWithRetry(pool: Pool): Promise<PoolClient> {
for (let retryAttempts = 1; ; retryAttempts++) {
try {
const client = await pool.connect();
return client;
} catch (error: any) {
// Check for transient errors, and retry after 1 second
const pgConnectionError = isPgConnectionError(error);
if (pgConnectionError) {
logger.warn(`${pgConnectionError}, will retry, attempt #${retryAttempts}`);
await timeout(1000);
} else {
throw error;
}
}
}
}
24 changes: 16 additions & 8 deletions src/event-replay/event-replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,24 @@ export async function exportEventsAsTsv(
if (!filePath) {
throw new Error(`A file path should be specified with the --file option`);
}
const resolvedFilePath = path.resolve(filePath);
if (fs.existsSync(resolvedFilePath) && overwriteFile !== true) {
throw new Error(
`A file already exists at ${resolvedFilePath}. Add --overwrite-file to truncate an existing file`
);
const isLocal = filePath.startsWith('local:');
if (isLocal) {
filePath = filePath.replace(/^local:/, '');
if (!path.isAbsolute(filePath)) {
throw new Error(`The file path must be absolute`);
}
} else {
const resolvedFilePath = path.resolve(filePath);
if (fs.existsSync(resolvedFilePath) && overwriteFile !== true) {
throw new Error(
`A file already exists at ${resolvedFilePath}. Add --overwrite-file to truncate an existing file`
);
}
}
console.log(`Export event data to file: ${resolvedFilePath}`);
const writeStream = fs.createWriteStream(resolvedFilePath);

console.log(`Exporting event data to ${filePath}`);
console.log(`Export started...`);
await exportRawEventRequests(writeStream);
await exportRawEventRequests(filePath, isLocal);
console.log('Export successful.');
}

Expand Down
52 changes: 30 additions & 22 deletions src/event-replay/event-requests.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,38 @@
import { pipelineAsync } from '../helpers';
import { Readable, Writable } from 'stream';
import { pipeline } from 'node:stream/promises';
import { Readable } from 'stream';
import { DbRawEventRequest } from '../datastore/common';
import { PgServer } from '../datastore/connection';
import { connectPgPool, connectWithRetry } from './connection-legacy';
import { getConnectionArgs, getConnectionConfig, PgServer } from '../datastore/connection';
import { connectPgPool } from './connection-legacy';
import * as pgCopyStreams from 'pg-copy-streams';
import * as PgCursor from 'pg-cursor';
import { connectPostgres } from '@hirosystems/api-toolkit';
import { createWriteStream } from 'node:fs';

export async function exportRawEventRequests(targetStream: Writable): Promise<void> {
const pool = await connectPgPool({
usageName: 'export-raw-events',
pgServer: PgServer.primary,
export async function exportRawEventRequests(filePath: string, local: boolean): Promise<void> {
const sql = await connectPostgres({
usageName: `export-events`,
connectionArgs: getConnectionArgs(PgServer.primary),
connectionConfig: getConnectionConfig(PgServer.primary),
});
const client = await connectWithRetry(pool);
try {
const copyQuery = pgCopyStreams.to(
`
COPY (SELECT id, receive_timestamp, event_path, payload FROM event_observer_requests ORDER BY id ASC)
TO STDOUT ENCODING 'UTF8'
`
);
const queryStream = client.query(copyQuery);
await pipelineAsync(queryStream, targetStream);
} finally {
client.release();
await pool.end();
const copyQuery = sql`
COPY (
SELECT id, receive_timestamp, event_path, payload
FROM event_observer_requests
ORDER BY id ASC
)`;
if (local) {
await sql`${copyQuery}
TO '${sql.unsafe(filePath)}'
WITH (FORMAT TEXT, DELIMITER E'\t', ENCODING 'UTF8')
`;
} else {
const readableStream = await sql`${copyQuery}
TO STDOUT
WITH (FORMAT TEXT, DELIMITER E'\t', ENCODING 'UTF8')
`.readable();
await pipeline(readableStream, createWriteStream(filePath));
}
await sql.end();
}

export async function* getRawEventRequests(
Expand Down Expand Up @@ -61,7 +69,7 @@ export async function* getRawEventRequests(
`);
onStatusUpdate?.('Importing raw event requests into temporary table...');
const importStream = client.query(pgCopyStreams.from(`COPY temp_raw_tsv FROM STDIN`));
await pipelineAsync(readStream, importStream);
await pipeline(readStream, importStream);
onStatusUpdate?.('Removing any duplicate raw event requests...');
await client.query(`
INSERT INTO temp_event_observer_requests
Expand Down
45 changes: 37 additions & 8 deletions src/tests-event-replay/import-export-tests.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ChainID } from '@stacks/transactions';
import * as fs from 'fs';
import * as path from 'path';
import { getRawEventRequests } from '../event-replay/event-requests';
import { PgWriteStore } from '../datastore/pg-write-store';
import { exportEventsAsTsv, importEventsFromTsv } from '../event-replay/event-replay';
Expand All @@ -25,7 +26,7 @@ describe('import/export tests', () => {
await db?.close();
});

test('event import and export cycle', async () => {
test('event import and export cycle - remote', async () => {
// Import from mocknet TSV
await importEventsFromTsv('src/tests-event-replay/tsv/mocknet.tsv', 'archival', true, true);
const chainTip = await db.getChainTip(db.sql);
Expand All @@ -38,14 +39,42 @@ describe('import/export tests', () => {
);

// Export into temp TSV
const tmpDir = 'src/tests-event-replay/.tmp';
const tmpDir = 'src/tests-event-replay/.tmp/remote';
fs.mkdirSync(tmpDir, { recursive: true });
await exportEventsAsTsv(`${tmpDir}/export.tsv`);

// Re-import with exported TSV and check that chain tip matches.
try {
fs.mkdirSync(tmpDir);
} catch (error: any) {
if (error.code != 'EEXIST') throw error;
await importEventsFromTsv(`${tmpDir}/export.tsv`, 'archival', true, true);
const newChainTip = await db.getChainTip(db.sql);
expect(newChainTip.block_height).toBe(28);
expect(newChainTip.index_block_hash).toBe(
'0x76cd67a65c0dfd5ea450bb9efe30da89fa125bfc077c953802f718353283a533'
);
expect(newChainTip.block_hash).toBe(
'0x7682af212d3c1ef62613412f9b5a727269b4548f14eca2e3f941f7ad8b3c11b2'
);
} finally {
fs.rmSync(`${tmpDir}/export.tsv`);
}
const tmpTsvPath = `${tmpDir}/export.tsv`;
await exportEventsAsTsv(tmpTsvPath, true);
});

test('event import and export cycle - local', async () => {
// Import from mocknet TSV
await importEventsFromTsv('src/tests-event-replay/tsv/mocknet.tsv', 'archival', true, true);
const chainTip = await db.getChainTip(db.sql);
expect(chainTip.block_height).toBe(28);
expect(chainTip.index_block_hash).toBe(
'0x76cd67a65c0dfd5ea450bb9efe30da89fa125bfc077c953802f718353283a533'
);
expect(chainTip.block_hash).toBe(
'0x7682af212d3c1ef62613412f9b5a727269b4548f14eca2e3f941f7ad8b3c11b2'
);

// Export into temp TSV
const tmpDir = 'src/tests-event-replay/.tmp/local';
fs.mkdirSync(tmpDir, { recursive: true });
await exportEventsAsTsv('local:/root/export.tsv');

// Re-import with exported TSV and check that chain tip matches.
try {
Expand All @@ -59,7 +88,7 @@ describe('import/export tests', () => {
'0x7682af212d3c1ef62613412f9b5a727269b4548f14eca2e3f941f7ad8b3c11b2'
);
} finally {
fs.rmSync(tmpDir, { force: true, recursive: true });
fs.rmSync(`${tmpDir}/export.tsv`);
}
});

Expand Down
Loading