From 86c9521fe528b17ec2eeb08d8553645294742694 Mon Sep 17 00:00:00 2001 From: aman035 Date: Thu, 18 Jul 2024 18:06:26 +0530 Subject: [PATCH] add formating & linting --- package.json | 6 +- src/api/index.ts | 24 +- src/api/middlewares/onlyLocalhost.ts | 11 +- src/api/routes/storageRoutes.ts | 273 ++++---- src/app.ts | 6 - src/appInit.ts | 62 +- src/config/index.ts | 26 +- src/helpers/dbHelper.ts | 608 ++++++++++-------- src/loaders/index.ts | 12 +- src/loaders/logger.ts | 22 +- src/services/messaging-common/messageBlock.ts | 64 +- .../messaging-common/queueClientHelper.ts | 5 +- src/services/messaging-common/redisClient.ts | 5 +- .../messaging-common/storageContractState.ts | 164 ++--- .../validatorContractState.ts | 10 +- src/services/messaging-dset/queueClient.ts | 19 +- src/services/messaging-dset/queueServer.ts | 7 +- src/services/messaging/BlockStorage.ts | 186 +++--- src/services/messaging/IndexStorage.ts | 147 +++-- src/services/messaging/queueManager.ts | 46 +- src/services/messaging/storageNode.ts | 157 +++-- src/utilz/bitUtil.ts | 24 +- src/utilz/coll.ts | 42 +- src/utilz/dateUtil.ts | 2 +- src/utilz/envLoader.ts | 14 +- src/utilz/ethSig.ts | 41 +- src/utilz/ethersUtil.ts | 91 +-- src/utilz/expressUtil.ts | 1 + src/utilz/idUtil.ts | 11 +- src/utilz/mySqlUtil.ts | 5 +- src/utilz/numUtil.ts | 2 +- src/utilz/pgUtil.ts | 45 +- src/utilz/promiseUtil.ts | 156 +++-- src/utilz/strUtil.ts | 44 +- src/utilz/tuple.ts | 2 +- src/utilz/waitNotify.ts | 82 +-- src/utilz/winstonUtil.ts | 24 +- tests/DbHelper.test.ts | 39 +- tests/RandomUtil.test.ts | 37 +- tests/SNodeIntegration.test.ts | 444 +++++++------ tests/VNodeIntegration.test.ts | 274 ++++---- tests/root.ts | 10 +- 42 files changed, 1749 insertions(+), 1501 deletions(-) diff --git a/package.json b/package.json index 16e56f5..7eb4f06 100755 --- a/package.json +++ b/package.json @@ -11,9 +11,9 @@ "start": "nodemon", "inspect": "nodemon --inspect src/app.ts", "test": "mocha --inspect=9229 -r ts-node/register tests/**/*.test.ts --require tests/root.ts --serial", - "lint": "eslint src/**/*.ts", - "lint:fix": "eslint --fix src/**/*.ts", - "format": "prettier --write src/**/*.ts", + "lint": "eslint '**/*.{js,ts,jsx,tsx}'", + "lint:fix": "eslint --fix '**/*.{js,ts,jsx,tsx}'", + "format": "prettier --write '**/*.{js,ts,jsx,tsx}'", "prepare": "husky" }, "husky": { diff --git a/src/api/index.ts b/src/api/index.ts index bdb68ff..f0d1881 100755 --- a/src/api/index.ts +++ b/src/api/index.ts @@ -1,12 +1,12 @@ -import { Router } from 'express'; - -import {storageRoutes} from './routes/storageRoutes'; -import {ExpressUtil} from "../utilz/expressUtil"; - -// guaranteed to get dependencies -export default () => { - const app = Router(); - app.use(ExpressUtil.handle); - storageRoutes(app); - return app; -}; +import { Router } from 'express' + +import { ExpressUtil } from '../utilz/expressUtil' +import { storageRoutes } from './routes/storageRoutes' + +// guaranteed to get dependencies +export default () => { + const app = Router() + app.use(ExpressUtil.handle) + storageRoutes(app) + return app +} diff --git a/src/api/middlewares/onlyLocalhost.ts b/src/api/middlewares/onlyLocalhost.ts index 3c4f03a..bd008bd 100644 --- a/src/api/middlewares/onlyLocalhost.ts +++ b/src/api/middlewares/onlyLocalhost.ts @@ -1,9 +1,10 @@ import { Container } from 'typedi' + import config from '../../config' -var dns = require('dns') -var os = require('os') -var ifaces = os.networkInterfaces() +const dns = require('dns') +const os = require('os') +const ifaces = os.networkInterfaces() /** * @param {*} req Express req Object @@ -14,8 +15,8 @@ const onlyLocalhost = async (req, res, next) => { const Logger = Container.get('logger') try { // Check if ip is localhost and only continue - var ip = req.connection.remoteAddress - var host = req.get('host') + const ip = req.connection.remoteAddress + const host = req.get('host') if (config.environment === 'production') { // Return with unauthorized error diff --git a/src/api/routes/storageRoutes.ts b/src/api/routes/storageRoutes.ts index 5d31dae..b24142a 100755 --- a/src/api/routes/storageRoutes.ts +++ b/src/api/routes/storageRoutes.ts @@ -1,149 +1,165 @@ -import {Router, Request, Response, NextFunction} from 'express'; -import {Container} from 'typedi'; -import log from '../../loaders/logger'; -import DbHelper from '../../helpers/dbHelper'; -import bodyParser from "body-parser"; -import {DateTime} from "ts-luxon"; -import {ExpressUtil} from "../../utilz/expressUtil"; -import DateUtil from "../../utilz/dateUtil"; -import StrUtil from "../../utilz/strUtil"; -import {ValidatorContractState} from "../../services/messaging-common/validatorContractState"; -import onlyLocalhost from "../middlewares/onlyLocalhost"; -import StorageNode from "../../services/messaging/storageNode"; -import {Coll} from "../../utilz/coll"; -import {MessageBlockUtil} from "../../services/messaging-common/messageBlock"; -import {StorageContractState} from "../../services/messaging-common/storageContractState"; -import {EnvLoader} from "../../utilz/envLoader"; - -const PAGE_SIZE = Number.parseInt(EnvLoader.getPropertyOrDefault("PAGE_SIZE", "30")); - -const route = Router(); -const dbh = new DbHelper(); - +import bodyParser from 'body-parser' +import { NextFunction, Request, Response, Router } from 'express' +import { DateTime } from 'ts-luxon' +import { Container } from 'typedi' + +import DbHelper from '../../helpers/dbHelper' +import log from '../../loaders/logger' +import StorageNode from '../../services/messaging/storageNode' +import { MessageBlockUtil } from '../../services/messaging-common/messageBlock' +import { StorageContractState } from '../../services/messaging-common/storageContractState' +import { ValidatorContractState } from '../../services/messaging-common/validatorContractState' +import { Coll } from '../../utilz/coll' +import DateUtil from '../../utilz/dateUtil' +import { EnvLoader } from '../../utilz/envLoader' +import StrUtil from '../../utilz/strUtil' +import onlyLocalhost from '../middlewares/onlyLocalhost' + +const PAGE_SIZE = Number.parseInt(EnvLoader.getPropertyOrDefault('PAGE_SIZE', '30')) + +const route = Router() +const dbh = new DbHelper() function logRequest(req: Request) { - log.debug('Calling %o %o with body: %o', req.method, req.url, req.body); + log.debug('Calling %o %o with body: %o', req.method, req.url, req.body) } // todo ValidatorContractState export function storageRoutes(app: Router) { - app.use(bodyParser.json()); + app.use(bodyParser.json()) app.post( '/v1/reshard', onlyLocalhost, async (req: Request, res: Response, next: NextFunction) => { - logRequest(req); - const storageNode = Container.get(StorageNode); + logRequest(req) + const storageNode = Container.get(StorageNode) // ex: { "arr": [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14] } - let arr:number[] = req.body.arr; - let set = Coll.arrayToSet(arr); + const arr: number[] = req.body.arr + const set = Coll.arrayToSet(arr) await storageNode.handleReshard(set) - return res.status(200); - }); - - app.use('/v1/kv', route); + return res.status(200) + } + ) + app.use('/v1/kv', route) // todo move to StorageNode route.get( '/ns/:nsName/nsidx/:nsIndex/date/:dt/key/:key', async (req: Request, res: Response, next: NextFunction) => { - logRequest(req); - const nsName = req.params.nsName; - const nsIndex = req.params.nsIndex; - const dt = req.params.dt; - const key = req.params.key; - - const valContractState = Container.get(ValidatorContractState); - const storageContractState = Container.get(StorageContractState); - const nodeId = valContractState.nodeId; // todo read this from db - log.debug(`nsName=${nsName} nsIndex=${nsIndex} dt=${dt} key=${key} nodeId=${nodeId}`); - let shardId = MessageBlockUtil.calculateAffectedShard(nsIndex, storageContractState.shardCount); - - const date = DateTime.fromISO(dt, {zone: 'utc'}); + logRequest(req) + const nsName = req.params.nsName + const nsIndex = req.params.nsIndex + const dt = req.params.dt + const key = req.params.key + + const valContractState = Container.get(ValidatorContractState) + const storageContractState = Container.get(StorageContractState) + const nodeId = valContractState.nodeId // todo read this from db + log.debug(`nsName=${nsName} nsIndex=${nsIndex} dt=${dt} key=${key} nodeId=${nodeId}`) + const shardId = MessageBlockUtil.calculateAffectedShard( + nsIndex, + storageContractState.shardCount + ) + + const date = DateTime.fromISO(dt, { zone: 'utc' }) if (!date.isValid) { - return res.status(400).json('Invalid date ' + dt); + return res.status(400).json('Invalid date ' + dt) } log.debug(`parsed date ${dt} -> ${date}`) - const storageTable = await DbHelper.findStorageTableByDate(nsName, shardId, date); + const storageTable = await DbHelper.findStorageTableByDate(nsName, shardId, date) log.debug(`found table ${storageTable}`) if (StrUtil.isEmpty(storageTable)) { - log.error('storage table not found'); - return res.status(401).json('storage table not found'); + log.error('storage table not found') + return res.status(401).json('storage table not found') } - const storageItems = await DbHelper.findStorageItem(nsName, nsIndex, storageTable, key); + const storageItems = await DbHelper.findStorageItem(nsName, nsIndex, storageTable, key) log.debug(`found value: ${storageItems}`) try { return res.status(200).json({ items: storageItems - }); + }) } catch (e) { - return next(e); + return next(e) } } - ); - - + ) // todo move to StorageNode // todo not tested with new sharing (we don't use it anymore) route.post( - '/ns/:nsName/nsidx/:nsIndex/ts/:ts/key/:key', /* */ + '/ns/:nsName/nsidx/:nsIndex/ts/:ts/key/:key' /* */, async (req: Request, res: Response, next: NextFunction) => { - logRequest(req); - const nsName = req.params.nsName; // ex: feeds - const nsIndex = req.params.nsIndex; // ex: 1000000 - const ts: string = req.params.ts; //ex: 1661214142.123456 - const key = req.params.key; // ex: 5b62a7b2-d6eb-49ef-b080-20a7fa3091ad - const valContractState = Container.get(ValidatorContractState); - const storageContractState = Container.get(StorageContractState); - - const nodeId = valContractState.nodeId; - const body = JSON.stringify(req.body); - log.debug(`nsName=${nsName} nsIndex=${nsIndex} ts=${ts} key=${key} nodeId=${nodeId} body=${body}`); - let shardId = MessageBlockUtil.calculateAffectedShard(nsIndex, storageContractState.shardCount); - log.debug(`nodeId=${nodeId} shardId=${shardId}`); - const success = await DbHelper.checkThatShardIsOnThisNode(nsName, shardId, nodeId); + logRequest(req) + const nsName = req.params.nsName // ex: feeds + const nsIndex = req.params.nsIndex // ex: 1000000 + const ts: string = req.params.ts //ex: 1661214142.123456 + const key = req.params.key // ex: 5b62a7b2-d6eb-49ef-b080-20a7fa3091ad + const valContractState = Container.get(ValidatorContractState) + const storageContractState = Container.get(StorageContractState) + + const nodeId = valContractState.nodeId + const body = JSON.stringify(req.body) + log.debug( + `nsName=${nsName} nsIndex=${nsIndex} ts=${ts} key=${key} nodeId=${nodeId} body=${body}` + ) + const shardId = MessageBlockUtil.calculateAffectedShard( + nsIndex, + storageContractState.shardCount + ) + log.debug(`nodeId=${nodeId} shardId=${shardId}`) + const success = await DbHelper.checkThatShardIsOnThisNode(nsName, shardId, nodeId) if (!success) { - let errMsg = `${nsName}.${nsIndex} maps to shard ${shardId} which is missing on node ${nodeId}`; - console.log(errMsg); - return res.status(500) - .json({errorMessage: errMsg}) + const errMsg = `${nsName}.${nsIndex} maps to shard ${shardId} which is missing on node ${nodeId}` + console.log(errMsg) + return res.status(500).json({ errorMessage: errMsg }) } - const date = DateUtil.parseUnixFloatAsDateTime(ts); + const date = DateUtil.parseUnixFloatAsDateTime(ts) log.debug(`parsed date ${ts} -> ${date}`) - var storageTable = await DbHelper.findStorageTableByDate(nsName, shardId, date); + const storageTable = await DbHelper.findStorageTableByDate(nsName, shardId, date) log.debug(`found table ${storageTable}`) if (StrUtil.isEmpty(storageTable)) { - log.error('storage table not found'); - var monthStart = date.startOf('month').toISODate().toString(); - var monthEndExclusive = date.startOf('month').plus({months: 1}).toISODate().toString(); - log.debug('creating new storage table'); - const dateYYYYMM = DateUtil.formatYYYYMM(date); - const tableName = `storage_ns_${nsName}_d_${dateYYYYMM}`; - const recordCreated = await DbHelper.createNewNodestorageRecord(nsName, shardId, - monthStart, monthEndExclusive, tableName); + log.error('storage table not found') + const monthStart = date.startOf('month').toISODate().toString() + const monthEndExclusive = date.startOf('month').plus({ months: 1 }).toISODate().toString() + log.debug('creating new storage table') + const dateYYYYMM = DateUtil.formatYYYYMM(date) + const tableName = `storage_ns_${nsName}_d_${dateYYYYMM}` + const recordCreated = await DbHelper.createNewNodestorageRecord( + nsName, + shardId, + monthStart, + monthEndExclusive, + tableName + ) if (recordCreated) { log.debug('record created: ', recordCreated) // we've added a new record to node_storage_layout => we can safely try to create a table // otherwise, if many connections attempt to create a table from multiple threads // it leads to postgres deadlock sometimes - await DbHelper.createNewStorageTable(tableName); + await DbHelper.createNewStorageTable(tableName) log.debug('creating node storage layout mapping') } } - var storageTable = await DbHelper.findStorageTableByDate(nsName, shardId, date); - const storageValue = await DbHelper.putValueInTable(nsName, shardId, nsIndex, storageTable, ts, key, body); + const storageValue = await DbHelper.putValueInTable( + nsName, + shardId, + nsIndex, + storageTable, + ts, + key, + body + ) log.debug(`found value: ${storageValue}`) - log.debug('success is ' + success); + log.debug('success is ' + success) try { - return res.status(201).json(storageValue); + return res.status(201).json(storageValue) } catch (e) { - return next(e); + return next(e) } } - ); + ) /* Search for namespace:namespaceIndex data , ordered by timestamp asc, paginated by timestamp (!) Pagination is achieved by using firstTs parameter, passed from the previous invocation @@ -151,54 +167,63 @@ export function storageRoutes(app: Router) { * */ // todo move to StorageNode route.post( - '/ns/:nsName/nsidx/:nsIndex/month/:month/list/', /* */ + '/ns/:nsName/nsidx/:nsIndex/month/:month/list/' /* */, async (req: Request, res: Response, next: NextFunction) => { - logRequest(req); + logRequest(req) // we will search for data starting from this key exclusive // use lastTs from the previous request to get more data - const firstTs: string = req.query.firstTs; - const nsName = req.params.nsName; - const nsIndex = req.params.nsIndex; - const dt = req.params.month + '01'; - - const valContractState = Container.get(ValidatorContractState); - const storageContractState = Container.get(StorageContractState); - const nodeId = valContractState.nodeId; // todo read this from db - log.debug(`nsName=${nsName} nsIndex=${nsIndex} dt=${dt} nodeId=${nodeId} PAGE_SIZE=${PAGE_SIZE}`); - let shardId = MessageBlockUtil.calculateAffectedShard(nsIndex, storageContractState.shardCount); - const date = DateTime.fromISO(dt, {zone: 'utc'}); + const firstTs: string = req.query.firstTs + const nsName = req.params.nsName + const nsIndex = req.params.nsIndex + const dt = req.params.month + '01' + + const valContractState = Container.get(ValidatorContractState) + const storageContractState = Container.get(StorageContractState) + const nodeId = valContractState.nodeId // todo read this from db + log.debug( + `nsName=${nsName} nsIndex=${nsIndex} dt=${dt} nodeId=${nodeId} PAGE_SIZE=${PAGE_SIZE}` + ) + const shardId = MessageBlockUtil.calculateAffectedShard( + nsIndex, + storageContractState.shardCount + ) + const date = DateTime.fromISO(dt, { zone: 'utc' }) if (!date.isValid) { - return res.status(400).json('Invalid date ' + dt); + return res.status(400).json('Invalid date ' + dt) } log.debug(`parsed date ${dt} -> ${date}`) - const storageTable = await DbHelper.findStorageTableByDate(nsName, shardId, date); + const storageTable = await DbHelper.findStorageTableByDate(nsName, shardId, date) log.debug(`found table ${storageTable}`) if (StrUtil.isEmpty(storageTable)) { - log.error('storage table not found'); - return res.status(401).json('storage table not found'); + log.error('storage table not found') + return res.status(401).json('storage table not found') } - const storageValue = await DbHelper.listInbox(nsName, shardId, nsIndex, storageTable, firstTs, PAGE_SIZE); + const storageValue = await DbHelper.listInbox( + nsName, + shardId, + nsIndex, + storageTable, + firstTs, + PAGE_SIZE + ) log.debug(`found value: ${storageValue}`) try { - return res.status(200).json(storageValue); + return res.status(200).json(storageValue) } catch (e) { - return next(e); + return next(e) } } - ); + ) // prints all namespaces - route.post( - '/ns/all/', /* */ - async (req: Request, res: Response, next: NextFunction) => { - logRequest(req); - const allNsIndex = await DbHelper.listAllNsIndex(); - try { - return res.status(200).json(allNsIndex); - } catch (e) { - return next(e); - } + route.post('/ns/all/' /* */, async (req: Request, res: Response, next: NextFunction) => { + logRequest(req) + const allNsIndex = await DbHelper.listAllNsIndex() + try { + return res.status(200).json(allNsIndex) + } catch (e) { + return next(e) } - ); -}; + }) +} // todo remove logic from router diff --git a/src/app.ts b/src/app.ts index aae0627..0d9dc08 100755 --- a/src/app.ts +++ b/src/app.ts @@ -1,9 +1,3 @@ - - - - - - import { startServer } from './appInit' // Call server from here to ensure test cases run fine diff --git a/src/appInit.ts b/src/appInit.ts index 09a5a34..7319e01 100755 --- a/src/appInit.ts +++ b/src/appInit.ts @@ -1,39 +1,41 @@ -import {EnvLoader} from "./utilz/envLoader"; -EnvLoader.loadEnvOrFail(); +import { EnvLoader } from './utilz/envLoader' +EnvLoader.loadEnvOrFail() -import 'reflect-metadata'; // We need this in order to use @Decorators -import express from 'express'; -import chalk from 'chalk'; -import {Container} from "typedi"; -import StorageNode from "./services/messaging/storageNode"; -import {ValidatorContractState} from "./services/messaging-common/validatorContractState"; -import {MySqlUtil} from "./utilz/mySqlUtil"; +import 'reflect-metadata' // We need this in order to use @Decorators +import chalk from 'chalk' +import express from 'express' +import { Container } from 'typedi' + +import StorageNode from './services/messaging/storageNode' async function startServer(logLevel = null) { if (logLevel) { - const changeLogLevel = (await require('./config/index')).changeLogLevel; - changeLogLevel(logLevel); + const changeLogLevel = (await require('./config/index')).changeLogLevel + changeLogLevel(logLevel) } // Continue Loading normally - const config = (await require('./config/index')).default; - logLevel = logLevel || config.logs.level; + const config = (await require('./config/index')).default + logLevel = logLevel || config.logs.level // ONLY TIME CONSOLE IS USED - console.log(chalk.bold.inverse('RUNNING WITH LOG LEVEL '), chalk.bold.blue.inverse(` ${logLevel} `)); + console.log( + chalk.bold.inverse('RUNNING WITH LOG LEVEL '), + chalk.bold.blue.inverse(` ${logLevel} `) + ) // Load logger - const Logger = (await require('./loaders/logger')).default; + const Logger = (await require('./loaders/logger')).default - await require('./api/index'); - await require('./loaders/express'); - Container.set("logger", Logger); + await require('./api/index') + await require('./loaders/express') + Container.set('logger', Logger) - await Container.get(StorageNode).postConstruct(); + await Container.get(StorageNode).postConstruct() // load app - const app = express(); - const server = require("http").createServer(app); + const app = express() + const server = require('http').createServer(app) /** * A little hack here @@ -41,26 +43,26 @@ async function startServer(logLevel = null) { * Well, at least in node 10 without babel and at the time of writing * So we are using good old require. **/ - await require('./loaders').default({ expressApp: app, server: server }); + await require('./loaders').default({ expressApp: app, server: server }) - server.listen(config.port, err => { + server.listen(config.port, (err) => { if (err) { - Logger.error(err); - process.exit(1); - return; + Logger.error(err) + process.exit(1) + return } Logger.info(` ################################################ STARTED 🛡️ Server listening on port: ${config.port} 🛡️ ################################################ - `); - }); + `) + }) } // stopServer shuts down the server. Used in tests. async function stopServer() { - process.exit(0); + process.exit(0) } -export { startServer, stopServer }; +export { startServer, stopServer } diff --git a/src/config/index.ts b/src/config/index.ts index 8e249af..8ac0858 100755 --- a/src/config/index.ts +++ b/src/config/index.ts @@ -1,24 +1,22 @@ // import {logLevel} from '../app' // Set the NODE_ENV to 'development' by default -process.env.NODE_ENV = process.env.NODE_ENV || 'development'; - +process.env.NODE_ENV = process.env.NODE_ENV || 'development' export const changeLogLevel = (level: string) => { if (level) { } -}; +} // console.log("-------------custom------", logLevel) export default { - environment: process.env.NODE_ENV, - port: parseInt((process.env.PORT || '3000'), 10), + port: parseInt(process.env.PORT || '3000', 10), runningOnMachine: process.env.RUNNING_ON_MACHINE, logs: { - level: process.env.LOG_LEVEL || 'silly', + level: process.env.LOG_LEVEL || 'silly' }, dbhost: process.env.DB_HOST, @@ -30,9 +28,15 @@ export default { /** * File system config */ - fsServerURL: process.env.NODE_ENV == 'development' ? process.env.FS_SERVER_DEV : process.env.FS_SERVER_PROD, + fsServerURL: + process.env.NODE_ENV == 'development' ? process.env.FS_SERVER_DEV : process.env.FS_SERVER_PROD, staticServePath: process.env.SERVE_STATIC_FILES, - staticCachePath: __dirname + '/../../' + process.env.SERVE_STATIC_FILES + '/' + process.env.SERVE_CACHE_FILES + '/', - staticAppPath: __dirname + '/../../', - -}; + staticCachePath: + __dirname + + '/../../' + + process.env.SERVE_STATIC_FILES + + '/' + + process.env.SERVE_CACHE_FILES + + '/', + staticAppPath: __dirname + '/../../' +} diff --git a/src/helpers/dbHelper.ts b/src/helpers/dbHelper.ts index 4621112..fece671 100755 --- a/src/helpers/dbHelper.ts +++ b/src/helpers/dbHelper.ts @@ -1,27 +1,26 @@ -import log from '../loaders/logger'; -import pgPromise from 'pg-promise'; - -import {DateTime} from "ts-luxon"; -import StrUtil from "../utilz/strUtil"; -import {EnvLoader} from "../utilz/envLoader"; -import {MySqlUtil} from "../utilz/mySqlUtil"; -import {PgUtil} from "../utilz/pgUtil"; -import {IClient} from "pg-promise/typescript/pg-subset"; - // mysql -import crypto from "crypto"; -import {WinstonUtil} from "../utilz/winstonUtil"; - -var mysql = require('mysql') -var mysqlPool = mysql.createPool({ - connectionLimit: 10, - host: EnvLoader.getPropertyOrFail('DB_HOST'), - user: EnvLoader.getPropertyOrFail('DB_USER'), - password: EnvLoader.getPropertyOrFail('DB_PASS'), - database: EnvLoader.getPropertyOrFail('DB_NAME'), - port: Number(EnvLoader.getPropertyOrFail('DB_NAME')) +import crypto from 'crypto' +import pgPromise from 'pg-promise' +import { IClient } from 'pg-promise/typescript/pg-subset' +import { DateTime } from 'ts-luxon' + +import log from '../loaders/logger' +import { EnvLoader } from '../utilz/envLoader' +import { MySqlUtil } from '../utilz/mySqlUtil' +import { PgUtil } from '../utilz/pgUtil' +import StrUtil from '../utilz/strUtil' +import { WinstonUtil } from '../utilz/winstonUtil' + +const mysql = require('mysql') +const mysqlPool = mysql.createPool({ + connectionLimit: 10, + host: EnvLoader.getPropertyOrFail('DB_HOST'), + user: EnvLoader.getPropertyOrFail('DB_USER'), + password: EnvLoader.getPropertyOrFail('DB_PASS'), + database: EnvLoader.getPropertyOrFail('DB_NAME'), + port: Number(EnvLoader.getPropertyOrFail('DB_NAME')) }) -MySqlUtil.init(mysqlPool); +MySqlUtil.init(mysqlPool) // postgres // todo fix variable substitution, see #putValueInTable() @@ -29,23 +28,24 @@ MySqlUtil.init(mysqlPool); // todo use PgUtil // todo use placeholders (?) -let logger = WinstonUtil.newLog('pg'); -let options = { - query: function (e) { - logger.debug('', e.query); - if (e.params) { - logger.debug('PARAMS: ', e.params); - } +const logger = WinstonUtil.newLog('pg') +const options = { + query: function (e) { + logger.debug('', e.query) + if (e.params) { + logger.debug('PARAMS: ', e.params) } -}; -const pg: pgPromise.IMain<{}, IClient> = pgPromise(options); -export const pgPool = pg(`postgres://${EnvLoader.getPropertyOrFail('PG_USER')}:${EnvLoader.getPropertyOrFail('PG_PASS')}@${EnvLoader.getPropertyOrFail('PG_HOST')}:5432/${EnvLoader.getPropertyOrFail('PG_NAME')}`); -PgUtil.init(pgPool); + } +} +const pg: pgPromise.IMain<{}, IClient> = pgPromise(options) +export const pgPool = pg( + `postgres://${EnvLoader.getPropertyOrFail('PG_USER')}:${EnvLoader.getPropertyOrFail('PG_PASS')}@${EnvLoader.getPropertyOrFail('PG_HOST')}:5432/${EnvLoader.getPropertyOrFail('PG_NAME')}` +) +PgUtil.init(pgPool) export default class DbHelper { - - public static async createStorageTablesIfNeeded() { - await PgUtil.update(` + public static async createStorageTablesIfNeeded() { + await PgUtil.update(` CREATE TABLE IF NOT EXISTS node_storage_layout ( namespace VARCHAR(20) NOT NULL, @@ -55,8 +55,8 @@ export default class DbHelper { table_name VARCHAR(64) NOT NULL, PRIMARY KEY (namespace, namespace_shard_id, ts_start, ts_end) ); - `); - await PgUtil.update(` + `) + await PgUtil.update(` -- allows itself to be called on every call -- recreates a view, no more than once per day, which contains @@ -114,58 +114,67 @@ BEGIN END $$ LANGUAGE plpgsql; `) - } - - // maps key -> 8bit space (0..255) - // uses first 4bit from an md5 hash - public static calculateShardForNamespaceIndex(namespace: string, key: string): number { - let buf = crypto.createHash('md5').update(key).digest(); - let i32 = buf.readUInt32LE(0); - const i8bits = i32 % 32; // todo it's 0x20 now, not 0xff - log.debug('calculateShardForNamespaceIndex(): ', buf, '->', i32, '->', i8bits); - return i8bits; - } - - public static async checkIfStorageTableExists(): Promise { - const date = new Date(); - const dbdate = date.getFullYear().toString() + date.getMonth().toString(); - var sql = ` + } + + // maps key -> 8bit space (0..255) + // uses first 4bit from an md5 hash + public static calculateShardForNamespaceIndex(namespace: string, key: string): number { + const buf = crypto.createHash('md5').update(key).digest() + const i32 = buf.readUInt32LE(0) + const i8bits = i32 % 32 // todo it's 0x20 now, not 0xff + log.debug('calculateShardForNamespaceIndex(): ', buf, '->', i32, '->', i8bits) + return i8bits + } + + public static async checkIfStorageTableExists(): Promise { + const date = new Date() + const dbdate = date.getFullYear().toString() + date.getMonth().toString() + const sql = ` SELECT EXISTS( SELECT FROM pg_tables WHERE schemaname='public' AND tablename='storage_ns_inbox_d_${dbdate}') ` - console.log(sql) - return pgPool.query(sql).then(data => { - console.log(data) - return Promise.resolve(true) - }).catch(err => { - console.log(err); - return Promise.resolve(false); - }); - } - - - public static async createNewNodestorageRecord(namespace: string, namespaceShardId: number, ts_start: any, ts_end: any, table_name: string): Promise { - const sql = ` + console.log(sql) + return pgPool + .query(sql) + .then((data) => { + console.log(data) + return Promise.resolve(true) + }) + .catch((err) => { + console.log(err) + return Promise.resolve(false) + }) + } + + public static async createNewNodestorageRecord( + namespace: string, + namespaceShardId: number, + ts_start: any, + ts_end: any, + table_name: string + ): Promise { + const sql = ` insert into node_storage_layout (namespace, namespace_shard_id, ts_start, ts_end, table_name) values ($1, $2, $3, $4, $5) on conflict do nothing; ` - console.log(sql); - return pgPool.result(sql, [namespace, namespaceShardId, ts_start, ts_end, table_name], r => r.rowCount) - .then(rowCount => { - console.log('inserted rowcount: ', rowCount) - return Promise.resolve(rowCount == 1) - }).catch(err => { - console.log(err); - return Promise.resolve(false); - }); - } - - public static async createNewStorageTable(tableName: string): Promise { - - // primary key should prevent duplicates by skey per inbox - await PgUtil.update(` + console.log(sql) + return pgPool + .result(sql, [namespace, namespaceShardId, ts_start, ts_end, table_name], (r) => r.rowCount) + .then((rowCount) => { + console.log('inserted rowcount: ', rowCount) + return Promise.resolve(rowCount == 1) + }) + .catch((err) => { + console.log(err) + return Promise.resolve(false) + }) + } + + public static async createNewStorageTable(tableName: string): Promise { + // primary key should prevent duplicates by skey per inbox + await PgUtil.update(` CREATE TABLE IF NOT EXISTS ${tableName} ( namespace VARCHAR(20) NOT NULL, @@ -176,156 +185,197 @@ END $$ LANGUAGE plpgsql; dataSchema VARCHAR(20) NOT NULL, payload JSONB, PRIMARY KEY(namespace,namespace_shard_id,namespace_id,skey) - );`); + );`) - await PgUtil.update(`CREATE INDEX IF NOT EXISTS + await PgUtil.update(`CREATE INDEX IF NOT EXISTS ${tableName}_idx ON ${tableName} - USING btree (namespace ASC, namespace_id ASC, ts ASC);`); - } - - // todo fix params substitution for the pg library; - public static async checkThatShardIsOnThisNode(namespace: string, namespaceShardId: number, nodeId: string): Promise { - const sql = `SELECT count(*) FROM network_storage_layout + USING btree (namespace ASC, namespace_id ASC, ts ASC);`) + } + + // todo fix params substitution for the pg library; + public static async checkThatShardIsOnThisNode( + namespace: string, + namespaceShardId: number, + nodeId: string + ): Promise { + const sql = `SELECT count(*) FROM network_storage_layout where namespace='${namespace}' and namespace_shard_id='${namespaceShardId}' and node_id='${nodeId}'` - console.log(sql); - return pgPool.query(sql).then(data => { - console.log(data) - let cnt = parseInt(data[0].count); - console.log(cnt); - return cnt === 1 - }).catch(err => { - console.log(err); - return Promise.resolve(false); - }); - } - - public static async findStorageTableByDate(namespace: string, namespaceShardId: number, dateYmd: DateTime): Promise { - log.debug(`date is ${dateYmd.toISO()}`); - const sql = `select table_name from node_storage_layout + console.log(sql) + return pgPool + .query(sql) + .then((data) => { + console.log(data) + const cnt = parseInt(data[0].count) + console.log(cnt) + return cnt === 1 + }) + .catch((err) => { + console.log(err) + return Promise.resolve(false) + }) + } + + public static async findStorageTableByDate( + namespace: string, + namespaceShardId: number, + dateYmd: DateTime + ): Promise { + log.debug(`date is ${dateYmd.toISO()}`) + const sql = `select table_name from node_storage_layout where namespace='${namespace}' and namespace_shard_id='${namespaceShardId}' and ts_start <= '${dateYmd.toISO()}' and ts_end > '${dateYmd.toISO()}'` - log.debug(sql); - return pgPool.query(sql).then(data => { - log.debug(data); - if (data.length != 1) { - return Promise.reject('missing table with the correct name'); - } - return data[0].table_name; - }).catch(err => { - log.debug(err); - return Promise.resolve(''); - }); - } - - public static async findValueInTable(tableName: string, skey: string): Promise { - log.debug(`tableName is ${tableName} , skey is ${skey}`); - const sql = `select payload + log.debug(sql) + return pgPool + .query(sql) + .then((data) => { + log.debug(data) + if (data.length != 1) { + return Promise.reject('missing table with the correct name') + } + return data[0].table_name + }) + .catch((err) => { + log.debug(err) + return Promise.resolve('') + }) + } + + public static async findValueInTable(tableName: string, skey: string): Promise { + log.debug(`tableName is ${tableName} , skey is ${skey}`) + const sql = `select payload from ${tableName} - where skey = '${skey}'`; - log.debug(sql); - return pgPool.query(sql).then(data => { - log.debug(data); - if (data.length != 1) { - return Promise.reject('missing table with the correct name'); - } - log.debug(`data found: ${JSON.stringify(data[0].payload)}`) - return data[0].payload; - }).catch(err => { - log.debug(err); - return Promise.resolve(''); - }); - } - - public static async findStorageItem(ns: string, nsIndex: string, tableName: string, skey: string): Promise { - log.debug(`tableName is ${tableName} , skey is ${skey}`); - const sql = `select skey as skey, + where skey = '${skey}'` + log.debug(sql) + return pgPool + .query(sql) + .then((data) => { + log.debug(data) + if (data.length != 1) { + return Promise.reject('missing table with the correct name') + } + log.debug(`data found: ${JSON.stringify(data[0].payload)}`) + return data[0].payload + }) + .catch((err) => { + log.debug(err) + return Promise.resolve('') + }) + } + + public static async findStorageItem( + ns: string, + nsIndex: string, + tableName: string, + skey: string + ): Promise { + log.debug(`tableName is ${tableName} , skey is ${skey}`) + const sql = `select skey as skey, extract(epoch from ts) as ts, payload as payload from ${tableName} - where skey = '${skey}' and namespace_id='${nsIndex}' and namespace='${ns}'`; - log.debug(sql); - return pgPool.query(sql).then(data => { - log.debug(data); - if (data.length != 1) { - return Promise.reject('missing table with the correct name'); - } - let row1 = data[0]; - log.debug(`data found: ${JSON.stringify(row1.payload)}`) - let record = new StorageRecord(ns, row1.skey, row1.ts, row1.payload); - return [record]; - }).catch(err => { - log.debug(err); - return Promise.resolve([]); - }); - } - - static async putValueInTable(ns: string, shardId: number, nsIndex: string, - storageTable: string, ts: string, skey: string, body: string) { - log.debug(`putValueInTable() namespace=${ns}, namespaceShardId=${shardId} - ,storageTable=${storageTable}, skey=${skey}, jsonValue=${body}`); - const sql = `INSERT INTO ${storageTable} (namespace, namespace_shard_id, namespace_id, ts, skey, dataschema, payload) - values (\${ns}, \${shardId}, \${nsIndex}, to_timestamp(\${ts}), \${skey}, 'v1', \${body}) - ON CONFLICT (namespace, namespace_shard_id, namespace_id, skey) DO UPDATE SET payload = \${body}`; - const params = { - ns, - shardId, - nsIndex, - ts, - skey, - body + where skey = '${skey}' and namespace_id='${nsIndex}' and namespace='${ns}'` + log.debug(sql) + return pgPool + .query(sql) + .then((data) => { + log.debug(data) + if (data.length != 1) { + return Promise.reject('missing table with the correct name') } - console.log(sql, params); - return pgPool.none(sql, params).then(data => { - log.debug(data); - return Promise.resolve(); - }).catch(err => { - log.debug(err); - return Promise.reject(err); - }); + const row1 = data[0] + log.debug(`data found: ${JSON.stringify(row1.payload)}`) + const record = new StorageRecord(ns, row1.skey, row1.ts, row1.payload) + return [record] + }) + .catch((err) => { + log.debug(err) + return Promise.resolve([]) + }) + } + + static async putValueInTable( + ns: string, + shardId: number, + nsIndex: string, + storageTable: string, + ts: string, + skey: string, + body: string + ) { + log.debug(`putValueInTable() namespace=${ns}, namespaceShardId=${shardId} + ,storageTable=${storageTable}, skey=${skey}, jsonValue=${body}`) + const sql = `INSERT INTO ${storageTable} (namespace, namespace_shard_id, namespace_id, ts, skey, dataschema, payload) + values (\${ns}, \${shardId}, \${nsIndex}, to_timestamp(\${ts}), \${skey}, 'v1', \${body}) + ON CONFLICT (namespace, namespace_shard_id, namespace_id, skey) DO UPDATE SET payload = \${body}` + const params = { + ns, + shardId, + nsIndex, + ts, + skey, + body } - - static async listInbox(namespace: string, namespaceShardId: number, nsIndex:string, - storageTable: string, firstTsExcluded: string, pageSize:number): Promise { - const pageLookAhead = 3; - const pageSizeForSameTimestamp = pageSize * 20; - const isFirstQuery = StrUtil.isEmpty(firstTsExcluded); - const sql = `select skey as skey, + console.log(sql, params) + return pgPool + .none(sql, params) + .then((data) => { + log.debug(data) + return Promise.resolve() + }) + .catch((err) => { + log.debug(err) + return Promise.reject(err) + }) + } + + static async listInbox( + namespace: string, + namespaceShardId: number, + nsIndex: string, + storageTable: string, + firstTsExcluded: string, + pageSize: number + ): Promise { + const pageLookAhead = 3 + const pageSizeForSameTimestamp = pageSize * 20 + const isFirstQuery = StrUtil.isEmpty(firstTsExcluded) + const sql = `select skey as skey, extract(epoch from ts) as ts, payload as payload from ${storageTable} where namespace='${namespace}' and namespace_id='${nsIndex}' - ${ isFirstQuery ? '' : `and ts > to_timestamp(${firstTsExcluded})` } + ${isFirstQuery ? '' : `and ts > to_timestamp(${firstTsExcluded})`} order by ts - limit ${pageSize + pageLookAhead}`; - log.debug(sql); - let data1 = await pgPool.any(sql); - var items = new Map(); - var lastTs: number = 0; - for (let i = 0; i < Math.min(data1.length, pageSize); i++) { - const item = DbHelper.convertRowToItem(data1[i], namespace); - items.set(item.skey, item); - lastTs = data1[i].ts; + limit ${pageSize + pageLookAhead}` + log.debug(sql) + const data1 = await pgPool.any(sql) + const items = new Map() + let lastTs: number = 0 + for (let i = 0; i < Math.min(data1.length, pageSize); i++) { + const item = DbHelper.convertRowToItem(data1[i], namespace) + items.set(item.skey, item) + lastTs = data1[i].ts + } + log.debug(`added ${items.size} items; lastTs=${lastTs}`) + // [0...{pagesize-1 (lastTs)}...{data1.length-1 (lastTsRowId)}....] + // we always request pageSize+3 rows; so if we have these additional rows we can verify that their ts != last row ts, + // otherwise we should add these additional rows to the output (works only for 2..3 rows) + // otherwise we should execute and additional page request + let lastTsRowId = pageSize - 1 + if (data1.length > pageSize) { + // add extra rows for ts = lastTs + for (let i = pageSize; i < data1.length; i++) { + if (data1[i].ts == lastTs) { + lastTsRowId = i + } else { + break } - log.debug(`added ${items.size} items; lastTs=${lastTs}`) - // [0...{pagesize-1 (lastTs)}...{data1.length-1 (lastTsRowId)}....] - // we always request pageSize+3 rows; so if we have these additional rows we can verify that their ts != last row ts, - // otherwise we should add these additional rows to the output (works only for 2..3 rows) - // otherwise we should execute and additional page request - var lastTsRowId = pageSize - 1; - if (data1.length > pageSize) { - // add extra rows for ts = lastTs - for (let i = pageSize; i < data1.length; i++) { - if (data1[i].ts == lastTs) { - lastTsRowId = i; - } else { - break; - } - } - if (lastTsRowId == data1.length - 1) { - // we have more rows with same timestamp, they won't fit in pageSize+pageLookAhead rows - // let's peform additional select for ts = lastTs - const sql2 = `select skey as skey, + } + if (lastTsRowId == data1.length - 1) { + // we have more rows with same timestamp, they won't fit in pageSize+pageLookAhead rows + // let's peform additional select for ts = lastTs + const sql2 = `select skey as skey, extract(epoch from ts) as ts, payload as payload from ${storageTable} @@ -333,71 +383,79 @@ END $$ LANGUAGE plpgsql; and namespace_id='${nsIndex}' and ts = to_timestamp(${lastTs}) order by ts - limit ${pageSizeForSameTimestamp}`; - log.debug(sql2); - let data2 = await pgPool.any(sql2); - for (let row of data2) { - const item = DbHelper.convertRowToItem(row, namespace); - items.set(item.skey, item); - } - log.debug(`extra query with ${data2.length} items to fix duplicate timestamps pagination, total size is ${items.length}`); - } else if (lastTsRowId > pageSize - 1) { - // we have more rows with same timestamp, they fit in pageSize+pageLookAhead rows - for (let i = pageSize; i <= lastTsRowId; i++) { - const item = DbHelper.convertRowToItem(data1[i], namespace); - items.set(item.skey, item); - } - log.debug(`updated to ${items.size} items to fix duplicate timestamps pagination`) - } + limit ${pageSizeForSameTimestamp}` + log.debug(sql2) + const data2 = await pgPool.any(sql2) + for (const row of data2) { + const item = DbHelper.convertRowToItem(row, namespace) + items.set(item.skey, item) + } + log.debug( + `extra query with ${data2.length} items to fix duplicate timestamps pagination, total size is ${items.length}` + ) + } else if (lastTsRowId > pageSize - 1) { + // we have more rows with same timestamp, they fit in pageSize+pageLookAhead rows + for (let i = pageSize; i <= lastTsRowId; i++) { + const item = DbHelper.convertRowToItem(data1[i], namespace) + items.set(item.skey, item) } - let itemsArr = [...items.values()]; - return { - 'items': itemsArr, - 'lastTs': lastTs - }; + log.debug(`updated to ${items.size} items to fix duplicate timestamps pagination`) + } } - - private static convertRowToItem(rowObj: any, namespace: string) { - return { - ns: namespace, - skey: rowObj.skey, - ts: rowObj.ts, - payload: rowObj.payload - }; + const itemsArr = [...items.values()] + return { + items: itemsArr, + lastTs: lastTs } - - static async listAllNsIndex() { - const updateViewIfNeeded = `select update_storage_all_namespace_view();`; - log.debug(updateViewIfNeeded); - await pgPool.any(updateViewIfNeeded); - const selectAll = `select namespace_id, last_usage from storage_all_namespace_view;`; - log.debug(selectAll); - return pgPool.manyOrNone(selectAll).then(data => { - log.debug(data); - return Promise.resolve(data == null ? [] : data.map(function (item) { - return { - 'nsId': item.namespace_id, - 'last_usage' : item.last_usage - }; - })); - }).catch(err => { - log.debug(err); - return Promise.reject(err); - }); + } + + private static convertRowToItem(rowObj: any, namespace: string) { + return { + ns: namespace, + skey: rowObj.skey, + ts: rowObj.ts, + payload: rowObj.payload } + } + + static async listAllNsIndex() { + const updateViewIfNeeded = `select update_storage_all_namespace_view();` + log.debug(updateViewIfNeeded) + await pgPool.any(updateViewIfNeeded) + const selectAll = `select namespace_id, last_usage from storage_all_namespace_view;` + log.debug(selectAll) + return pgPool + .manyOrNone(selectAll) + .then((data) => { + log.debug(data) + return Promise.resolve( + data == null + ? [] + : data.map(function (item) { + return { + nsId: item.namespace_id, + last_usage: item.last_usage + } + }) + ) + }) + .catch((err) => { + log.debug(err) + return Promise.reject(err) + }) + } } export class StorageRecord { - ns: string; - skey: string; - ts: string; - payload: any; - - - constructor(ns: string, skey: string, ts: string, payload: any) { - this.ns = ns; - this.skey = skey; - this.ts = ts; - this.payload = payload; - } -} \ No newline at end of file + ns: string + skey: string + ts: string + payload: any + + constructor(ns: string, skey: string, ts: string, payload: any) { + this.ns = ns + this.skey = skey + this.ts = ts + this.payload = payload + } +} diff --git a/src/loaders/index.ts b/src/loaders/index.ts index feef245..40d43d2 100755 --- a/src/loaders/index.ts +++ b/src/loaders/index.ts @@ -1,23 +1,21 @@ -import expressLoader from './express'; +import expressLoader from './express' // import dependencyInjectorLoader from './dependencyInjector'; - -import logger from './logger'; +import logger from './logger' // import initializer from './initializer'; // import dbLoader from './db'; // import dbListenerLoader from './dbListener'; - //We have to import at least all the events once so they can be triggered // import './events'; export default async ({ expressApp, server, testMode }) => { - logger.info('loaders init'); + logger.info('loaders init') // await dependencyInjectorLoader(); - await expressLoader({ app: expressApp }); + await expressLoader({ app: expressApp }) // await socketLoader({ server: server }); -}; +} diff --git a/src/loaders/logger.ts b/src/loaders/logger.ts index 752b321..bce3c7f 100755 --- a/src/loaders/logger.ts +++ b/src/loaders/logger.ts @@ -1,7 +1,7 @@ -import winston from 'winston'; -import config from '../config'; -import {WinstonUtil} from "../utilz/winstonUtil"; +import winston from 'winston' +import config from '../config' +import { WinstonUtil } from '../utilz/winstonUtil' const customLevels = { levels: { @@ -14,7 +14,7 @@ const customLevels = { saved: 6, verbose: 7, debug: 8, - silly: 9, + silly: 9 }, colors: { info: 'green', @@ -23,16 +23,16 @@ const customLevels = { saved: 'italic white', debug: 'yellow' } -}; +} -let transports = []; +const transports = [] transports.push( // Console should always be at 0 and dynamic log should always be at 2 // remember and not change it as it's manually baked in hijackLogger WinstonUtil.consoleTransport, WinstonUtil.debugFileTransport, - WinstonUtil.errorFileTransport, -); + WinstonUtil.errorFileTransport +) // WE SIMPLY REDIRECT ALL TO winstonUtil formatter x winstonUtil transports // this instance is being used across the whole codebase const LoggerInstance = winston.createLogger({ @@ -40,8 +40,8 @@ const LoggerInstance = winston.createLogger({ levels: customLevels.levels, format: WinstonUtil.createFormat2WhichRendersClassName(), transports -}); +}) -winston.addColors(customLevels.colors); +winston.addColors(customLevels.colors) -export default LoggerInstance; \ No newline at end of file +export default LoggerInstance diff --git a/src/services/messaging-common/messageBlock.ts b/src/services/messaging-common/messageBlock.ts index 1a4953b..2179f46 100644 --- a/src/services/messaging-common/messageBlock.ts +++ b/src/services/messaging-common/messageBlock.ts @@ -9,15 +9,16 @@ { s1, 0xD}, { s2, 0xE}, { s3, 0xF}, { d1, 0x1}, { d2, 0x2}, { d3, 0x3} ] */ -import {Coll} from '../../utilz/coll' +import { Logger } from 'winston' + +import { Check } from '../../utilz/check' +import { Coll } from '../../utilz/coll' +import { EthSig } from '../../utilz/ethSig' +import { EthUtil } from '../../utilz/EthUtil' +import { NumUtil } from '../../utilz/numUtil' +import { ObjectHasher } from '../../utilz/objectHasher' import StrUtil from '../../utilz/strUtil' -import {EthSig} from '../../utilz/ethSig' -import {Logger} from 'winston' -import {WinstonUtil} from '../../utilz/winstonUtil' -import {ObjectHasher} from '../../utilz/objectHasher' -import {EthUtil} from '../../utilz/EthUtil' -import {Check} from '../../utilz/check' -import {NumUtil} from "../../utilz/numUtil"; +import { WinstonUtil } from '../../utilz/winstonUtil' /* ex: @@ -345,10 +346,10 @@ export class MessageBlockUtil { const shards = new Set() for (const fi of block.responses) { for (const recipient of fi.header.recipientsResolved) { - let shardId = this.calculateAffectedShard(recipient.addr, shardCount); + const shardId = this.calculateAffectedShard(recipient.addr, shardCount) if (shardId == null) { - this.log.error('cannot calculate shardId for recipient %o in %o', recipient, fi); - continue; + this.log.error('cannot calculate shardId for recipient %o in %o', recipient, fi) + continue } shards.add(shardId) } @@ -364,35 +365,36 @@ export class MessageBlockUtil { // lets read this value from a contract public static calculateAffectedShard(recipientAddr: string, shardCount: number): number | null { if (StrUtil.isEmpty(recipientAddr)) { - return null; + return null } let shardId: number = null const addrObj = EthUtil.parseCaipAddress(recipientAddr) - if (addrObj != null - && !StrUtil.isEmpty(addrObj.addr) - && addrObj.addr.startsWith('0x') - && addrObj.addr.length > 4) { - const firstByteAsHex = addrObj.addr.substring(2, 4).toLowerCase(); - shardId = Number.parseInt(firstByteAsHex, 16); + if ( + addrObj != null && + !StrUtil.isEmpty(addrObj.addr) && + addrObj.addr.startsWith('0x') && + addrObj.addr.length > 4 + ) { + const firstByteAsHex = addrObj.addr.substring(2, 4).toLowerCase() + shardId = Number.parseInt(firstByteAsHex, 16) } // 2) try to get sha256 otherwise if (shardId == null) { - const firstByteAsHex = ObjectHasher.hashToSha256(recipientAddr).toLowerCase().substring(0, 2); - shardId = Number.parseInt(firstByteAsHex, 16); + const firstByteAsHex = ObjectHasher.hashToSha256(recipientAddr).toLowerCase().substring(0, 2) + shardId = Number.parseInt(firstByteAsHex, 16) } - Check.notNull(shardId); - Check.isTrue(shardId >= 0 && shardId <= 255 && NumUtil.isRoundedInteger(shardId)); - Check.isTrue(shardCount >= 1); - return shardId % shardCount; + Check.notNull(shardId) + Check.isTrue(shardId >= 0 && shardId <= 255 && NumUtil.isRoundedInteger(shardId)) + Check.isTrue(shardCount >= 1) + return shardId % shardCount } public static getBlockCreationTimeMillis(block: MessageBlock): number | null { - if (block.responsesSignatures.length > 0 - && block.responsesSignatures[0].length > 0) { - const sig = block.responsesSignatures[0][0]; - return sig?.nodeMeta?.tsMillis; + if (block.responsesSignatures.length > 0 && block.responsesSignatures[0].length > 0) { + const sig = block.responsesSignatures[0][0] + return sig?.nodeMeta?.tsMillis } - return null; + return null } public static checkBlock(block: MessageBlock, validatorsFromContract: Set): CheckResult { @@ -463,10 +465,10 @@ export class CheckResult { err: string static failWithText(err: string): CheckResult { - return {success: false, err: err} + return { success: false, err: err } } static ok(): CheckResult { - return {success: true, err: ''} + return { success: true, err: '' } } } diff --git a/src/services/messaging-common/queueClientHelper.ts b/src/services/messaging-common/queueClientHelper.ts index e97eb90..95ccce9 100644 --- a/src/services/messaging-common/queueClientHelper.ts +++ b/src/services/messaging-common/queueClientHelper.ts @@ -1,8 +1,9 @@ +import { Logger } from 'winston' + import { Check } from '../../utilz/check' -import { ValidatorContractState } from './validatorContractState' import { MySqlUtil } from '../../utilz/mySqlUtil' -import { Logger } from 'winston' import { WinstonUtil } from '../../utilz/winstonUtil' +import { ValidatorContractState } from './validatorContractState' export class QueueClientHelper { private static log: Logger = WinstonUtil.newLog(QueueClientHelper) diff --git a/src/services/messaging-common/redisClient.ts b/src/services/messaging-common/redisClient.ts index b765299..974b767 100644 --- a/src/services/messaging-common/redisClient.ts +++ b/src/services/messaging-common/redisClient.ts @@ -1,8 +1,9 @@ import { createClient, RedisClientType } from 'redis' -import config from '../../config' +import { Service } from 'typedi' import { Logger } from 'winston' + +import config from '../../config' import { WinstonUtil } from '../../utilz/winstonUtil' -import { Service } from 'typedi' @Service() export class RedisClient { diff --git a/src/services/messaging-common/storageContractState.ts b/src/services/messaging-common/storageContractState.ts index 2d9f27a..5b987c6 100644 --- a/src/services/messaging-common/storageContractState.ts +++ b/src/services/messaging-common/storageContractState.ts @@ -1,151 +1,157 @@ -import {Service} from "typedi"; -import {Logger} from "winston"; -import {WinstonUtil} from "../../utilz/winstonUtil"; -import {EnvLoader} from "../../utilz/envLoader"; -import {Contract, ethers, Wallet} from "ethers"; -import {JsonRpcProvider} from "@ethersproject/providers/src.ts/json-rpc-provider"; -import {readFileSync} from "fs"; -import {BitUtil} from "../../utilz/bitUtil"; -import {EthersUtil} from "../../utilz/ethersUtil"; -import {Coll} from "../../utilz/coll"; -import {Check} from "../../utilz/check"; - +import { JsonRpcProvider } from '@ethersproject/providers/src.ts/json-rpc-provider' +import { Contract, ethers, Wallet } from 'ethers' +import { Service } from 'typedi' +import { Logger } from 'winston' + +import { BitUtil } from '../../utilz/bitUtil' +import { Check } from '../../utilz/check' +import { Coll } from '../../utilz/coll' +import { EnvLoader } from '../../utilz/envLoader' +import { EthersUtil } from '../../utilz/ethersUtil' +import { WinstonUtil } from '../../utilz/winstonUtil' @Service() export class StorageContractState { - public log: Logger = WinstonUtil.newLog(StorageContractState); + public log: Logger = WinstonUtil.newLog(StorageContractState) - private listener: StorageContractListener; + private listener: StorageContractListener private provider: JsonRpcProvider private rpcEndpoint: string private rpcNetwork: number - private useSigner: boolean; + private useSigner: boolean // NODE STATE - private nodeWallet: Wallet | null; - private nodeAddress: string | null; + private nodeWallet: Wallet | null + private nodeAddress: string | null // CONTRACT STATE private storageCtAddr: string - private storageCt: StorageContract; - public rf: number; - public shardCount: number; + private storageCt: StorageContract + public rf: number + public shardCount: number // node0xA -> shard0, shard1, shard2 - public nodeShardMap: Map> = new Map(); + public nodeShardMap: Map> = new Map() // VARS // shard0 -> node0xA, node0xB - public shardToNodesMap: Map> = new Map(); + public shardToNodesMap: Map> = new Map() public async postConstruct(useSigner: boolean, listener: StorageContractListener) { - this.log.info('postConstruct()'); - this.listener = listener; + this.log.info('postConstruct()') + this.listener = listener this.storageCtAddr = EnvLoader.getPropertyOrFail('STORAGE_CONTRACT_ADDRESS') this.rpcEndpoint = EnvLoader.getPropertyOrFail('VALIDATOR_RPC_ENDPOINT') this.rpcNetwork = Number.parseInt(EnvLoader.getPropertyOrFail('VALIDATOR_RPC_NETWORK')) - this.provider = new ethers.providers.JsonRpcProvider( - this.rpcEndpoint, - this.rpcNetwork - ) - this.useSigner = useSigner; + this.provider = new ethers.providers.JsonRpcProvider(this.rpcEndpoint, this.rpcNetwork) + this.useSigner = useSigner if (useSigner) { - let connect = await EthersUtil.connectWithKey( + const connect = await EthersUtil.connectWithKey( EnvLoader.getPropertyOrFail('CONFIG_DIR'), EnvLoader.getPropertyOrFail('VALIDATOR_PRIVATE_KEY_FILE'), EnvLoader.getPropertyOrFail('VALIDATOR_PRIVATE_KEY_PASS'), 'StorageV1.json', EnvLoader.getPropertyOrFail('STORAGE_CONTRACT_ADDRESS'), this.provider - ); - this.storageCt = connect.contract; - this.nodeWallet = connect.nodeWallet; - this.nodeAddress = connect.nodeAddress; + ) + this.storageCt = connect.contract + this.nodeWallet = connect.nodeWallet + this.nodeAddress = connect.nodeAddress } else { - this.storageCt = await EthersUtil.connectWithoutKey( - EnvLoader.getPropertyOrFail('CONFIG_DIR'), - 'StorageV1.json', - EnvLoader.getPropertyOrFail('STORAGE_CONTRACT_ADDRESS'), - this.provider + this.storageCt = ( + await EthersUtil.connectWithoutKey( + EnvLoader.getPropertyOrFail('CONFIG_DIR'), + 'StorageV1.json', + EnvLoader.getPropertyOrFail('STORAGE_CONTRACT_ADDRESS'), + this.provider + ) ) } - await this.readContractState(); - await this.subscribeToContractChanges(); // todo ? ethers or hardhat always emits 1 fake event + await this.readContractState() + await this.subscribeToContractChanges() // todo ? ethers or hardhat always emits 1 fake event } public async readContractState() { this.log.info(`connected to StorageContract`) - this.rf = await this.storageCt.rf(); - let nodeCount = await this.storageCt.nodeCount(); - this.shardCount = await this.storageCt.SHARD_COUNT(); - this.log.info(`rf: ${this.rf} , shard count: ${this.shardCount} total nodeCount: ${nodeCount}`); - let nodesAddrList = await this.storageCt.getNodeAddresses(); + this.rf = await this.storageCt.rf() + const nodeCount = await this.storageCt.nodeCount() + this.shardCount = await this.storageCt.SHARD_COUNT() + this.log.info(`rf: ${this.rf} , shard count: ${this.shardCount} total nodeCount: ${nodeCount}`) + const nodesAddrList = await this.storageCt.getNodeAddresses() - await this.reloadEveryAddressAndNotifyListeners(nodesAddrList); + await this.reloadEveryAddressAndNotifyListeners(nodesAddrList) } public async subscribeToContractChanges() { this.storageCt.on('SNodeMappingChanged', async (nodeList: string[]) => { - this.log.info(`EVENT: SNodeMappingChanged: nodeList=${JSON.stringify(nodeList)}`); - await this.reloadEveryAddressAndNotifyListeners(nodeList); - }); + this.log.info(`EVENT: SNodeMappingChanged: nodeList=${JSON.stringify(nodeList)}`) + await this.reloadEveryAddressAndNotifyListeners(nodeList) + }) } // todo we can add 1 contract call for all addrs async reloadEveryAddressAndNotifyListeners(nodeAddrList: string[]): Promise { for (const nodeAddr of nodeAddrList) { - let nodeShardmask = await this.storageCt.getNodeShardsByAddr(nodeAddr); - const shardSet = Coll.arrayToSet(BitUtil.bitsToPositions(nodeShardmask)); - this.nodeShardMap.set(nodeAddr, shardSet); - this.log.info(`node %s is re-assigned to shards (%s) : %s`, - nodeAddr, nodeShardmask.toString(2), Coll.setToArray(shardSet)); + const nodeShardmask = await this.storageCt.getNodeShardsByAddr(nodeAddr) + const shardSet = Coll.arrayToSet(BitUtil.bitsToPositions(nodeShardmask)) + this.nodeShardMap.set(nodeAddr, shardSet) + this.log.info( + `node %s is re-assigned to shards (%s) : %s`, + nodeAddr, + nodeShardmask.toString(2), + Coll.setToArray(shardSet) + ) } - this.shardToNodesMap.clear(); + this.shardToNodesMap.clear() for (const [nodeAddr, shardSet] of this.nodeShardMap) { for (const shard of shardSet) { - let nodes = this.shardToNodesMap.get(shard); + let nodes = this.shardToNodesMap.get(shard) if (nodes == null) { - nodes = new Set(); - this.shardToNodesMap.set(shard, nodes); + nodes = new Set() + this.shardToNodesMap.set(shard, nodes) } - nodes.add(nodeAddr); + nodes.add(nodeAddr) } } - let nodeShards: Set = null; + let nodeShards: Set = null if (this.useSigner) { - this.log.info(`this node %s is assigned to shards (%s) : %s`, + this.log.info( + `this node %s is assigned to shards (%s) : %s`, this.nodeAddress, - Coll.setToArray(this.getNodeShards())); - nodeShards = this.getNodeShards(); + Coll.setToArray(this.getNodeShards()) + ) + nodeShards = this.getNodeShards() } - await this.listener.handleReshard(nodeShards, this.nodeShardMap); + await this.listener.handleReshard(nodeShards, this.nodeShardMap) } // fails if this.nodeAddress is not defined public getNodeShards(): Set { - Check.notEmpty(this.nodeAddress); - const nodeShards = this.nodeShardMap.get(this.nodeAddress); - Check.notEmptySet(nodeShards); - return nodeShards; + Check.notEmpty(this.nodeAddress) + const nodeShards = this.nodeShardMap.get(this.nodeAddress) + Check.notEmptySet(nodeShards) + return nodeShards } public getStorageNodesForShard(shard: number): Set | null { - return this.shardToNodesMap.get(shard); + return this.shardToNodesMap.get(shard) } } -type StorageContract = StorageContractAPI & Contract; +type StorageContract = StorageContractAPI & Contract export interface StorageContractAPI { + rf(): Promise - rf(): Promise; + getNodeAddresses(): Promise - getNodeAddresses(): Promise; + getNodeShardsByAddr(addr: string): Promise - getNodeShardsByAddr(addr: string): Promise; + nodeCount(): Promise - nodeCount(): Promise; - - SHARD_COUNT(): Promise; + SHARD_COUNT(): Promise } export interface StorageContractListener { - handleReshard(currentNodeShards: Set | null, allNodeShards: Map>): Promise; -} \ No newline at end of file + handleReshard( + currentNodeShards: Set | null, + allNodeShards: Map> + ): Promise +} diff --git a/src/services/messaging-common/validatorContractState.ts b/src/services/messaging-common/validatorContractState.ts index 0a8f7e5..347c2bc 100644 --- a/src/services/messaging-common/validatorContractState.ts +++ b/src/services/messaging-common/validatorContractState.ts @@ -1,12 +1,12 @@ -import { Service } from 'typedi' +import { JsonRpcProvider } from '@ethersproject/providers/src.ts/json-rpc-provider' import { Contract, ethers, Wallet } from 'ethers' -import StrUtil from '../../utilz/strUtil' - import fs, { readFileSync } from 'fs' import path from 'path' -import { JsonRpcProvider } from '@ethersproject/providers/src.ts/json-rpc-provider' -import { EnvLoader } from '../../utilz/envLoader' +import { Service } from 'typedi' import { Logger } from 'winston' + +import { EnvLoader } from '../../utilz/envLoader' +import StrUtil from '../../utilz/strUtil' import { WinstonUtil } from '../../utilz/winstonUtil' /* diff --git a/src/services/messaging-dset/queueClient.ts b/src/services/messaging-dset/queueClient.ts index 7443f52..7381261 100644 --- a/src/services/messaging-dset/queueClient.ts +++ b/src/services/messaging-dset/queueClient.ts @@ -1,10 +1,11 @@ // ------------------------------ // reads other node queue fully, appends everything to the local queue/storage -import {Logger} from 'winston' -import {WinstonUtil} from '../../utilz/winstonUtil' -import {MySqlUtil} from '../../utilz/mySqlUtil' import axios from 'axios' -import {Consumer, QItem} from './queueTypes' +import { Logger } from 'winston' + +import { MySqlUtil } from '../../utilz/mySqlUtil' +import { WinstonUtil } from '../../utilz/winstonUtil' +import { Consumer, QItem } from './queueTypes' export class QueueClient { public log: Logger = WinstonUtil.newLog(QueueClient) @@ -66,12 +67,12 @@ export class QueueClient { } for (const item of reply.items) { endpointStats.downloadedItems++ - let appendSuccessful = false; + let appendSuccessful = false try { - appendSuccessful = await this.consumer.accept(item); + appendSuccessful = await this.consumer.accept(item) } catch (e) { - this.log.error('error processing accept(): queue %s: ', this.queueName); - this.log.error(e); + this.log.error('error processing accept(): queue %s: ', this.queueName) + this.log.error(e) } if (appendSuccessful) { endpointStats.newItems++ @@ -113,7 +114,7 @@ export class QueueClient { public async readLastOffset(queueName: string, baseUri: string): Promise { const url = `${baseUri}/api/v1/dset/queue/${queueName}/lastOffset` - const resp: { result: number } = await axios.get(url, {timeout: 3000}) + const resp: { result: number } = await axios.get(url, { timeout: 3000 }) return resp.result } } diff --git a/src/services/messaging-dset/queueServer.ts b/src/services/messaging-dset/queueServer.ts index 3a5616b..1a770d0 100644 --- a/src/services/messaging-dset/queueServer.ts +++ b/src/services/messaging-dset/queueServer.ts @@ -3,11 +3,12 @@ // this node: appends it - if storage says yes to a new item // other nodes: read this from client import { Logger } from 'winston' -import { WinstonUtil } from '../../utilz/winstonUtil' + import { MySqlUtil } from '../../utilz/mySqlUtil' -import { Consumer, DCmd, QItem } from './queueTypes' -import StrUtil from '../../utilz/strUtil' import { ObjectHasher } from '../../utilz/objectHasher' +import StrUtil from '../../utilz/strUtil' +import { WinstonUtil } from '../../utilz/winstonUtil' +import { Consumer, DCmd, QItem } from './queueTypes' export class QueueServer implements Consumer { private log: Logger = WinstonUtil.newLog(QueueServer) diff --git a/src/services/messaging/BlockStorage.ts b/src/services/messaging/BlockStorage.ts index 4a36685..1209c90 100644 --- a/src/services/messaging/BlockStorage.ts +++ b/src/services/messaging/BlockStorage.ts @@ -1,20 +1,20 @@ -import {Service} from "typedi"; -import {WinstonUtil} from "../../utilz/winstonUtil"; -import {Logger} from "winston"; -import {MySqlUtil} from "../../utilz/mySqlUtil"; -import {MessageBlock, MessageBlockUtil} from "../messaging-common/messageBlock"; -import StrUtil from "../../utilz/strUtil"; -import {Coll} from "../../utilz/coll"; -import {Check} from "../../utilz/check"; -import DbHelper from "../../helpers/dbHelper"; +import { Service } from 'typedi' +import { Logger } from 'winston' + +import { Check } from '../../utilz/check' +import { Coll } from '../../utilz/coll' +import { MySqlUtil } from '../../utilz/mySqlUtil' +import StrUtil from '../../utilz/strUtil' +import { WinstonUtil } from '../../utilz/winstonUtil' +import { MessageBlock } from '../messaging-common/messageBlock' // stores everything in MySQL @Service() export class BlockStorage { - public log: Logger = WinstonUtil.newLog(BlockStorage); + public log: Logger = WinstonUtil.newLog(BlockStorage) public async postConstruct() { - await this.createStorageTablesIfNeeded(); + await this.createStorageTablesIfNeeded() } private async createStorageTablesIfNeeded() { @@ -27,7 +27,7 @@ export class BlockStorage { PRIMARY KEY (object_hash) ) ENGINE = InnoDB DEFAULT CHARSET = utf8; - `); + `) await MySqlUtil.insert(` CREATE TABLE IF NOT EXISTS dset_queue_mblock @@ -41,7 +41,7 @@ export class BlockStorage { FOREIGN KEY (object_hash) REFERENCES blocks (object_hash) ) ENGINE = InnoDB DEFAULT CHARSET = utf8; - `); + `) await MySqlUtil.insert(` CREATE TABLE IF NOT EXISTS dset_client @@ -56,7 +56,7 @@ export class BlockStorage { UNIQUE KEY uniq_dset_name_and_target (queue_name, target_node_id) ) ENGINE = InnoDB DEFAULT CHARSET = utf8; - `); + `) await MySqlUtil.insert(` CREATE TABLE IF NOT EXISTS contract_shards @@ -67,40 +67,51 @@ export class BlockStorage { ) ENGINE = InnoDB DEFAULT CHARSET = utf8; - `); + `) } - async saveBlockWithShardData(mb: MessageBlock, calculatedHash: string, shardSet: Set): Promise { + async saveBlockWithShardData( + mb: MessageBlock, + calculatedHash: string, + shardSet: Set + ): Promise { // NOTE: the code already atomically updates the db , // so let's drop select because it's excessive) - let hashFromDb = await MySqlUtil.queryOneValueOrDefault( + const hashFromDb = await MySqlUtil.queryOneValueOrDefault( 'SELECT object_hash FROM blocks where object_hash=?', - null, calculatedHash); + null, + calculatedHash + ) if (hashFromDb != null) { - this.log.info('received block with hash %s, ' + - 'already exists in the storage at index %s, ignoring', - calculatedHash, hashFromDb); - return false; + this.log.info( + 'received block with hash %s, ' + 'already exists in the storage at index %s, ignoring', + calculatedHash, + hashFromDb + ) + return false } // insert block - this.log.info('received block with hash %s, adding to the db', calculatedHash); - const objectAsJson = JSON.stringify(mb); - const shardSetAsJson = JSON.stringify(Coll.setToArray(shardSet)); + this.log.info('received block with hash %s, adding to the db', calculatedHash) + const objectAsJson = JSON.stringify(mb) + const shardSetAsJson = JSON.stringify(Coll.setToArray(shardSet)) const res = await MySqlUtil.insert( `INSERT IGNORE INTO blocks(object, object_hash, object_shards) VALUES (?, ?, ?)`, - objectAsJson, calculatedHash, shardSetAsJson); - let requiresProcessing = res.affectedRows === 1; + objectAsJson, + calculatedHash, + shardSetAsJson + ) + const requiresProcessing = res.affectedRows === 1 if (!requiresProcessing) { - return false; + return false } // insert block to shard mapping - let valuesStr = ''; - let valuesArr = []; + let valuesStr = '' + const valuesArr = [] for (const shardId of shardSet) { - valuesArr.push(calculatedHash, shardId); + valuesArr.push(calculatedHash, shardId) valuesStr += valuesStr.length == 0 ? '(? , ?)' : ',(? , ?)' } const res2 = await MySqlUtil.insert( @@ -109,8 +120,9 @@ export class BlockStorage { INTO dset_queue_mblock(object_hash, object_shard) VALUES ${valuesStr}`, - ...valuesArr); - return true; + ...valuesArr + ) + return true } /** @@ -120,15 +132,17 @@ export class BlockStorage { */ public async loadNodeShards(): Promise | null> { const shardsAssigned = await MySqlUtil.queryOneValueOrDefault( - 'select shards_assigned from contract_shards order by ts desc limit 1', null); - let result: Set; + 'select shards_assigned from contract_shards order by ts desc limit 1', + null + ) + let result: Set if (shardsAssigned != null && !StrUtil.isEmpty(shardsAssigned)) { - const arr = JSON.parse(shardsAssigned); - result = new Set(arr); + const arr = JSON.parse(shardsAssigned) + result = new Set(arr) } else { - result = new Set(); + result = new Set() } - return result; + return result } /** @@ -137,9 +151,12 @@ export class BlockStorage { * @param nodeShards */ public async saveNodeShards(nodeShards: Set): Promise { - let arr = Coll.setToArray(nodeShards); - let arrAsJson = JSON.stringify(arr); - await MySqlUtil.insert('INSERT IGNORE INTO contract_shards(shards_assigned) VALUES(?)', arrAsJson); + const arr = Coll.setToArray(nodeShards) + const arrAsJson = JSON.stringify(arr) + await MySqlUtil.insert( + 'INSERT IGNORE INTO contract_shards(shards_assigned) VALUES(?)', + arrAsJson + ) } /** @@ -154,41 +171,47 @@ export class BlockStorage { * @param shardsToLookFor shards filter * @param handler callback for processing */ - public async iterateAllStoredBlocks(shardCountFromStorageContract: number, - pageSize: number, - shardsToLookFor: Set, - handler: (messageBlockJson: string, - messageBlockHash: string, - messageBlockShards: Set) => Promise) { + public async iterateAllStoredBlocks( + shardCountFromStorageContract: number, + pageSize: number, + shardsToLookFor: Set, + handler: ( + messageBlockJson: string, + messageBlockHash: string, + messageBlockShards: Set + ) => Promise + ) { if (shardsToLookFor.size == 0) { - this.log.debug('iterateAllStoredBlocks(): no shards to unpack '); - return; + this.log.debug('iterateAllStoredBlocks(): no shards to unpack ') + return } // [1,2] => (1, 2) - let shardsAsCsv = Array.from(shardsToLookFor.keys()).join(','); + const shardsAsCsv = Array.from(shardsToLookFor.keys()).join(',') // query size - let queryLimit = pageSize; - Check.isTrue(queryLimit > 0 && queryLimit < 1000, 'bad query limit'); - let subqueryRowLimit = Math.round(3 * pageSize * shardCountFromStorageContract / 2); - Check.isTrue(shardCountFromStorageContract > 0 && shardCountFromStorageContract < 1000, - 'bad subquery limit'); + const queryLimit = pageSize + Check.isTrue(queryLimit > 0 && queryLimit < 1000, 'bad query limit') + const subqueryRowLimit = Math.round((3 * pageSize * shardCountFromStorageContract) / 2) + Check.isTrue( + shardCountFromStorageContract > 0 && shardCountFromStorageContract < 1000, + 'bad subquery limit' + ) // remember last N object hashes; // this is the amount of a page that should be enough to remove duplicate object hashes // i.e. 32 shards x 30 rows = approx 1000 rows of history - let cache = new Set(); - let cacheMaxSize = 2 * subqueryRowLimit; + const cache = new Set() + const cacheMaxSize = 2 * subqueryRowLimit - let fromId = null; - let rows = null; + let fromId = null + let rows = null do { /* Finds rows | minId | object_hash | shardsJson | | 3 | 1 | [2,1] | */ - rows = await MySqlUtil.queryArr<{ minId: number, object_hash: string, shardsJson: string }>( + rows = await MySqlUtil.queryArr<{ minId: number; object_hash: string; shardsJson: string }>( `SELECT MIN(id) as minId, object_hash, CONCAT('[', GROUP_CONCAT(object_shard), ']') as shardsJson FROM (SELECT id, object_hash, object_shard FROM dset_queue_mblock @@ -199,30 +222,37 @@ export class BlockStorage { GROUP BY object_hash ORDER BY minId DESC LIMIT ?`, - fromId, fromId, subqueryRowLimit, queryLimit); + fromId, + fromId, + subqueryRowLimit, + queryLimit + ) for (const row of rows) { - const mbHash = row.object_hash; + const mbHash = row.object_hash if (!cache.has(mbHash)) { - const row = await MySqlUtil.queryOneRow<{ object: string, object_shards: string }>( - 'select object, object_shards from blocks where object_hash=?', mbHash); + const row = await MySqlUtil.queryOneRow<{ object: string; object_shards: string }>( + 'select object, object_shards from blocks where object_hash=?', + mbHash + ) if (row == null || StrUtil.isEmpty(row.object)) { - this.log.error('skipping objectHash=%s because there is no matching shard info in blocks table', - mbHash); - continue; + this.log.error( + 'skipping objectHash=%s because there is no matching shard info in blocks table', + mbHash + ) + continue } - const mbShardSet = Coll.parseAsNumberSet(row.object_shards); - await handler(row.object, mbHash, mbShardSet); - cache.add(mbHash); + const mbShardSet = Coll.parseAsNumberSet(row.object_shards) + await handler(row.object, mbHash, mbShardSet) + cache.add(mbHash) if (cache.size > cacheMaxSize) { - const firstCachedValue = cache.values().next().value; - cache.delete(firstCachedValue); + const firstCachedValue = cache.values().next().value + cache.delete(firstCachedValue) } } - fromId = Math.min(fromId, row.minId); + fromId = Math.min(fromId, row.minId) } - Check.isTrue(cache.size <= cacheMaxSize); - } while (rows != null && rows.length > 0); + Check.isTrue(cache.size <= cacheMaxSize) + } while (rows != null && rows.length > 0) } - -} \ No newline at end of file +} diff --git a/src/services/messaging/IndexStorage.ts b/src/services/messaging/IndexStorage.ts index 68c6d9f..0733f68 100644 --- a/src/services/messaging/IndexStorage.ts +++ b/src/services/messaging/IndexStorage.ts @@ -1,30 +1,29 @@ -import {WinstonUtil} from "../../utilz/winstonUtil"; -import {Inject, Service} from "typedi"; -import {Logger} from "winston"; -import {FPayload, MessageBlock, MessageBlockUtil} from "../messaging-common/messageBlock"; -import {Check} from "../../utilz/check"; -import {Coll} from "../../utilz/coll"; -import DateUtil from "../../utilz/dateUtil"; -import DbHelper from "../../helpers/dbHelper"; -import StrUtil from "../../utilz/strUtil"; -import {StorageContractState} from "../messaging-common/storageContractState"; -import {ValidatorContractState} from "../messaging-common/validatorContractState"; -import {PgUtil} from "../../utilz/pgUtil"; +import { Inject, Service } from 'typedi' +import { Logger } from 'winston' +import DbHelper from '../../helpers/dbHelper' +import { Coll } from '../../utilz/coll' +import DateUtil from '../../utilz/dateUtil' +import { PgUtil } from '../../utilz/pgUtil' +import StrUtil from '../../utilz/strUtil' +import { WinstonUtil } from '../../utilz/winstonUtil' +import { FPayload, MessageBlock, MessageBlockUtil } from '../messaging-common/messageBlock' +import { StorageContractState } from '../messaging-common/storageContractState' +import { ValidatorContractState } from '../messaging-common/validatorContractState' // stores everything in Postgres (!) @Service() export class IndexStorage { - public log: Logger = WinstonUtil.newLog(IndexStorage); + public log: Logger = WinstonUtil.newLog(IndexStorage) @Inject() - private valContractState: ValidatorContractState; + private valContractState: ValidatorContractState @Inject() - private storageContractState: StorageContractState; + private storageContractState: StorageContractState public async postConstruct() { - await DbHelper.createStorageTablesIfNeeded(); + await DbHelper.createStorageTablesIfNeeded() } /* @@ -35,36 +34,45 @@ export class IndexStorage { */ public async unpackBlockToInboxes(mb: MessageBlock, shardSet: Set) { // this is the list of shards that we support on this node - const nodeShards = this.storageContractState.getNodeShards(); - this.log.debug('storage node supports %s shards: %o', nodeShards.size, nodeShards); - let shardsToProcess = Coll.intersectSet(shardSet, nodeShards); + const nodeShards = this.storageContractState.getNodeShards() + this.log.debug('storage node supports %s shards: %o', nodeShards.size, nodeShards) + const shardsToProcess = Coll.intersectSet(shardSet, nodeShards) this.log.debug('block %s has %d inboxes to unpack', mb.id, shardsToProcess) if (shardsToProcess.size == 0) { - this.log.debug('finished'); - return; + this.log.debug('finished') + return } // ex: 1661214142.123456 - let tsString = '' + (MessageBlockUtil.getBlockCreationTimeMillis(mb) / 1000.0); + let tsString = '' + MessageBlockUtil.getBlockCreationTimeMillis(mb) / 1000.0 if (tsString == null) { - tsString = '' + DateUtil.currentTimeSeconds(); + tsString = '' + DateUtil.currentTimeSeconds() } - const currentNodeId = this.valContractState.nodeId; + const currentNodeId = this.valContractState.nodeId for (let i = 0; i < mb.responses.length; i++) { - const feedItem = mb.responses[i]; - const targetWallets: string[] = MessageBlockUtil.calculateRecipients(mb, i); + const feedItem = mb.responses[i] + const targetWallets: string[] = MessageBlockUtil.calculateRecipients(mb, i) for (let i1 = 0; i1 < targetWallets.length; i1++) { - const targetAddr = targetWallets[i1]; - let targetShard = MessageBlockUtil.calculateAffectedShard(targetAddr, this.storageContractState.shardCount); + const targetAddr = targetWallets[i1] + const targetShard = MessageBlockUtil.calculateAffectedShard( + targetAddr, + this.storageContractState.shardCount + ) if (!shardsToProcess.has(targetShard)) { - continue; + continue } - await this.putPayloadToInbox('inbox', targetShard, targetAddr, tsString, currentNodeId, feedItem.payload); + await this.putPayloadToInbox( + 'inbox', + targetShard, + targetAddr, + tsString, + currentNodeId, + feedItem.payload + ) } } } - /** * Puts a single item into * This is POSTGRES @@ -78,60 +86,77 @@ export class IndexStorage { * * todo pass ts as number */ - public async putPayloadToInbox(nsName: string, nsShardId: number, nsId: string, - ts: string, - nodeId: string, - fpayload: FPayload) { - const key = fpayload.data.sid; - fpayload.recipients = null; // null recipients field because we don't need that - const date = DateUtil.parseUnixFloatAsDateTime(ts); + public async putPayloadToInbox( + nsName: string, + nsShardId: number, + nsId: string, + ts: string, + nodeId: string, + fpayload: FPayload + ) { + const key = fpayload.data.sid + fpayload.recipients = null // null recipients field because we don't need that + const date = DateUtil.parseUnixFloatAsDateTime(ts) this.log.debug(`parsed date ${ts} -> ${date}`) - let storageTable = await DbHelper.findStorageTableByDate(nsName, nsShardId, date); + let storageTable = await DbHelper.findStorageTableByDate(nsName, nsShardId, date) this.log.debug(`found table ${storageTable}`) if (StrUtil.isEmpty(storageTable)) { - this.log.error('storage table not found'); - let monthStart = date.startOf('month').toISODate().toString(); - let monthEndExclusive = date.startOf('month').plus({months: 1}).toISODate().toString(); - this.log.debug('creating new storage table'); - const dateYYYYMM = DateUtil.formatYYYYMM(date); - const tableName = `storage_ns_${nsName}_d_${dateYYYYMM}`; - const recordCreated = await DbHelper.createNewNodestorageRecord(nsName, nsShardId, - monthStart, monthEndExclusive, tableName); + this.log.error('storage table not found') + const monthStart = date.startOf('month').toISODate().toString() + const monthEndExclusive = date.startOf('month').plus({ months: 1 }).toISODate().toString() + this.log.debug('creating new storage table') + const dateYYYYMM = DateUtil.formatYYYYMM(date) + const tableName = `storage_ns_${nsName}_d_${dateYYYYMM}` + const recordCreated = await DbHelper.createNewNodestorageRecord( + nsName, + nsShardId, + monthStart, + monthEndExclusive, + tableName + ) if (recordCreated) { this.log.debug('record created: ', recordCreated) // we've added a new record to node_storage_layout => we can safely try to create a table // otherwise, if many connections attempt to create a table from multiple threads // it leads to postgres deadlock sometimes - await DbHelper.createNewStorageTable(tableName); + await DbHelper.createNewStorageTable(tableName) this.log.debug('creating node storage layout mapping') - storageTable = tableName; + storageTable = tableName } } - const storageValue = await DbHelper.putValueInTable(nsName, nsShardId, nsId, storageTable, - ts, key, JSON.stringify(fpayload)); + const storageValue = await DbHelper.putValueInTable( + nsName, + nsShardId, + nsId, + storageTable, + ts, + key, + JSON.stringify(fpayload) + ) this.log.debug(`found value: ${storageValue}`) } // todo remove shard entries from node_storage_layout also? public async deleteShardsFromInboxes(shardsToDelete: Set) { - this.log.debug('deleteShardsFromInboxes(): shardsToDelete: %j', - Coll.setToArray(shardsToDelete)); + this.log.debug('deleteShardsFromInboxes(): shardsToDelete: %j', Coll.setToArray(shardsToDelete)) if (shardsToDelete.size == 0) { - return; + return } // delete from index - let idsToDelete = Coll.numberSetToSqlQuoted(shardsToDelete); + const idsToDelete = Coll.numberSetToSqlQuoted(shardsToDelete) const rows = await PgUtil.queryArr<{ table_name: string }>( `select distinct table_name from node_storage_layout - where namespace_shard_id in ${idsToDelete}`); + where namespace_shard_id in ${idsToDelete}` + ) for (const row of rows) { - this.log.debug('clearing table %s from shards %o', row, idsToDelete); - await PgUtil.update(`delete + this.log.debug('clearing table %s from shards %o', row, idsToDelete) + await PgUtil.update( + `delete from ${row.table_name} where namespace_shard_id in ${idsToDelete}`, - idsToDelete); + idsToDelete + ) } } - -} \ No newline at end of file +} diff --git a/src/services/messaging/queueManager.ts b/src/services/messaging/queueManager.ts index 58c8586..3fdac61 100644 --- a/src/services/messaging/queueManager.ts +++ b/src/services/messaging/queueManager.ts @@ -1,27 +1,29 @@ -import {Inject, Service} from 'typedi' -import {Logger} from 'winston' import schedule from 'node-schedule' -import {ValidatorContractState} from '../messaging-common/validatorContractState' -import {WinstonUtil} from '../../utilz/winstonUtil' -import {QueueServer} from '../messaging-dset/queueServer' -import {QueueClient} from '../messaging-dset/queueClient' -import StorageNode from "./storageNode"; -import {QueueClientHelper} from "../messaging-common/queueClientHelper"; -import {EnvLoader} from "../../utilz/envLoader"; +import { Inject, Service } from 'typedi' +import { Logger } from 'winston' +import { EnvLoader } from '../../utilz/envLoader' +import { WinstonUtil } from '../../utilz/winstonUtil' +import { QueueClientHelper } from '../messaging-common/queueClientHelper' +import { ValidatorContractState } from '../messaging-common/validatorContractState' +import { QueueClient } from '../messaging-dset/queueClient' +import { QueueServer } from '../messaging-dset/queueServer' +import StorageNode from './storageNode' @Service() export class QueueManager { public log: Logger = WinstonUtil.newLog(QueueManager) @Inject((type) => ValidatorContractState) - private contract: ValidatorContractState; - @Inject(type => StorageNode) - private storageNode:StorageNode; - + private contract: ValidatorContractState + @Inject((type) => StorageNode) + private storageNode: StorageNode // PING: schedule - private readonly CLIENT_READ_SCHEDULE = EnvLoader.getPropertyOrDefault('CLIENT_READ_SCHEDULE', '*/30 * * * * *'); + private readonly CLIENT_READ_SCHEDULE = EnvLoader.getPropertyOrDefault( + 'CLIENT_READ_SCHEDULE', + '*/30 * * * * *' + ) public static QUEUE_MBLOCK = 'mblock' mblockQueue: QueueServer @@ -36,17 +38,19 @@ export class QueueManager { public async postConstruct() { this.log.debug('postConstruct') this.mblockClient = new QueueClient(this.storageNode, QueueManager.QUEUE_MBLOCK) - await QueueClientHelper.initClientForEveryQueueForEveryValidator(this.contract, [QueueManager.QUEUE_MBLOCK]) + await QueueClientHelper.initClientForEveryQueueForEveryValidator(this.contract, [ + QueueManager.QUEUE_MBLOCK + ]) const qs = this schedule.scheduleJob(this.CLIENT_READ_SCHEDULE, async function () { const dbgPrefix = 'PollRemoteQueue' try { await qs.mblockClient.pollRemoteQueue(qs.CLIENT_REQUEST_PER_SCHEDULED_JOB) - qs.log.info(`CRON %s started`, dbgPrefix); + qs.log.info(`CRON %s started`, dbgPrefix) } catch (err) { - qs.log.error(`CRON %s failed %o`, dbgPrefix, err); + qs.log.error(`CRON %s failed %o`, dbgPrefix, err) } finally { - qs.log.info(`CRON %s finished`, dbgPrefix); + qs.log.info(`CRON %s finished`, dbgPrefix) } }) } @@ -63,9 +67,9 @@ export class QueueManager { return { result: lastOffset } } - public async readItems(dsetName: string, firstOffset: number){ + public async readItems(dsetName: string, firstOffset: number) { const q = this.getQueue(dsetName) - return await q.readWithLastOffset(firstOffset); + return await q.readWithLastOffset(firstOffset) } public async pollRemoteQueues(): Promise { @@ -73,5 +77,3 @@ export class QueueManager { return result } } - - diff --git a/src/services/messaging/storageNode.ts b/src/services/messaging/storageNode.ts index 0f69292..d965563 100644 --- a/src/services/messaging/storageNode.ts +++ b/src/services/messaging/storageNode.ts @@ -1,16 +1,20 @@ -import {Inject, Service} from 'typedi' -import {ValidatorContractState} from "../messaging-common/validatorContractState"; -import {Logger} from "winston"; -import {FPayload, MessageBlock, MessageBlockUtil} from "../messaging-common/messageBlock"; -import {QueueClient} from "../messaging-dset/queueClient"; -import {Consumer, QItem} from "../messaging-dset/queueTypes"; -import {BlockStorage} from "./BlockStorage"; -import {QueueManager} from "./queueManager"; -import {Coll} from "../../utilz/coll"; -import {Check} from "../../utilz/check"; -import {WinstonUtil} from "../../utilz/winstonUtil"; -import {StorageContractListener, StorageContractState} from "../messaging-common/storageContractState"; -import {IndexStorage} from "./IndexStorage"; +import { Inject, Service } from 'typedi' +import { Logger } from 'winston' + +import { Check } from '../../utilz/check' +import { Coll } from '../../utilz/coll' +import { WinstonUtil } from '../../utilz/winstonUtil' +import { MessageBlock, MessageBlockUtil } from '../messaging-common/messageBlock' +import { + StorageContractListener, + StorageContractState +} from '../messaging-common/storageContractState' +import { ValidatorContractState } from '../messaging-common/validatorContractState' +import { QueueClient } from '../messaging-dset/queueClient' +import { Consumer, QItem } from '../messaging-dset/queueTypes' +import { BlockStorage } from './BlockStorage' +import { IndexStorage } from './IndexStorage' +import { QueueManager } from './queueManager' // todo reshard(): // raise a flag while executing this; handle new updates somehow? @@ -22,98 +26,121 @@ export default class StorageNode implements Consumer, StorageContractList public log: Logger = WinstonUtil.newLog(StorageNode) @Inject() - private valContractState: ValidatorContractState; + private valContractState: ValidatorContractState @Inject() - private storageContractState: StorageContractState; + private storageContractState: StorageContractState - @Inject(type => QueueManager) - private queueManager: QueueManager; + @Inject((type) => QueueManager) + private queueManager: QueueManager @Inject() - private blockStorage: BlockStorage; + private blockStorage: BlockStorage @Inject() - private indexStorage: IndexStorage; + private indexStorage: IndexStorage - private client: QueueClient; + private client: QueueClient public async postConstruct() { - await this.blockStorage.postConstruct(); - await this.indexStorage.postConstruct(); - await this.valContractState.postConstruct(); - await this.storageContractState.postConstruct(true, this); - await this.queueManager.postConstruct(); + await this.blockStorage.postConstruct() + await this.indexStorage.postConstruct() + await this.valContractState.postConstruct() + await this.storageContractState.postConstruct(true, this) + await this.queueManager.postConstruct() } // remote queue handler async accept(item: QItem): Promise { // check hash - let mb = item.object; - Check.notEmpty(mb.id, 'message block has no id'); - let calculatedHash = MessageBlockUtil.calculateHash(mb); + const mb = item.object + Check.notEmpty(mb.id, 'message block has no id') + const calculatedHash = MessageBlockUtil.calculateHash(mb) if (calculatedHash !== item.object_hash) { - this.log.error('received item hash=%s , ' + - 'which differs from calculatedHash=%s, ' + - 'ignoring the block because producer calculated the hash incorrectly', - item.object_hash, calculatedHash); - return false; + this.log.error( + 'received item hash=%s , ' + + 'which differs from calculatedHash=%s, ' + + 'ignoring the block because producer calculated the hash incorrectly', + item.object_hash, + calculatedHash + ) + return false } // check contents // since this check is not for historical data, but for realtime data, // so we do not care about old blocked validators which might occur in the historical queue - let activeValidators = Coll.arrayToFields(this.valContractState.getActiveValidators(), 'nodeId'); - let check1 = MessageBlockUtil.checkBlock(mb, activeValidators); + const activeValidators = Coll.arrayToFields( + this.valContractState.getActiveValidators(), + 'nodeId' + ) + const check1 = MessageBlockUtil.checkBlock(mb, activeValidators) if (!check1.success) { - this.log.error('item validation failed: ', check1.err); - return false; + this.log.error('item validation failed: ', check1.err) + return false } // check database - let shardSet = MessageBlockUtil.calculateAffectedShards(mb, this.storageContractState.shardCount); - let isNew = await this.blockStorage.saveBlockWithShardData(mb, calculatedHash, shardSet); + const shardSet = MessageBlockUtil.calculateAffectedShards( + mb, + this.storageContractState.shardCount + ) + const isNew = await this.blockStorage.saveBlockWithShardData(mb, calculatedHash, shardSet) if (!isNew) { // this is not an error, because we read duplicates from every validator - this.log.debug('block %s already exists ', mb.id); - return false; + this.log.debug('block %s already exists ', mb.id) + return false } // send block - await this.indexStorage.unpackBlockToInboxes(mb, shardSet); + await this.indexStorage.unpackBlockToInboxes(mb, shardSet) } - - public async handleReshard(currentNodeShards: Set|null, allNodeShards: Map>) { - let newShards = currentNodeShards ?? new Set(); - const oldShards = await this.blockStorage.loadNodeShards(); - this.log.debug('handleReshard(): newShards: %j oldShards: %j', - Coll.setToArray(newShards), Coll.setToArray(oldShards)) + public async handleReshard( + currentNodeShards: Set | null, + allNodeShards: Map> + ) { + const newShards = currentNodeShards ?? new Set() + const oldShards = await this.blockStorage.loadNodeShards() + this.log.debug( + 'handleReshard(): newShards: %j oldShards: %j', + Coll.setToArray(newShards), + Coll.setToArray(oldShards) + ) if (Coll.isEqualSet(newShards, oldShards)) { this.log.debug('handleReshard(): no reshard is needed') - return; + return } - let shardsToAdd = Coll.substractSet(newShards, oldShards); - let shardsToDelete = Coll.substractSet(oldShards, newShards); - let commonShards = Coll.intersectSet(oldShards, newShards); - this.log.debug('shardsToAdd %j shardsToDelete %j shardsRemaining %j', + const shardsToAdd = Coll.substractSet(newShards, oldShards) + const shardsToDelete = Coll.substractSet(oldShards, newShards) + const commonShards = Coll.intersectSet(oldShards, newShards) + this.log.debug( + 'shardsToAdd %j shardsToDelete %j shardsRemaining %j', Coll.setToArray(shardsToAdd), Coll.setToArray(shardsToDelete), - Coll.setToArray(commonShards)); + Coll.setToArray(commonShards) + ) - await this.indexStorage.deleteShardsFromInboxes(shardsToDelete); + await this.indexStorage.deleteShardsFromInboxes(shardsToDelete) // add to index // reprocess every block from blocks table (only once per each block) // if the block has shardsToAdd -> add anything which is in shardsToAdd - const pageSize = 30; - await this.blockStorage.iterateAllStoredBlocks(this.storageContractState.shardCount, + const pageSize = 30 + await this.blockStorage.iterateAllStoredBlocks( + this.storageContractState.shardCount, pageSize, shardsToAdd, async (messageBlockJson, messageBlockHash, messageBlockShards) => { - let mb: MessageBlock = JSON.parse(messageBlockJson); - let shardsToAddFromBlock = Coll.intersectSet(shardsToAdd, messageBlockShards); - this.log.debug('reindexing block %s, blockShards %s, shardsToAdd %s,, shardsToAddFromBlock', - messageBlockHash, Coll.setToArray(messageBlockShards), Coll.setToArray(shardsToAdd), Coll.setToArray(shardsToAddFromBlock)) - await this.indexStorage.unpackBlockToInboxes(mb, shardsToAddFromBlock); - }); - await this.blockStorage.saveNodeShards(newShards); + const mb: MessageBlock = JSON.parse(messageBlockJson) + const shardsToAddFromBlock = Coll.intersectSet(shardsToAdd, messageBlockShards) + this.log.debug( + 'reindexing block %s, blockShards %s, shardsToAdd %s,, shardsToAddFromBlock', + messageBlockHash, + Coll.setToArray(messageBlockShards), + Coll.setToArray(shardsToAdd), + Coll.setToArray(shardsToAddFromBlock) + ) + await this.indexStorage.unpackBlockToInboxes(mb, shardsToAddFromBlock) + } + ) + await this.blockStorage.saveNodeShards(newShards) } -} \ No newline at end of file +} diff --git a/src/utilz/bitUtil.ts b/src/utilz/bitUtil.ts index 22c8115..d7a9e1f 100644 --- a/src/utilz/bitUtil.ts +++ b/src/utilz/bitUtil.ts @@ -1,4 +1,4 @@ -import {Coll} from "./coll"; +import { Coll } from './coll' export class BitUtil { /** @@ -24,8 +24,8 @@ export class BitUtil { target = new Buffer(add.length) src.copy(target, 0, 0, src.length) } - var length = Math.min(target.length, add.length) - for (var i = 0; i < length; ++i) { + const length = Math.min(target.length, add.length) + for (let i = 0; i < length; ++i) { target[i] = target[i] ^ add[i] } return target @@ -39,22 +39,22 @@ export class BitUtil { return Buffer.from(value, 'base64').toString('utf8') } - public static getBit(number:number, bitOffset:number) { - return (number & (1 << bitOffset)) === 0 ? 0 : 1; + public static getBit(number: number, bitOffset: number) { + return (number & (1 << bitOffset)) === 0 ? 0 : 1 } public static bitsToPositions(number: number): number[] { // return null; - const result: number[] = []; - let position = 0; + const result: number[] = [] + let position = 0 while (number !== 0) { if ((number & 1) === 1) { - result.push(position); + result.push(position) } - number = number >>> 1; - position++; + number = number >>> 1 + position++ } - Coll.sortNumbersAsc(result); - return result; + Coll.sortNumbersAsc(result) + return result } } diff --git a/src/utilz/coll.ts b/src/utilz/coll.ts index b700d2d..4528ff8 100644 --- a/src/utilz/coll.ts +++ b/src/utilz/coll.ts @@ -1,7 +1,6 @@ // CollectionUtils // all the proper type safe way to work with JS collections/sets/arrays export class Coll { - public static arrayToMap(arr: V[], keyField: K): Map { if (arr == null || arr.length == 0) { return new Map() @@ -13,20 +12,19 @@ export class Coll { if (map == null || map.size == 0) { return [] } - return [...map.values()]; + return [...map.values()] } public static mapKeysToArray(map: Map): K[] { if (map == null || map.size == 0) { return [] } - return [...map.keys()]; + return [...map.keys()] } - public static arrayToSet(arr: V[]): Set { if (arr == null) { - return new Set(); + return new Set() } return new Set(arr) } @@ -45,59 +43,65 @@ export class Coll { public static setToArray(set: Set): V[] { if (set == null) { - return []; + return [] } return Array.from(set.keys()) } // [1,2,3] - [2,3] = [1] public static substractSet(set1: Set, set2: Set): Set { - return new Set([...set1].filter(x => !set2.has(x))); + return new Set([...set1].filter((x) => !set2.has(x))) } // [1,2,3] x [2, 3] = [2,3] public static intersectSet(set1: Set, set2: Set): Set { - return new Set([...set1].filter(x => set2.has(x))); + return new Set([...set1].filter((x) => set2.has(x))) } // [1,2,3] x [2, 3] = [2,3] public static addSet(set1: Set, set2: Set): Set { - return new Set([...set1, ...set2]); + return new Set([...set1, ...set2]) } public static sortNumbersAsc(array: number[]) { if (array == null || array.length == 0) { - return; + return } array.sort((a, b) => { - return a - b; + return a - b }) } public static isEqualSet(a: Set, b: Set) { - if (a === b) return true; - if (a.size !== b.size) return false; + if (a === b) return true + if (a.size !== b.size) return false for (const value of a) { if (!b.has(value)) { - return false; + return false } } - return true; + return true } // parse '[1,2,3]' into Set: 1,2,3 public static parseAsNumberSet(jsonArray: string): Set { - const arr: number[] = JSON.parse(jsonArray); - return Coll.arrayToSet(arr); + const arr: number[] = JSON.parse(jsonArray) + return Coll.arrayToSet(arr) } // store set 1,2,3 as array: [1,2,3] public static numberSetToJson(s: Set): string { - return JSON.stringify([...s]); + return JSON.stringify([...s]) } // set 1,2,3 to sql: ('1','2','3') public static numberSetToSqlQuoted(s: Set): string { - return '(' + Coll.setToArray(s).map(num => "'" + num + "'").join(',') + ')'; + return ( + '(' + + Coll.setToArray(s) + .map((num) => "'" + num + "'") + .join(',') + + ')' + ) } } diff --git a/src/utilz/dateUtil.ts b/src/utilz/dateUtil.ts index 4840b49..a20646d 100644 --- a/src/utilz/dateUtil.ts +++ b/src/utilz/dateUtil.ts @@ -43,7 +43,7 @@ export default class DateUtil { public static currentTimeSeconds(): number { // new Date().getTime() - return Math.round( Date.now()/ 1000) + return Math.round(Date.now() / 1000) } public static millisToDate(timestamp: number): Date { diff --git a/src/utilz/envLoader.ts b/src/utilz/envLoader.ts index 939fa82..9250fff 100644 --- a/src/utilz/envLoader.ts +++ b/src/utilz/envLoader.ts @@ -1,17 +1,17 @@ import dotenv from 'dotenv' + import StrUtil from './strUtil' export class EnvLoader { - public static loadEnvOrFail() { // loads all .env variables into process.env.* variables // Optional support for CONFIG_DIR variable - console.log(`config dir is ${process.env.CONFIG_DIR}`); - let options = {}; + console.log(`config dir is ${process.env.CONFIG_DIR}`) + let options = {} if (process.env.CONFIG_DIR) { - options = {path: `${process.env.CONFIG_DIR}/.env`}; + options = { path: `${process.env.CONFIG_DIR}/.env` } } - const envFound = dotenv.config(options); + const envFound = dotenv.config(options) if (envFound.error) { throw new Error("⚠️ Couldn't find .env file ⚠️") } @@ -31,10 +31,10 @@ export class EnvLoader { return val != null && val.toLowerCase() === 'true' } - public static getPropertyOrDefault(propName: string, def:string): string { + public static getPropertyOrDefault(propName: string, def: string): string { const val = process.env[propName] if (StrUtil.isEmpty(val)) { - return def; + return def } return val } diff --git a/src/utilz/ethSig.ts b/src/utilz/ethSig.ts index be11cd9..e1b06aa 100644 --- a/src/utilz/ethSig.ts +++ b/src/utilz/ethSig.ts @@ -1,6 +1,7 @@ -import {Wallet} from "ethers"; -import {verifyMessage} from "ethers/lib/utils"; -import {ObjectHasher} from "./objectHasher"; +import { Wallet } from 'ethers' +import { verifyMessage } from 'ethers/lib/utils' + +import { ObjectHasher } from './objectHasher' /** * Utitily class that allows @@ -10,26 +11,24 @@ import {ObjectHasher} from "./objectHasher"; * Ignores 'signature' properties */ export class EthSig { + public static async create(wallet: Wallet, ...object: any[]): Promise { + const ethMessage = ObjectHasher.hashToSha256IgnoreSig(object) + const sig = await wallet.signMessage(ethMessage) + return sig + } - public static async create(wallet: Wallet, ...object: any[]): Promise { - const ethMessage = ObjectHasher.hashToSha256IgnoreSig(object); - const sig = await wallet.signMessage(ethMessage); - return sig; - } - - public static check(sig: string, targetWallet: string, ...object: any[]): boolean { - const ethMessage = ObjectHasher.hashToSha256IgnoreSig(object); - const verificationAddress = verifyMessage(ethMessage, sig); - if (targetWallet !== verificationAddress) { - return false; - } - return true; + public static check(sig: string, targetWallet: string, ...object: any[]): boolean { + const ethMessage = ObjectHasher.hashToSha256IgnoreSig(object) + const verificationAddress = verifyMessage(ethMessage, sig) + if (targetWallet !== verificationAddress) { + return false } + return true + } - public static isEthZero(addr: string) { - return '0x0000000000000000000000000000000000000000' === addr - } + public static isEthZero(addr: string) { + return '0x0000000000000000000000000000000000000000' === addr + } } -export function Signed(target: Function) { -} \ No newline at end of file +export function Signed(target: Function) {} diff --git a/src/utilz/ethersUtil.ts b/src/utilz/ethersUtil.ts index 6f0613e..96cfd14 100644 --- a/src/utilz/ethersUtil.ts +++ b/src/utilz/ethersUtil.ts @@ -1,61 +1,68 @@ -import path from "path"; -import fs from "fs"; -import {EnvLoader} from "./envLoader"; -import {Contract, ethers, Wallet} from "ethers"; -import {JsonRpcProvider} from "@ethersproject/providers/src.ts/json-rpc-provider"; -import {Logger} from "winston"; -import {ValidatorCtClient} from "../services/messaging-common/validatorContractState"; -import {WinstonUtil} from "./winstonUtil"; +import { JsonRpcProvider } from '@ethersproject/providers/src.ts/json-rpc-provider' +import { Contract, ethers, Wallet } from 'ethers' +import fs from 'fs' +import path from 'path' + +import { WinstonUtil } from './winstonUtil' export class EthersUtil { - static log = WinstonUtil.newLog(EthersUtil); + static log = WinstonUtil.newLog(EthersUtil) public static loadAbi(configDir: string, fileNameInConfigDir: string): string { - const fileAbsolute = path.resolve(configDir, `./${fileNameInConfigDir}`); - const file = fs.readFileSync(fileAbsolute, 'utf8'); - const json = JSON.parse(file); - const abi = json.abi; - console.log(`abi size:`, abi.length); - return abi; + const fileAbsolute = path.resolve(configDir, `./${fileNameInConfigDir}`) + const file = fs.readFileSync(fileAbsolute, 'utf8') + const json = JSON.parse(file) + const abi = json.abi + console.log(`abi size:`, abi.length) + return abi } // creates a client, using an encrypted private key from disk, so that we could sign/write to the blockchain - public static async connectWithKey(configDir: string, - privateKeyFileName: string, - privateKeyPass: string, - contractAbiFileName: string, - contractAddr: string, - provider: JsonRpcProvider): Promise { - let abi = EthersUtil.loadAbi(configDir, contractAbiFileName); - const jsonFile = fs.readFileSync(configDir + '/' + privateKeyFileName, 'utf-8'); - let nodeWallet = await Wallet.fromEncryptedJson(jsonFile, privateKeyPass); - let nodeAddress = await nodeWallet.getAddress(); + public static async connectWithKey( + configDir: string, + privateKeyFileName: string, + privateKeyPass: string, + contractAbiFileName: string, + contractAddr: string, + provider: JsonRpcProvider + ): Promise { + const abi = EthersUtil.loadAbi(configDir, contractAbiFileName) + const jsonFile = fs.readFileSync(configDir + '/' + privateKeyFileName, 'utf-8') + const nodeWallet = await Wallet.fromEncryptedJson(jsonFile, privateKeyPass) + const nodeAddress = await nodeWallet.getAddress() const signer = nodeWallet.connect(provider) - const contract = new ethers.Contract(contractAddr, abi, signer); - this.log.debug('connecting contract %s using signer %s (keydir: %s, keyfile: %s, abi: %s) ', - contractAddr, signer.address, configDir, privateKeyFileName, contractAbiFileName); + const contract = new ethers.Contract(contractAddr, abi, signer) + this.log.debug( + 'connecting contract %s using signer %s (keydir: %s, keyfile: %s, abi: %s) ', + contractAddr, + signer.address, + configDir, + privateKeyFileName, + contractAbiFileName + ) return { contract, nodeWallet, nodeAddress - }; + } } // creates a client which can only read blockchain state - public static async connectWithoutKey(configDir: string, - contractAbiFileName: string, - contractAddr: string, - provider: JsonRpcProvider): Promise { - let abi = EthersUtil.loadAbi(configDir, contractAbiFileName); - const contract = new ethers.Contract(contractAddr, abi, provider); - this.log.debug('connecting contract %s (no key, abi: %s) ', - contractAddr, contractAbiFileName); - return contract; + public static async connectWithoutKey( + configDir: string, + contractAbiFileName: string, + contractAddr: string, + provider: JsonRpcProvider + ): Promise { + const abi = EthersUtil.loadAbi(configDir, contractAbiFileName) + const contract = new ethers.Contract(contractAddr, abi, provider) + this.log.debug('connecting contract %s (no key, abi: %s) ', contractAddr, contractAbiFileName) + return contract } } type ContractWithMeta = { - contract: Contract; - nodeWallet: Wallet; - nodeAddress: string; -} \ No newline at end of file + contract: Contract + nodeWallet: Wallet + nodeAddress: string +} diff --git a/src/utilz/expressUtil.ts b/src/utilz/expressUtil.ts index dd8e671..31badaa 100644 --- a/src/utilz/expressUtil.ts +++ b/src/utilz/expressUtil.ts @@ -1,4 +1,5 @@ import { NextFunction, Request, Response } from 'express' + import { WinstonUtil } from './winstonUtil' export class ExpressUtil { diff --git a/src/utilz/idUtil.ts b/src/utilz/idUtil.ts index 17f32cb..4d742f1 100755 --- a/src/utilz/idUtil.ts +++ b/src/utilz/idUtil.ts @@ -1,8 +1,7 @@ -import * as uuid from "uuid"; +import * as uuid from 'uuid' export default class IdUtil { - - public static getUuidV4(): string { - return uuid.v4(); - } -} \ No newline at end of file + public static getUuidV4(): string { + return uuid.v4() + } +} diff --git a/src/utilz/mySqlUtil.ts b/src/utilz/mySqlUtil.ts index 30621b2..bb9c7b5 100644 --- a/src/utilz/mySqlUtil.ts +++ b/src/utilz/mySqlUtil.ts @@ -1,7 +1,8 @@ -import { Logger } from 'winston' import { OkPacket, Pool } from 'mysql' -import { WinstonUtil } from './winstonUtil' +import { Logger } from 'winston' + import { EnvLoader } from './envLoader' +import { WinstonUtil } from './winstonUtil' /* A sync replacement of db.query callback-y code diff --git a/src/utilz/numUtil.ts b/src/utilz/numUtil.ts index 1331ad6..7961ca1 100644 --- a/src/utilz/numUtil.ts +++ b/src/utilz/numUtil.ts @@ -11,7 +11,7 @@ export class NumUtil { } static isRoundedInteger(valN: number) { - return Number.isInteger(valN); + return Number.isInteger(valN) } public static toString(value: number) { diff --git a/src/utilz/pgUtil.ts b/src/utilz/pgUtil.ts index eeab171..87fd217 100644 --- a/src/utilz/pgUtil.ts +++ b/src/utilz/pgUtil.ts @@ -1,10 +1,10 @@ -import {Logger} from 'winston' -import {OkPacket, Pool} from 'mysql' -import {WinstonUtil} from './winstonUtil' -import {EnvLoader} from './envLoader' -import StrUtil from "./strUtil"; -import pg from "pg-promise/typescript/pg-subset"; -import {IDatabase} from "pg-promise"; +import { IDatabase } from 'pg-promise' +import pg from 'pg-promise/typescript/pg-subset' +import { Logger } from 'winston' + +import { EnvLoader } from './envLoader' +import StrUtil from './strUtil' +import { WinstonUtil } from './winstonUtil' // PG PROMISE https://github.com/vitaly-t/pg-promise @@ -13,10 +13,10 @@ import {IDatabase} from "pg-promise"; export class PgUtil { private static log: Logger = WinstonUtil.newLog('pg') static logSql = false - static pool:IDatabase<{}, pg.IClient>; // todo unknown type ??? + static pool: IDatabase<{}, pg.IClient> // todo unknown type ??? - public static init(pool:IDatabase<{}, pg.IClient>) { - PgUtil.pool = pool; + public static init(pool: IDatabase<{}, pg.IClient>) { + PgUtil.pool = pool if (!PgUtil.logSql && EnvLoader.getPropertyAsBool('LOG_SQL_STATEMENTS')) { // todo add logging query + values PgUtil.logSql = true @@ -61,26 +61,25 @@ export class PgUtil { } public static async update(query: string, ...sqlArgs: any[]): Promise { - query = StrUtil.replaceAllMySqlToPostre(query); - this.log.debug(query, ' ---> args ', sqlArgs); - let result = await this.pool.result(query, sqlArgs,r => r.rowCount); - return result; + query = StrUtil.replaceAllMySqlToPostre(query) + this.log.debug(query, ' ---> args ', sqlArgs) + const result = await this.pool.result(query, sqlArgs, (r) => r.rowCount) + return result } public static async insert(query: string, ...sqlArgs: any[]): Promise { - query = StrUtil.replaceAllMySqlToPostre(query); - this.log.debug(query, ' ---> args ', sqlArgs); - let result = await this.pool.result(query, sqlArgs,r => r.rowCount); - return result; + query = StrUtil.replaceAllMySqlToPostre(query) + this.log.debug(query, ' ---> args ', sqlArgs) + const result = await this.pool.result(query, sqlArgs, (r) => r.rowCount) + return result } public static async queryArr(query: string, ...sqlArgs: any[]): Promise { - query = StrUtil.replaceAllMySqlToPostre(query); - this.log.debug(query, ' ---> args ', sqlArgs); - let result = await this.pool.query(query, sqlArgs); - return result; + query = StrUtil.replaceAllMySqlToPostre(query) + this.log.debug(query, ' ---> args ', sqlArgs) + const result = await this.pool.query(query, sqlArgs) + return result } - } /* diff --git a/src/utilz/promiseUtil.ts b/src/utilz/promiseUtil.ts index a133a64..855bdb1 100644 --- a/src/utilz/promiseUtil.ts +++ b/src/utilz/promiseUtil.ts @@ -1,92 +1,90 @@ export class PromiseUtil { - - // Waits for all promises to complete - public static allSettled(promises: Promise[]): Promise[]> { - let wrappedPromises = promises.map(p => { - return Promise.resolve(p) - .then( - val => new PromiseResult(PromiseResultType.SUCCESS, val, null), - err => new PromiseResult(PromiseResultType.FAILED, null, err)); - }); - return Promise.all(wrappedPromises); - } - - public static async sleep(ms) { - return new Promise(resolve => setTimeout(resolve, ms)); + // Waits for all promises to complete + public static allSettled(promises: Promise[]): Promise[]> { + const wrappedPromises = promises.map((p) => { + return Promise.resolve(p).then( + (val) => new PromiseResult(PromiseResultType.SUCCESS, val, null), + (err) => new PromiseResult(PromiseResultType.FAILED, null, err) + ) + }) + return Promise.all(wrappedPromises) + } + + public static async sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)) + } + + public static createDeferred( + rejectTimeout: number = 0, + resolveTimeout: number = 0 + ): DeferredPromise { + const deferred = new DeferredPromise() + deferred.promise = new Promise((resolve, reject) => { + deferred.resolve = resolve + deferred.reject = reject + }) + if (rejectTimeout > 0) { + setTimeout(function () { + deferred.reject() + }, rejectTimeout) } - - public static createDeferred(rejectTimeout: number = 0, - resolveTimeout: number = 0): DeferredPromise { - let deferred = new DeferredPromise(); - deferred.promise = new Promise((resolve, reject) => { - deferred.resolve = resolve; - deferred.reject = reject; - }); - if (rejectTimeout > 0) { - setTimeout(function () { - deferred.reject(); - }, rejectTimeout); - } - if (resolveTimeout > 0) { - setTimeout(function () { - deferred.resolve(); - }, resolveTimeout); - } - return deferred; + if (resolveTimeout > 0) { + setTimeout(function () { + deferred.resolve() + }, resolveTimeout) } - - + return deferred + } } export enum PromiseResultType { - FAILED = -1, - RUNNING = 0, - SUCCESS = 1 + FAILED = -1, + RUNNING = 0, + SUCCESS = 1 } export class PromiseResult { - private _status: PromiseResultType = PromiseResultType.RUNNING; - private _val: T; - private _err: any; - - constructor(status: number, val: T, err: any) { - this._status = status; - this._val = val; - this._err = err; - } - - public isFullfilled(): boolean { - return this._status == PromiseResultType.SUCCESS; - } - - public isSuccess(): boolean { - return this._status == PromiseResultType.SUCCESS; - } - - public isRejected(): boolean { - return this._status == PromiseResultType.FAILED; - } - - public isRunning(): boolean { - return this._status == PromiseResultType.RUNNING; - } - - get status(): PromiseResultType { - return this._status; - } - - get val(): T { - return this._val; - } - - get err(): any { - return this._err; - } + private _status: PromiseResultType = PromiseResultType.RUNNING + private _val: T + private _err: any + + constructor(status: number, val: T, err: any) { + this._status = status + this._val = val + this._err = err + } + + public isFullfilled(): boolean { + return this._status == PromiseResultType.SUCCESS + } + + public isSuccess(): boolean { + return this._status == PromiseResultType.SUCCESS + } + + public isRejected(): boolean { + return this._status == PromiseResultType.FAILED + } + + public isRunning(): boolean { + return this._status == PromiseResultType.RUNNING + } + + get status(): PromiseResultType { + return this._status + } + + get val(): T { + return this._val + } + + get err(): any { + return this._err + } } export class DeferredPromise { - promise: Promise; - resolve: Function; - reject: Function; + promise: Promise + resolve: Function + reject: Function } - diff --git a/src/utilz/strUtil.ts b/src/utilz/strUtil.ts index b538954..814ca3e 100755 --- a/src/utilz/strUtil.ts +++ b/src/utilz/strUtil.ts @@ -1,22 +1,21 @@ export default class StrUtil { - public static isEmpty(s: string): boolean { if (s == null) { - return true; + return true } if (typeof s !== 'string') { - return false; + return false } return s.length === 0 } public static isHex(s: string): boolean { if (StrUtil.isEmpty(s)) { - return false; + return false } - let pattern = /^[A-F0-9]+$/i; - let result = pattern.test(s); - return result; + const pattern = /^[A-F0-9]+$/i + const result = pattern.test(s) + return result } /** @@ -25,22 +24,25 @@ export default class StrUtil { * @param defaultValue */ public static getOrDefault(s: string, defaultValue: string) { - return StrUtil.isEmpty(s) ? defaultValue : s; + return StrUtil.isEmpty(s) ? defaultValue : s } public static toStringDeep(obj: any): string { - return JSON.stringify(obj, null, 4); + return JSON.stringify(obj, null, 4) } // https://ethereum.stackexchange.com/questions/2045/is-ethereum-wallet-address-case-sensitive public static normalizeEthAddress(addr: string): string { - return addr; + return addr } - public static replaceAll(str: string, - find: string[], replace: string[], - regexFlags: string):string { - var gFlag = false + public static replaceAll( + str: string, + find: string[], + replace: string[], + regexFlags: string + ): string { + let gFlag = false if (typeof str !== 'string') { throw new TypeError('`str` parameter must be a string!') @@ -74,12 +76,12 @@ export default class StrUtil { regexFlags = 'g' } - var done = [] - var joined = find.join(')|(') - var regex = new RegExp('(' + joined + ')', regexFlags) + const done = [] + const joined = find.join(')|(') + const regex = new RegExp('(' + joined + ')', regexFlags) return str.replace(regex, (match, ...finds) => { - var replaced + let replaced finds.some((found, index) => { if (found !== undefined) { @@ -106,9 +108,9 @@ export default class StrUtil { * aaaa?bbbb?cccc? => aaaa$1bbbb$2cccc$3 */ public static replaceAllMySqlToPostre(s: string): string { - let cnt = 1; + let cnt = 1 return s.replace(/\?/g, function () { - return `$${cnt++}`; - }); + return `$${cnt++}` + }) } } diff --git a/src/utilz/tuple.ts b/src/utilz/tuple.ts index b0f12b8..be99886 100644 --- a/src/utilz/tuple.ts +++ b/src/utilz/tuple.ts @@ -1 +1 @@ -type Tuple = [A, undefined?] | [undefined, B]; \ No newline at end of file +type Tuple = [A, undefined?] | [undefined, B] diff --git a/src/utilz/waitNotify.ts b/src/utilz/waitNotify.ts index d652d53..1840fac 100644 --- a/src/utilz/waitNotify.ts +++ b/src/utilz/waitNotify.ts @@ -1,50 +1,50 @@ -import {EventEmitter} from "events"; +import { EventEmitter } from 'events' -import IdUtil from "./idUtil"; +import IdUtil from './idUtil' export class WaitNotify { - ee: EventEmitter; - waitMsgIDs = []; + ee: EventEmitter + waitMsgIDs = [] - constructor() { - this.ee = new EventEmitter(); - this.waitMsgIDs = []; - } + constructor() { + this.ee = new EventEmitter() + this.waitMsgIDs = [] + } - async wait(timeout:number = 0) { - return new Promise((resolve, reject) => { - const msgID = IdUtil.getUuidV4(); - this.waitMsgIDs.push(msgID); - let timeoutId; - this.ee.once(msgID, () => { - if (timeoutId) { - clearTimeout(timeoutId); - } - resolve(); - }); - if (timeout) { - timeoutId = setTimeout(() => { - const delIndex = this.waitMsgIDs.indexOf(msgID); - if (delIndex !== -1) { - this.waitMsgIDs.splice(delIndex, 1); - reject(new Error('wait timeout')); - } - }, timeout); - } - }); - } + async wait(timeout: number = 0) { + return new Promise((resolve, reject) => { + const msgID = IdUtil.getUuidV4() + this.waitMsgIDs.push(msgID) + let timeoutId + this.ee.once(msgID, () => { + if (timeoutId) { + clearTimeout(timeoutId) + } + resolve() + }) + if (timeout) { + timeoutId = setTimeout(() => { + const delIndex = this.waitMsgIDs.indexOf(msgID) + if (delIndex !== -1) { + this.waitMsgIDs.splice(delIndex, 1) + reject(new Error('wait timeout')) + } + }, timeout) + } + }) + } - notify() { - this.notifyAll(); - } + notify() { + this.notifyAll() + } - notifyAll() { - while (this.waitMsgIDs.length > 0) { - this.ee.emit(this.waitMsgIDs.shift()); - } + notifyAll() { + while (this.waitMsgIDs.length > 0) { + this.ee.emit(this.waitMsgIDs.shift()) } + } - notifyOne() { - this.ee.emit(this.waitMsgIDs.shift()); - } -} \ No newline at end of file + notifyOne() { + this.ee.emit(this.waitMsgIDs.shift()) + } +} diff --git a/src/utilz/winstonUtil.ts b/src/utilz/winstonUtil.ts index 3850205..7286f57 100644 --- a/src/utilz/winstonUtil.ts +++ b/src/utilz/winstonUtil.ts @@ -1,8 +1,9 @@ -import {Format, TransformableInfo} from 'logform' -import {DateTime} from 'ts-luxon' +import { Format, TransformableInfo } from 'logform' +import { DateTime } from 'ts-luxon' import winston from 'winston' + +import { EnvLoader } from './envLoader' import StrUtil from './strUtil' -import {EnvLoader} from './envLoader' /* Example usage: @@ -52,17 +53,16 @@ I 230811 174624 [MyClass] Got alias List (SendMessage) */ - export class WinstonUtil { private static readonly CLASS_NAME_LENGTH = 23 - private static readonly LOG_DIR = EnvLoader.getPropertyOrFail('LOG_DIR'); - private static readonly LOG_LEVEL = EnvLoader.getPropertyOrFail('LOG_LEVEL'); - private static loggerMap: Map = new Map(); + private static readonly LOG_DIR = EnvLoader.getPropertyOrFail('LOG_DIR') + private static readonly LOG_LEVEL = EnvLoader.getPropertyOrFail('LOG_LEVEL') + private static loggerMap: Map = new Map() // all console writes drop here public static consoleTransport = new winston.transports.Console({ format: WinstonUtil.createFormat2WhichRendersClassName() - }); + }) // add debug writes drop here @@ -93,7 +93,7 @@ export class WinstonUtil { { "message": "Checking Node Version", "level": "info", "timestamp": "230809 180338", className?: "myClass"} */ public static renderFormat2(info: TransformableInfo) { - const {timestamp, level, message, meta} = info + const { timestamp, level, message, meta } = info const levelFirstChar = level == null ? '' : level.toUpperCase()[0] const date = DateTime.now() const formattedDate = date.toFormat('yyMMdd HHmmss') @@ -115,7 +115,7 @@ export class WinstonUtil { winston.format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }), - winston.format.errors({stack: true}), + winston.format.errors({ stack: true }), winston.format.splat(), winston.format.json(), winston.format.printf((info) => { @@ -127,7 +127,7 @@ export class WinstonUtil { static createFormat2WhichRendersClassName(): Format { return winston.format.combine( - winston.format.errors({stack: true}), + winston.format.errors({ stack: true }), winston.format.splat(), winston.format.printf((info) => { return WinstonUtil.renderFormat2(info) @@ -156,7 +156,7 @@ export class WinstonUtil { WinstonUtil.debugFileTransport, WinstonUtil.errorFileTransport ] - }); + }) WinstonUtil.loggerMap.set(loggerName, loggerObj) return loggerObj } diff --git a/tests/DbHelper.test.ts b/tests/DbHelper.test.ts index 81c26c4..03dd041 100755 --- a/tests/DbHelper.test.ts +++ b/tests/DbHelper.test.ts @@ -1,27 +1,30 @@ -import chai from 'chai' import 'mocha' + +import chai from 'chai' + import DbHelper from '../src/helpers/dbHelper' -import {Container} from "node-docker-api/lib/container"; -import {filter} from "lodash"; chai.should() -const crypto = require("crypto"); +const crypto = require('crypto') const expect = chai.expect describe('DbHelpers tests', function () { - it('test 8 bit for 1 key ', function () { - let shardId = DbHelper.calculateShardForNamespaceIndex("feeds", "store0001"); - expect(shardId).lessThanOrEqual(255); - expect(shardId).greaterThanOrEqual(0) - console.log(shardId); - }) + it('test 8 bit for 1 key ', function () { + const shardId = DbHelper.calculateShardForNamespaceIndex('feeds', 'store0001') + expect(shardId).lessThanOrEqual(255) + expect(shardId).greaterThanOrEqual(0) + console.log(shardId) + }) - it('test 8 bit for 10 random keys', function () { - for (let i = 0; i < 10; i++) { - let shardId = DbHelper.calculateShardForNamespaceIndex(crypto.randomBytes(20).toString('hex'), crypto.randomBytes(20).toString('hex')); - expect(shardId).lessThanOrEqual(255); - expect(shardId).greaterThanOrEqual(0) - console.log(shardId); - } - }) + it('test 8 bit for 10 random keys', function () { + for (let i = 0; i < 10; i++) { + const shardId = DbHelper.calculateShardForNamespaceIndex( + crypto.randomBytes(20).toString('hex'), + crypto.randomBytes(20).toString('hex') + ) + expect(shardId).lessThanOrEqual(255) + expect(shardId).greaterThanOrEqual(0) + console.log(shardId) + } + }) }) diff --git a/tests/RandomUtil.test.ts b/tests/RandomUtil.test.ts index 17e1cd1..b3d5249 100755 --- a/tests/RandomUtil.test.ts +++ b/tests/RandomUtil.test.ts @@ -1,26 +1,25 @@ -import chai from 'chai' import 'mocha' -import DbHelper from '../src/helpers/dbHelper' -import { RandomUtil } from "dstorage-common"; + +import chai from 'chai' +import { RandomUtil } from 'dstorage-common' chai.should() -const crypto = require("crypto"); +const crypto = require('crypto') const expect = chai.expect describe('RandomUtil tests', function () { - it('random-1', function () { - console.log(RandomUtil.getRandomSubArray([1,2,3,4,5], 1)); - console.log(RandomUtil.getRandomSubArray([1,2,3,4,5], 2)); - console.log(RandomUtil.getRandomSubArray([1,2,3,4,5], 3)); - console.log(RandomUtil.getRandomSubArray([1,2,3,4,5], 4)); - console.log(RandomUtil.getRandomSubArray([1,2,3,4,5], 5)); - let failed = false; - try { - console.log(RandomUtil.getRandomSubArray([1, 2, 3, 4, 5], 6)); - } catch (e) { - failed = true; - } - if(!failed) expect(failed).equals(true); - }) - + it('random-1', function () { + console.log(RandomUtil.getRandomSubArray([1, 2, 3, 4, 5], 1)) + console.log(RandomUtil.getRandomSubArray([1, 2, 3, 4, 5], 2)) + console.log(RandomUtil.getRandomSubArray([1, 2, 3, 4, 5], 3)) + console.log(RandomUtil.getRandomSubArray([1, 2, 3, 4, 5], 4)) + console.log(RandomUtil.getRandomSubArray([1, 2, 3, 4, 5], 5)) + let failed = false + try { + console.log(RandomUtil.getRandomSubArray([1, 2, 3, 4, 5], 6)) + } catch (e) { + failed = true + } + if (!failed) expect(failed).equals(true) + }) }) diff --git a/tests/SNodeIntegration.test.ts b/tests/SNodeIntegration.test.ts index 581a490..fc2b838 100755 --- a/tests/SNodeIntegration.test.ts +++ b/tests/SNodeIntegration.test.ts @@ -1,241 +1,287 @@ +import { AxiosResponse } from 'axios' import chai from 'chai' -import {RandomUtil, PromiseUtil, EnvLoader} from 'dstorage-common' -import pgPromise, {IDatabase} from 'pg-promise'; -import {AxiosResponse} from "axios"; +import { EnvLoader, PromiseUtil, RandomUtil } from 'dstorage-common' +import pgPromise, { IDatabase } from 'pg-promise' // import crypto from 'crypto' -const crypto = require("crypto"); -var _ = require('lodash'); +const crypto = require('crypto') +const _ = require('lodash') const expect = chai.expect -import assert from 'assert-ts'; -const axios = require('axios'); -import {DateTime} from "ts-luxon"; -import {DateUtil} from "dstorage-common"; -import DbHelper from "../src/helpers/dbHelper"; +const axios = require('axios') +import { DateUtil } from 'dstorage-common' +import { DateTime } from 'ts-luxon' +import DbHelper from '../src/helpers/dbHelper' -EnvLoader.loadEnvOrFail(); - +EnvLoader.loadEnvOrFail() class SNode1Constants { - // DATA GENERATION/POST CHECKS POINTS TO S1 DATABASE - static dbUri = `postgres://${EnvLoader.getPropertyOrFail('TEST_SNODE_DB_USER')}:${EnvLoader.getPropertyOrFail('TEST_SNODE_DB_PASS')}@${EnvLoader.getPropertyOrFail('TEST_SNODE_DB_HOST')}:5432/${EnvLoader.getPropertyOrFail('TEST_SNODE_DB_NAME')}`; - // API TESTS POINT TO S1 API - static apiUrl = EnvLoader.getPropertyOrFail('TEST_SNODE_API_URL'); - static namespace = 'feeds'; + // DATA GENERATION/POST CHECKS POINTS TO S1 DATABASE + static dbUri = `postgres://${EnvLoader.getPropertyOrFail('TEST_SNODE_DB_USER')}:${EnvLoader.getPropertyOrFail('TEST_SNODE_DB_PASS')}@${EnvLoader.getPropertyOrFail('TEST_SNODE_DB_HOST')}:5432/${EnvLoader.getPropertyOrFail('TEST_SNODE_DB_NAME')}` + // API TESTS POINT TO S1 API + static apiUrl = EnvLoader.getPropertyOrFail('TEST_SNODE_API_URL') + static namespace = 'feeds' } -console.log(SNode1Constants.dbUri); +console.log(SNode1Constants.dbUri) function patchPromiseWithState() { - Object.defineProperty(Promise.prototype, "state", { - get: function () { - const o = {}; - return Promise.race([this, o]).then( - v => v === o ? "pending" : "resolved", - () => "rejected"); - } - }); + Object.defineProperty(Promise.prototype, 'state', { + get: function () { + const o = {} + return Promise.race([this, o]).then( + (v) => (v === o ? 'pending' : 'resolved'), + () => 'rejected' + ) + } + }) } -patchPromiseWithState(); +patchPromiseWithState() async function cleanAllTablesAndInitNetworkStorageLayout(db: IDatabase) { - await db.result('TRUNCATE TABLE network_storage_layout'); - console.log("cleaning up table: network_storage_layout"); + await db.result('TRUNCATE TABLE network_storage_layout') + console.log('cleaning up table: network_storage_layout') - await db.result('TRUNCATE TABLE node_storage_layout'); - console.log("cleaning up table: node_storage_layout"); + await db.result('TRUNCATE TABLE node_storage_layout') + console.log('cleaning up table: node_storage_layout') - const resultT2 = await db.result(`select string_agg(table_name, ',') as sql + const resultT2 = await db.result(`select string_agg(table_name, ',') as sql from information_schema.tables - where table_name like 'storage_%';`); - console.log(resultT2); - if (resultT2.rowCount > 0 && resultT2.rows[0].sql != null) { - let truncateStorageTablesSql = resultT2.rows[0].sql; - console.log("cleaning up storage tables with query: ", truncateStorageTablesSql); - const resultT3 = await db.result('TRUNCATE TABLE ' + truncateStorageTablesSql); - console.log("cleaning up tables: ", resultT3.duration, "ms"); - const resultT4 = await db.result('DROP TABLE ' + truncateStorageTablesSql); - console.log("dropping tables: ", resultT4.duration, "ms"); - } - - const maxNodeId = 3; - const maxShardId = 255; - const namespaceArr = ['feeds']; - let sql = `INSERT INTO network_storage_layout + where table_name like 'storage_%';`) + console.log(resultT2) + if (resultT2.rowCount > 0 && resultT2.rows[0].sql != null) { + const truncateStorageTablesSql = resultT2.rows[0].sql + console.log('cleaning up storage tables with query: ', truncateStorageTablesSql) + const resultT3 = await db.result('TRUNCATE TABLE ' + truncateStorageTablesSql) + console.log('cleaning up tables: ', resultT3.duration, 'ms') + const resultT4 = await db.result('DROP TABLE ' + truncateStorageTablesSql) + console.log('dropping tables: ', resultT4.duration, 'ms') + } + + const maxNodeId = 3 + const maxShardId = 255 + const namespaceArr = ['feeds'] + let sql = `INSERT INTO network_storage_layout (namespace, namespace_shard_id, node_id) - VALUES `; - let first = true; - for (let namespace of namespaceArr) { - for (let shardId = 0; shardId <= maxShardId; shardId++) { - for (let nodeId = 0; nodeId <= maxNodeId; nodeId++) { - sql += (first ? '' : ',') + `('${namespace}', '${shardId}', '${nodeId}')`; - first = false; - } - } + VALUES ` + let first = true + for (const namespace of namespaceArr) { + for (let shardId = 0; shardId <= maxShardId; shardId++) { + for (let nodeId = 0; nodeId <= maxNodeId; nodeId++) { + sql += (first ? '' : ',') + `('${namespace}', '${shardId}', '${nodeId}')` + first = false + } } - console.log(sql); - const result2 = await db.result(sql); - console.log("insert new data, affected rows ", result2.rowCount); + } + console.log(sql) + const result2 = await db.result(sql) + console.log('insert new data, affected rows ', result2.rowCount) } async function selectWithParam(db: IDatabase, value: string) { - const result1 = await db.result('SELECT $1 as VALUE', [value]); - console.log(result1.rows[0].value); + const result1 = await db.result('SELECT $1 as VALUE', [value]) + console.log(result1.rows[0].value) } -async function doPut(baseUri: string, ns: string, nsIndex: string, ts: number, key: string, data: any): Promise { - let url = `${baseUri}/api/v1/kv/ns/${ns}/nsidx/${nsIndex}/ts/${ts}/key/${key}`; - console.log(`PUT ${url}`, data); - let resp = await axios.post(url, data, {timeout: 5000}); - console.log(resp.status); - return resp; +async function doPut( + baseUri: string, + ns: string, + nsIndex: string, + ts: number, + key: string, + data: any +): Promise { + const url = `${baseUri}/api/v1/kv/ns/${ns}/nsidx/${nsIndex}/ts/${ts}/key/${key}` + console.log(`PUT ${url}`, data) + const resp = await axios.post(url, data, { timeout: 5000 }) + console.log(resp.status) + return resp } -async function doGet(baseUri: string, ns: string, nsIndex: string, date: string, key: string): Promise { - let resp = await axios.get(`${baseUri}/api/v1/kv/ns/${ns}/nsidx/${nsIndex}/date/${date}/key/${key}`, {timeout: 3000}); - console.log(resp.status, resp.data); - return resp; +async function doGet( + baseUri: string, + ns: string, + nsIndex: string, + date: string, + key: string +): Promise { + const resp = await axios.get( + `${baseUri}/api/v1/kv/ns/${ns}/nsidx/${nsIndex}/date/${date}/key/${key}`, + { timeout: 3000 } + ) + console.log(resp.status, resp.data) + return resp } -async function doList(baseUri: string, ns: string, nsIndex: string, month: string, firstTs?: number): Promise { - let url = `${baseUri}/api/v1/kv/ns/${ns}/nsidx/${nsIndex}/month/${month}/list/`; - if (firstTs != null) { - url += `?firstTs=${firstTs}` - } - let resp = await axios.post(url, {timeout: 3000}); - console.log('LIST', url, resp.status, resp.data); - return resp; +async function doList( + baseUri: string, + ns: string, + nsIndex: string, + month: string, + firstTs?: number +): Promise { + let url = `${baseUri}/api/v1/kv/ns/${ns}/nsidx/${nsIndex}/month/${month}/list/` + if (firstTs != null) { + url += `?firstTs=${firstTs}` + } + const resp = await axios.post(url, { timeout: 3000 }) + console.log('LIST', url, resp.status, resp.data) + return resp } async function performOneTest(baseUri: string, ns: string, testCounter: number): Promise { - const startTs = Date.now(); - console.log("-->started test", testCounter); - let key = crypto.randomUUID(); - let dataToDb = { - name: 'john' + '1', - surname: crypto.randomBytes(10).toString('base64'), - id: RandomUtil.getRandomInt(0, 100000) - }; - - let nsIndex = RandomUtil.getRandomInt(0, 100000).toString(); - - let dateObj = RandomUtil.getRandomDate(DateTime.fromObject({year: 2022, month: 1, day: 1}), - DateTime.fromObject({year: 2022, month: 12, day: 31})) - if (!dateObj.isValid) { - console.log("INVALID DATE !!! ", dateObj); - return - } - let dateFormatted = dateObj.toFormat('yyyyMMdd'); - console.log('dateFormatted', dateFormatted); + const startTs = Date.now() + console.log('-->started test', testCounter) + const key = crypto.randomUUID() + const dataToDb = { + name: 'john' + '1', + surname: crypto.randomBytes(10).toString('base64'), + id: RandomUtil.getRandomInt(0, 100000) + } + + const nsIndex = RandomUtil.getRandomInt(0, 100000).toString() + + const dateObj = RandomUtil.getRandomDate( + DateTime.fromObject({ year: 2022, month: 1, day: 1 }), + DateTime.fromObject({ year: 2022, month: 12, day: 31 }) + ) + if (!dateObj.isValid) { + console.log('INVALID DATE !!! ', dateObj) + return + } + const dateFormatted = dateObj.toFormat('yyyyMMdd') + console.log('dateFormatted', dateFormatted) + + const putResult = await doPut( + baseUri, + ns, + nsIndex, + dateObj.toUnixInteger() + 0.123456, + key, + dataToDb + ) + if (putResult.status != 201) { + console.log('PUT ERROR!!! ', putResult.status) + return + } + const getResult = await doGet(baseUri, ns, nsIndex, dateFormatted, key) + if (getResult.status != 200) { + console.log('GET ERROR!!! ', getResult.status) + return + } + const dataFromDb = getResult.data.items[0].payload + const isEqual = _.isEqual(dataToDb, dataFromDb) + if (!isEqual) { + console.log(`isEqual = `, isEqual) + console.log('dataToDb', dataToDb, 'dataFromDb', dataFromDb) + } + console.log('-->finished test', testCounter, ' elapsed ', (Date.now() - startTs) / 1000.0) + expect(isEqual).equals(true) + return Promise.resolve(getResult.status) +} - let putResult = await doPut(baseUri, ns, nsIndex, dateObj.toUnixInteger() + 0.123456, key, dataToDb); - if (putResult.status != 201) { - console.log('PUT ERROR!!! ', putResult.status); - return; +describe('snode-full', function () { + const pg = pgPromise({}) + const snodeDb = pg(SNode1Constants.dbUri) + it('snode-init', async function () { + this.timeout(30000) + await cleanAllTablesAndInitNetworkStorageLayout(snodeDb) + }) + + it('snode-put-get', async function () { + this.timeout(30000) + + const promiseArr = [] + console.log('PARALLEL_THREADS=', process.env.PARALLEL_THREADS) + const parallelThreads = process.env.PARALLEL_THREADS || 50 + for (let i = 0; i < parallelThreads; i++) { + try { + promiseArr.push(performOneTest(SNode1Constants.apiUrl, SNode1Constants.namespace, i)) // TODO !!!!!!!!! + } catch (e) { + console.log('failed to submit thread#', i) + } } - let getResult = await doGet(baseUri, ns, nsIndex, dateFormatted, key); - if (getResult.status != 200) { - console.log('GET ERROR!!! ', getResult.status); - return; + const result = await PromiseUtil.allSettled(promiseArr) + console.log(result) + for (const p of promiseArr) { + try { + await p + } catch (e) { + console.log('failed to await promise', e) + } + console.log(p.state) } - let dataFromDb = getResult.data.items[0].payload; - let isEqual = _.isEqual(dataToDb, dataFromDb); - if (!isEqual) { - console.log(`isEqual = `, isEqual); - console.log('dataToDb', dataToDb, 'dataFromDb', dataFromDb); + }) + + it('snode-test-list', async function () { + const numOfRowsToGenerate = 37 + const seedDate = DateUtil.buildDateTime(2015, 1, 22) + // this is almost unique inbox, so it's empty + const nsIndex = '' + RandomUtil.getRandomInt(1000, 10000000000000000) + // Generate data within the same month, only days are random + for (let i = 0; i < numOfRowsToGenerate; i++) { + const key = crypto.randomUUID() + const dataToDb = { + name: 'john' + '1', + surname: crypto.randomBytes(10).toString('base64'), + id: RandomUtil.getRandomInt(0, 100000) + } + const randomDayWithinSameMonth = RandomUtil.getRandomDateSameMonth(seedDate) + const ts = DateUtil.dateTimeToUnixFloat(randomDayWithinSameMonth) + const putResult = await doPut( + SNode1Constants.apiUrl, + SNode1Constants.namespace, + nsIndex, + ts, + key, + dataToDb + ) } - console.log("-->finished test", testCounter, " elapsed ", (Date.now() - startTs) / 1000.0); - expect(isEqual).equals(true); - return Promise.resolve(getResult.status); -} - -describe('snode-full', function () { - const pg = pgPromise({}); - const snodeDb = pg(SNode1Constants.dbUri); - it('snode-init', async function () { - this.timeout(30000); - await cleanAllTablesAndInitNetworkStorageLayout(snodeDb); - }) - - it('snode-put-get', async function () { - this.timeout(30000); - - let promiseArr = []; - console.log('PARALLEL_THREADS=', process.env.PARALLEL_THREADS); - let parallelThreads = process.env.PARALLEL_THREADS || 50; - for (let i = 0; i < parallelThreads; i++) { - try { - promiseArr.push(performOneTest(SNode1Constants.apiUrl, SNode1Constants.namespace, i)); // TODO !!!!!!!!! - } catch (e) { - console.log("failed to submit thread#", i); - } - - } - let result = await PromiseUtil.allSettled(promiseArr); - console.log(result); - for (const p of promiseArr) { - try { - await p; - } catch (e) { - console.log('failed to await promise', e) - } - console.log(p.state); - } - }); - - it('snode-test-list', async function () { - const numOfRowsToGenerate = 37; - const seedDate = DateUtil.buildDateTime(2015, 1, 22); - // this is almost unique inbox, so it's empty - const nsIndex = '' + RandomUtil.getRandomInt(1000, 10000000000000000); - // Generate data within the same month, only days are random - for (let i = 0; i < numOfRowsToGenerate; i++) { - const key = crypto.randomUUID(); - const dataToDb = { - name: 'john' + '1', - surname: crypto.randomBytes(10).toString('base64'), - id: RandomUtil.getRandomInt(0, 100000) - }; - let randomDayWithinSameMonth = RandomUtil.getRandomDateSameMonth(seedDate); - const ts = DateUtil.dateTimeToUnixFloat(randomDayWithinSameMonth); - let putResult = await doPut(SNode1Constants.apiUrl, SNode1Constants.namespace, nsIndex, - ts, key, dataToDb); - - } - - // Query Generated rows and count them - let month = DateUtil.formatYYYYMM(seedDate); - let resp; - let itemsLength; - let query = 0; - let firstTs; - let totalRowsFetched = 0; - do { - resp = await doList(SNode1Constants.apiUrl, SNode1Constants.namespace, nsIndex, month, firstTs); - query++; - itemsLength = resp?.data?.items?.length || 0; - totalRowsFetched += itemsLength; - let lastTs = resp?.data?.lastTs; - console.log('query', query, `got `, itemsLength, 'items, lastTs = ', lastTs); - firstTs = lastTs; - // await PromiseUtil.sleep(10000); - } while ((resp?.data?.items?.length || 0) > 0) - - console.log('total rows fetched ', totalRowsFetched); - - // Compare rows in DB (storageTable) vs rows fetched via API - const shardId = DbHelper.calculateShardForNamespaceIndex(SNode1Constants.namespace, nsIndex); - const storageTable = await DbHelper.findStorageTableByDate(SNode1Constants.namespace, shardId, seedDate); - const rowCount = await snodeDb.one(`SELECT count(*) as count + // Query Generated rows and count them + const month = DateUtil.formatYYYYMM(seedDate) + let resp + let itemsLength + let query = 0 + let firstTs + let totalRowsFetched = 0 + do { + resp = await doList( + SNode1Constants.apiUrl, + SNode1Constants.namespace, + nsIndex, + month, + firstTs + ) + query++ + itemsLength = resp?.data?.items?.length || 0 + totalRowsFetched += itemsLength + const lastTs = resp?.data?.lastTs + console.log('query', query, `got `, itemsLength, 'items, lastTs = ', lastTs) + firstTs = lastTs + // await PromiseUtil.sleep(10000); + } while ((resp?.data?.items?.length || 0) > 0) + + console.log('total rows fetched ', totalRowsFetched) + + // Compare rows in DB (storageTable) vs rows fetched via API + const shardId = DbHelper.calculateShardForNamespaceIndex(SNode1Constants.namespace, nsIndex) + const storageTable = await DbHelper.findStorageTableByDate( + SNode1Constants.namespace, + shardId, + seedDate + ) + const rowCount = await snodeDb + .one( + `SELECT count(*) as count FROM ${storageTable} - WHERE namespace_id='${nsIndex}'`) - .then(value => Number.parseInt(value.count)); - console.log(`${storageTable} contains ${rowCount} rows`); - expect(rowCount).equals(totalRowsFetched); - }); + WHERE namespace_id='${nsIndex}'` + ) + .then((value) => Number.parseInt(value.count)) + console.log(`${storageTable} contains ${rowCount} rows`) + expect(rowCount).equals(totalRowsFetched) + }) }) diff --git a/tests/VNodeIntegration.test.ts b/tests/VNodeIntegration.test.ts index 21b280f..5b0e21c 100644 --- a/tests/VNodeIntegration.test.ts +++ b/tests/VNodeIntegration.test.ts @@ -1,149 +1,161 @@ +import { AxiosResponse } from 'axios' import chai from 'chai' -import {AxiosResponse} from "axios"; // import crypto from 'crypto' -const crypto = require("crypto"); -var _ = require('lodash'); +const crypto = require('crypto') +const _ = require('lodash') const expect = chai.expect -import assert from 'assert-ts'; -const axios = require('axios'); -import { RandomUtil, PromiseUtil, EnvLoader, DateUtil, VNodeClient } from 'dstorage-common'; -import {DateTime} from "ts-luxon"; -import DbHelper from "../src/helpers/dbHelper" +const axios = require('axios') +import { DateUtil, EnvLoader, PromiseUtil, RandomUtil, VNodeClient } from 'dstorage-common' +import { DateTime } from 'ts-luxon' - -EnvLoader.loadEnvOrFail(); +EnvLoader.loadEnvOrFail() class VNode1Constants { - // API TESTS - static apiUrl = EnvLoader.getPropertyOrFail('TEST_VNODE1_API_URL'); - static namespace = 'feeds'; + // API TESTS + static apiUrl = EnvLoader.getPropertyOrFail('TEST_VNODE1_API_URL') + static namespace = 'feeds' } -let vnodeClient = new VNodeClient(); +const vnodeClient = new VNodeClient() describe('vnode-full', function () { - - it('vnode-put-get', async function () { - this.timeout(30000); - - let promiseArr = []; - console.log('PARALLEL_THREADS=', process.env.PARALLEL_THREADS); - let parallelThreads = process.env.PARALLEL_THREADS || 1; - for (let i = 0; i < parallelThreads; i++) { - try { - promiseArr.push(performOneTest(VNode1Constants.apiUrl, VNode1Constants.namespace, i)); - } catch (e) { - console.log("failed to submit thread#", i); - } - - } - let result = await PromiseUtil.allSettled(promiseArr); - console.log(result); - for (const p of promiseArr) { - try { - await p; - } catch (e) { - console.log('failed to await promise', e) - } - console.log(p.state); - } - }) - - it('vnode-test-list', async function () { - const numOfRowsToGenerate = 37; - const seedDate = DateUtil.buildDateTime(2015, 1, 22); - // this is almost unique inbox, so it's empty - const nsIndex = '' + RandomUtil.getRandomInt(1000, 10000000000000000); - // Generate data within the same month, only days are random - let storedKeysSet = new Set(); - for (let i = 0; i < numOfRowsToGenerate; i++) { - const key = crypto.randomUUID(); - storedKeysSet.add(key); - const dataToDb = { - name: 'john' + '1', - surname: crypto.randomBytes(10).toString('base64'), - id: RandomUtil.getRandomInt(0, 100000) - }; - let randomDayWithinSameMonth = RandomUtil.getRandomDateSameMonth(seedDate); - const ts = DateUtil.dateTimeToUnixFloat(randomDayWithinSameMonth); - let putResult = await vnodeClient.postRecord(VNode1Constants.apiUrl, VNode1Constants.namespace, nsIndex, - ts + '', key, dataToDb); - - } - - // Query Generated rows and count them - let month = DateUtil.formatYYYYMM(seedDate); - let resp: AxiosResponse; - let itemsLength; - let query = 0; - let firstTs = null; - let totalRowsFetched = 0; - do { - resp = await vnodeClient.listRecordsByMonth(VNode1Constants.apiUrl, VNode1Constants.namespace, nsIndex, month, firstTs); - query++; - itemsLength = resp?.data?.items?.length || 0; - totalRowsFetched += itemsLength; - let lastTs = resp?.data?.result?.lastTs; - console.log('query', query, `got `, itemsLength, 'items, lastTs = ', lastTs); - firstTs = lastTs - for (let item of resp?.data?.items || []) { - let success = storedKeysSet.delete(item.skey); - console.log(`correct key ${item.skey} , success=${success}`); - } - // await PromiseUtil.sleep(10000); - } while ((resp?.data?.items?.length || 0) > 0) - - console.log('total rows fetched ', totalRowsFetched); - - // Compare rows stored vs rows fetched back via API - expect(storedKeysSet.size).equals(0); - - - }); - -}); - -async function performOneTest(baseUri: string, ns: string, testCounter: number): Promise { - const startTs = Date.now(); - console.log("-->started test", testCounter); - let key = crypto.randomUUID(); - let dataToDb = { + it('vnode-put-get', async function () { + this.timeout(30000) + + const promiseArr = [] + console.log('PARALLEL_THREADS=', process.env.PARALLEL_THREADS) + const parallelThreads = process.env.PARALLEL_THREADS || 1 + for (let i = 0; i < parallelThreads; i++) { + try { + promiseArr.push(performOneTest(VNode1Constants.apiUrl, VNode1Constants.namespace, i)) + } catch (e) { + console.log('failed to submit thread#', i) + } + } + const result = await PromiseUtil.allSettled(promiseArr) + console.log(result) + for (const p of promiseArr) { + try { + await p + } catch (e) { + console.log('failed to await promise', e) + } + console.log(p.state) + } + }) + + it('vnode-test-list', async function () { + const numOfRowsToGenerate = 37 + const seedDate = DateUtil.buildDateTime(2015, 1, 22) + // this is almost unique inbox, so it's empty + const nsIndex = '' + RandomUtil.getRandomInt(1000, 10000000000000000) + // Generate data within the same month, only days are random + const storedKeysSet = new Set() + for (let i = 0; i < numOfRowsToGenerate; i++) { + const key = crypto.randomUUID() + storedKeysSet.add(key) + const dataToDb = { name: 'john' + '1', surname: crypto.randomBytes(10).toString('base64'), id: RandomUtil.getRandomInt(0, 100000) - }; - - let nsIndex = RandomUtil.getRandomInt(0, 100000).toString(); - - let dateObj = RandomUtil.getRandomDate(DateTime.fromObject({year: 2022, month: 1, day: 1}), - DateTime.fromObject({year: 2022, month: 12, day: 31})) - if (!dateObj.isValid) { - console.log("INVALID DATE !!! ", dateObj); - return + } + const randomDayWithinSameMonth = RandomUtil.getRandomDateSameMonth(seedDate) + const ts = DateUtil.dateTimeToUnixFloat(randomDayWithinSameMonth) + const putResult = await vnodeClient.postRecord( + VNode1Constants.apiUrl, + VNode1Constants.namespace, + nsIndex, + ts + '', + key, + dataToDb + ) } - let dateFormatted = dateObj.toFormat('yyyyMMdd'); - console.log('dateFormatted', dateFormatted); - let putResult = await vnodeClient.postRecord(baseUri, ns, nsIndex, dateObj.toUnixInteger() + '', key, dataToDb); - if (putResult.status != 201) { - console.log('PUT ERROR!!! ', putResult.status); - return; - } - let getResult = await vnodeClient.getRecord(baseUri, ns, nsIndex, dateFormatted, key); - if (getResult.status != 200) { - console.log('GET ERROR!!! ', getResult.status); - return; - } - let dataFromDb = getResult.data.items[0].payload; - let isEqual = _.isEqual(dataToDb, dataFromDb); - if (!isEqual) { - console.log(`isEqual = `, isEqual); - console.log('dataToDb', dataToDb, 'dataFromDb', dataFromDb); - } - expect(getResult.data.result.quorumResult).equals('QUORUM_OK'); - console.log("-->finished test", testCounter, " elapsed ", (Date.now() - startTs) / 1000.0); - expect(isEqual).equals(true); - return Promise.resolve(getResult.status); + // Query Generated rows and count them + const month = DateUtil.formatYYYYMM(seedDate) + let resp: AxiosResponse + let itemsLength + let query = 0 + let firstTs = null + let totalRowsFetched = 0 + do { + resp = await vnodeClient.listRecordsByMonth( + VNode1Constants.apiUrl, + VNode1Constants.namespace, + nsIndex, + month, + firstTs + ) + query++ + itemsLength = resp?.data?.items?.length || 0 + totalRowsFetched += itemsLength + const lastTs = resp?.data?.result?.lastTs + console.log('query', query, `got `, itemsLength, 'items, lastTs = ', lastTs) + firstTs = lastTs + for (const item of resp?.data?.items || []) { + const success = storedKeysSet.delete(item.skey) + console.log(`correct key ${item.skey} , success=${success}`) + } + // await PromiseUtil.sleep(10000); + } while ((resp?.data?.items?.length || 0) > 0) + + console.log('total rows fetched ', totalRowsFetched) + + // Compare rows stored vs rows fetched back via API + expect(storedKeysSet.size).equals(0) + }) +}) + +async function performOneTest(baseUri: string, ns: string, testCounter: number): Promise { + const startTs = Date.now() + console.log('-->started test', testCounter) + const key = crypto.randomUUID() + const dataToDb = { + name: 'john' + '1', + surname: crypto.randomBytes(10).toString('base64'), + id: RandomUtil.getRandomInt(0, 100000) + } + + const nsIndex = RandomUtil.getRandomInt(0, 100000).toString() + + const dateObj = RandomUtil.getRandomDate( + DateTime.fromObject({ year: 2022, month: 1, day: 1 }), + DateTime.fromObject({ year: 2022, month: 12, day: 31 }) + ) + if (!dateObj.isValid) { + console.log('INVALID DATE !!! ', dateObj) + return + } + const dateFormatted = dateObj.toFormat('yyyyMMdd') + console.log('dateFormatted', dateFormatted) + + const putResult = await vnodeClient.postRecord( + baseUri, + ns, + nsIndex, + dateObj.toUnixInteger() + '', + key, + dataToDb + ) + if (putResult.status != 201) { + console.log('PUT ERROR!!! ', putResult.status) + return + } + const getResult = await vnodeClient.getRecord(baseUri, ns, nsIndex, dateFormatted, key) + if (getResult.status != 200) { + console.log('GET ERROR!!! ', getResult.status) + return + } + const dataFromDb = getResult.data.items[0].payload + const isEqual = _.isEqual(dataToDb, dataFromDb) + if (!isEqual) { + console.log(`isEqual = `, isEqual) + console.log('dataToDb', dataToDb, 'dataFromDb', dataFromDb) + } + expect(getResult.data.result.quorumResult).equals('QUORUM_OK') + console.log('-->finished test', testCounter, ' elapsed ', (Date.now() - startTs) / 1000.0) + expect(isEqual).equals(true) + return Promise.resolve(getResult.status) } diff --git a/tests/root.ts b/tests/root.ts index 1677cc5..4d4dbf8 100755 --- a/tests/root.ts +++ b/tests/root.ts @@ -1,10 +1,10 @@ -import chalk from 'chalk'; +import chalk from 'chalk' export const mochaHooks = { // This file is needed to end the test suite. afterAll(done) { - done(); - console.log(chalk.bold.green.inverse(' ALL TEST CASES EXECUTED ')); - process.exit(0); + done() + console.log(chalk.bold.green.inverse(' ALL TEST CASES EXECUTED ')) + process.exit(0) } -}; \ No newline at end of file +}