diff --git a/.github/workflows/build-pr-artifacts.yml b/.github/workflows/build-pr-artifacts.yml index 3b5ba20eb4..16d862c131 100644 --- a/.github/workflows/build-pr-artifacts.yml +++ b/.github/workflows/build-pr-artifacts.yml @@ -38,7 +38,7 @@ jobs: if: startsWith(github.event.pull_request.head.ref, 'release/') != true && startsWith(github.event.pull_request.head.ref, 'hotfix-release/') != true && github.event.pull_request.head.ref != 'main' steps: - name: Checkout - uses: actions/checkout@v3.4.0 + uses: actions/checkout@v3.5.0 with: fetch-depth: 1 @@ -52,7 +52,7 @@ jobs: password: ${{ secrets.DOCKERHUB_PROD_TOKEN }} - name: Cache Docker Layers - uses: actions/cache@v3.2.1 + uses: actions/cache@v3.3.1 with: path: /tmp/.buildx-cache key: ${{ runner.os }}-buildx-${{ github.sha }} @@ -110,7 +110,7 @@ jobs: if: startsWith(github.event.pull_request.head.ref, 'release/') != true && startsWith(github.event.pull_request.head.ref, 'hotfix-release/') != true && github.event.pull_request.head.ref != 'main' steps: - name: Checkout - uses: actions/checkout@v3.4.0 + uses: actions/checkout@v3.5.0 with: fetch-depth: 1 @@ -124,7 +124,7 @@ jobs: password: ${{ secrets.DOCKERHUB_PROD_TOKEN }} - name: Cache Docker Layers - uses: actions/cache@v3.2.1 + uses: actions/cache@v3.3.1 with: path: /tmp/.buildx-cache key: ${{ runner.os }}-buildx-${{ github.sha }} diff --git a/.github/workflows/create-hotfix-branch.yml b/.github/workflows/create-hotfix-branch.yml index 6a95f0e619..05fc34e397 100644 --- a/.github/workflows/create-hotfix-branch.yml +++ b/.github/workflows/create-hotfix-branch.yml @@ -13,11 +13,11 @@ jobs: runs-on: ubuntu-latest # Only allow these users to create new hotfix branch from 'main' - if: github.ref == 'refs/heads/main' && (github.actor == 'ItsSudip' || github.actor == 'krishna2020' || github.actor == 'saikumarrs') && (github.triggering_actor == 'ItsSudip' || github.triggering_actor == 'krishna2020' || github.triggering_actor == 'saikumarrs') + if: github.ref == 'refs/heads/main' && (github.actor == 'ItsSudip' || github.actor == 'krishna2020' || github.actor == 'saikumarrs') && (github.triggering_actor == 'ItsSudip' || github.triggering_actor == 'krishna2020' || github.triggering_actor == 'saikumarrs') steps: - name: Create Branch uses: peterjgrainger/action-create-branch@v2.4.0 env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} with: - branch: 'hotfix/${{ github.event.inputs.hotfix_name }}' + branch: 'hotfix/${{ inputs.hotfix_name }}' diff --git a/.github/workflows/draft-new-release.yml b/.github/workflows/draft-new-release.yml index 9adb188c0e..5695ed130e 100644 --- a/.github/workflows/draft-new-release.yml +++ b/.github/workflows/draft-new-release.yml @@ -11,7 +11,7 @@ jobs: if: (github.ref == 'refs/heads/develop' || startsWith(github.ref, 'refs/heads/hotfix/')) && (github.actor == 'ItsSudip' || github.actor == 'krishna2020' || github.actor == 'saikumarrs' ) && (github.triggering_actor == 'ItsSudip' || github.triggering_actor == 'krishna2020' || github.triggering_actor == 'saikumarrs') steps: - name: Checkout - uses: actions/checkout@v3.4.0 + uses: actions/checkout@v3.5.0 with: fetch-depth: 0 @@ -21,6 +21,12 @@ jobs: node-version-file: '.nvmrc' cache: 'npm' + - name: Install Dependencies + env: + HUSKY: 0 + run: | + npm ci + # In order to make a commit, we need to initialize a user. # You may choose to write something less generic here if you want, it doesn't matter functionality wise. - name: Initialize Mandatory Git Config diff --git a/.github/workflows/housekeeping.yml b/.github/workflows/housekeeping.yml index 38bb5e7f61..38a21af646 100644 --- a/.github/workflows/housekeeping.yml +++ b/.github/workflows/housekeeping.yml @@ -1,8 +1,9 @@ -name: Handle Stale PRs, Issues and Branhes +name: Handle Stale PRs, Issues and Branches on: schedule: - - cron: '42 1 * * *' + # Run everyday at 1 AM + - cron: '0 1 * * *' jobs: prs: @@ -13,9 +14,9 @@ jobs: pull-requests: write steps: - - uses: actions/stale@v7.0.0 + - uses: actions/stale@v8.0.0 with: - repo-token: ${{ github.token }} + repo-token: ${{ secrets.PAT }} operations-per-run: 200 stale-pr-message: 'This PR is considered to be stale. It has been open for 20 days with no further activity thus it is going to be closed in 7 days. To avoid such a case please consider removing the stale label manually or add a comment to the PR.' stale-issue-message: 'This issue is considered to be stale. It has been open for 30 days with no further activity thus it is going to be closed in 7 days. To avoid such a case please consider removing the stale label manually or add a comment to the issue.' @@ -31,12 +32,12 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3.4.0 + uses: actions/checkout@v3.5.0 - name: Delete Old Branches uses: beatlabs/delete-old-branches-action@v0.0.9 with: - repo_token: ${{ github.token }} + repo_token: ${{ secrets.PAT }} date: '3 months ago' dry_run: false delete_tags: false diff --git a/.github/workflows/prepare-for-dev-deploy.yml b/.github/workflows/prepare-for-dev-deploy.yml index 86833538e7..e25d828194 100644 --- a/.github/workflows/prepare-for-dev-deploy.yml +++ b/.github/workflows/prepare-for-dev-deploy.yml @@ -17,6 +17,7 @@ env: jobs: report-coverage: + name: Report Code Coverage if: github.event_name == 'push' uses: ./.github/workflows/report-code-coverage.yml @@ -35,7 +36,7 @@ jobs: echo "tag_name=$tag_name" >> $GITHUB_OUTPUT - name: Checkout - uses: actions/checkout@v3.4.0 + uses: actions/checkout@v3.5.0 with: fetch-depth: 1 @@ -96,7 +97,7 @@ jobs: echo "tag_name=$tag_name" >> $GITHUB_OUTPUT - name: Checkout - uses: actions/checkout@v3.4.0 + uses: actions/checkout@v3.5.0 with: fetch-depth: 1 diff --git a/.github/workflows/prepare-for-prod-deploy.yml b/.github/workflows/prepare-for-prod-deploy.yml index 1175f4635c..ab835af44f 100644 --- a/.github/workflows/prepare-for-prod-deploy.yml +++ b/.github/workflows/prepare-for-prod-deploy.yml @@ -17,6 +17,7 @@ env: jobs: report-coverage: + name: Report Code Coverage if: github.event_name == 'push' uses: ./.github/workflows/report-code-coverage.yml @@ -29,7 +30,7 @@ jobs: tag_name: ${{ steps.gen_tag_name.outputs.tag_name }} steps: - name: Checkout - uses: actions/checkout@v3.4.0 + uses: actions/checkout@v3.5.0 with: fetch-depth: 1 @@ -50,7 +51,7 @@ jobs: password: ${{ secrets.DOCKERHUB_PROD_TOKEN }} - name: Cache Docker Layers - uses: actions/cache@v3.2.1 + uses: actions/cache@v3.3.1 with: path: /tmp/.buildx-cache key: ${{ runner.os }}-buildx-${{ github.sha }} @@ -109,7 +110,7 @@ jobs: tag_name: ${{ steps.gen_tag_name.outputs.tag_name }} steps: - name: Checkout - uses: actions/checkout@v3.4.0 + uses: actions/checkout@v3.5.0 with: fetch-depth: 1 @@ -130,7 +131,7 @@ jobs: password: ${{ secrets.DOCKERHUB_PROD_TOKEN }} - name: Cache Docker Layers - uses: actions/cache@v3.2.1 + uses: actions/cache@v3.3.1 with: path: /tmp/.buildx-cache key: ${{ runner.os }}-buildx-${{ github.sha }} @@ -189,7 +190,7 @@ jobs: UT_TAG_NAME: ${{ needs.build-user-transformer-image.outputs.tag_name }} steps: - name: Checkout - uses: actions/checkout@v3.4.0 + uses: actions/checkout@v3.5.0 with: fetch-depth: 1 diff --git a/.github/workflows/prepare-for-staging-deploy.yml b/.github/workflows/prepare-for-staging-deploy.yml index 0f8fcc6788..ab9cb959f9 100644 --- a/.github/workflows/prepare-for-staging-deploy.yml +++ b/.github/workflows/prepare-for-staging-deploy.yml @@ -25,7 +25,7 @@ jobs: if: (startsWith(github.event.pull_request.head.ref, 'release/') || startsWith(github.event.pull_request.head.ref, 'hotfix-release/')) steps: - name: Checkout - uses: actions/checkout@v3.4.0 + uses: actions/checkout@v3.5.0 with: fetch-depth: 1 @@ -46,7 +46,7 @@ jobs: password: ${{ secrets.DOCKERHUB_PROD_TOKEN }} - name: Cache Docker Layers - uses: actions/cache@v3.2.1 + uses: actions/cache@v3.3.1 with: path: /tmp/.buildx-cache key: ${{ runner.os }}-buildx-${{ github.sha }} @@ -104,7 +104,7 @@ jobs: if: (startsWith(github.event.pull_request.head.ref, 'release/') || startsWith(github.event.pull_request.head.ref, 'hotfix-release/')) steps: - name: Checkout - uses: actions/checkout@v3.4.0 + uses: actions/checkout@v3.5.0 with: fetch-depth: 1 @@ -125,7 +125,7 @@ jobs: password: ${{ secrets.DOCKERHUB_PROD_TOKEN }} - name: Cache Docker Layers - uses: actions/cache@v3.2.1 + uses: actions/cache@v3.3.1 with: path: /tmp/.buildx-cache key: ${{ runner.os }}-buildx-${{ github.sha }} @@ -182,7 +182,7 @@ jobs: UT_TAG_NAME: ${{ needs.build-user-transformer-image.outputs.tag_name }} steps: - name: Checkout - uses: actions/checkout@v3.4.0 + uses: actions/checkout@v3.5.0 with: fetch-depth: 1 diff --git a/.github/workflows/publish-new-release.yml b/.github/workflows/publish-new-release.yml index cc428a8849..9b2ce52376 100644 --- a/.github/workflows/publish-new-release.yml +++ b/.github/workflows/publish-new-release.yml @@ -25,7 +25,7 @@ jobs: echo "release_version=$version" >> $GITHUB_OUTPUT - name: Checkout - uses: actions/checkout@v3.4.0 + uses: actions/checkout@v3.5.0 with: fetch-depth: 0 @@ -35,6 +35,12 @@ jobs: node-version-file: '.nvmrc' cache: 'npm' + - name: Install Dependencies + env: + HUSKY: 0 + run: | + npm ci + # In order to make a commit, we need to initialize a user. # You may choose to write something less generic here if you want, it doesn't matter functionality wise. - name: Initialize Mandatory Git Config @@ -46,8 +52,8 @@ jobs: id: create_release env: HUSKY: 0 - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - CONVENTIONAL_GITHUB_RELEASER_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GITHUB_TOKEN: ${{ secrets.PAT }} + CONVENTIONAL_GITHUB_RELEASER_TOKEN: ${{ secrets.PAT }} run: | git tag -a v${{ steps.extract-version.outputs.release_version }} -m "chore: release v${{ steps.extract-version.outputs.release_version }}" git push origin refs/tags/v${{ steps.extract-version.outputs.release_version }} @@ -79,3 +85,35 @@ jobs: env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + - name: Notify Slack Channel + id: slack + uses: slackapi/slack-github-action@v1.23.0 + env: + SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }} + PROJECT_NAME: 'Rudder Transformer' + RELEASES_URL: 'https://github.com/rudderlabs/rudder-transformer/releases/tag/' + with: + channel-id: ${{ secrets.SLACK_RELEASE_CHANNEL_ID }} + payload: | + { + "blocks": [ + { + "type": "header", + "text": { + "type": "plain_text", + "text": ":tada: ${{ env.PROJECT_NAME }} - New GitHub Release :tada:" + } + }, + { + "type": "divider" + }, + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": "*<${{env.RELEASES_URL}}v${{ steps.extract-version.outputs.release_version }}|v${{ steps.extract-version.outputs.release_version }}>*\nCC: <@U03KG4BK1L1> <@U02AE5GMMHV> <@U01LVJ30QEB>" + } + } + ] + } + diff --git a/.github/workflows/report-code-coverage.yml b/.github/workflows/report-code-coverage.yml index 321c5e5e96..667d5095bc 100644 --- a/.github/workflows/report-code-coverage.yml +++ b/.github/workflows/report-code-coverage.yml @@ -11,7 +11,7 @@ jobs: steps: - name: Checkout - uses: actions/checkout@v3.4.0 + uses: actions/checkout@v3.5.0 with: fetch-depth: 1 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 03a112bb1a..d5a233833f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -13,7 +13,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3.4.0 + uses: actions/checkout@v3.5.0 with: fetch-depth: 1 diff --git a/README.md b/README.md index f2c267e54e..12655201cc 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,6 @@ -[![tested with jest](https://img.shields.io/badge/tested_with-jest-99424f.svg)](https://github.com/facebook/jest) -[![jest](https://jestjs.io/img/jest-badge.svg)](https://github.com/facebook/jest) [![codecov](https://codecov.io/gh/rudderlabs/rudder-transformer/branch/develop/graph/badge.svg?token=G24OON85SB)](https://codecov.io/gh/rudderlabs/rudder-transformer) + # RudderStack Transformer RudderStack Transformer is a service which transforms the RudderStack events to destination-specific singular events. This feature is released under diff --git a/package.json b/package.json index a36f0d2ac4..c7b9efbd7b 100644 --- a/package.json +++ b/package.json @@ -14,7 +14,7 @@ "author": "", "main": "GATransform.js", "scripts": { - "setup": "npm i", + "setup": "npm ci", "format": "prettier --write .", "lint": "eslint . || exit 0", "lint:fix": "eslint . --fix", diff --git a/sample.env b/sample.env index 8922678c15..995a53b1ac 100644 --- a/sample.env +++ b/sample.env @@ -1,4 +1,5 @@ PORT=9090 # application's port +METRICS_PORT=9091 # metrics's port ENABLE_FUNCTIONS=true # user transformation functions CLUSTER_ENABLED=true # cluster mode NUM_PROCS=1 # Number of workers in the cluster mode diff --git a/src/controllers/destinationProxy.js b/src/controllers/destinationProxy.js index 02ea108b78..fed9f59a84 100644 --- a/src/controllers/destinationProxy.js +++ b/src/controllers/destinationProxy.js @@ -3,9 +3,9 @@ const jsonDiff = require('json-diff'); const networkHandlerFactory = require('../adapters/networkHandlerFactory'); const { getPayloadData } = require('../adapters/network'); const { generateErrorObject } = require('../v0/util'); -const stats = require('../util/stats'); const logger = require('../logger'); const tags = require('../v0/util/tags'); +const stats = require('../util/stats'); const DestProxyController = { /** diff --git a/src/index.js b/src/index.js index e81ea3acef..14ed940757 100644 --- a/src/index.js +++ b/src/index.js @@ -6,15 +6,22 @@ const logger = require('./logger'); const { router } = require('./versionedRouter'); const { testRouter } = require('./testRouter'); +const { metricsRouter } = require('./metricsRouter'); const cluster = require('./util/cluster'); -const { addPrometheusMiddleware } = require('./middleware'); +const { addStatMiddleware } = require('./middleware'); const { logProcessInfo } = require('./util/utils'); const clusterEnabled = process.env.CLUSTER_ENABLED !== 'false'; const port = parseInt(process.env.PORT || '9090', 10); +const metricsPort = parseInt(process.env.METRICS_PORT || '9091', 10); + const app = new Koa(); -addPrometheusMiddleware(app); +addStatMiddleware(app); + +const metricsApp = new Koa(); +addStatMiddleware(metricsApp); +metricsApp.use(metricsRouter.routes()).use(metricsRouter.allowedMethods()); app.use( bodyParser({ @@ -31,8 +38,13 @@ function finalFunction() { } if (clusterEnabled) { - cluster.start(port, app); + cluster.start(port, app, metricsApp); } else { + // HTTP server for exposing metrics + if (process.env.STATS_CLIENT === 'prometheus') { + metricsApp.listen(metricsPort); + } + const server = app.listen(port); process.on('SIGTERM', () => { diff --git a/src/metricsRouter.js b/src/metricsRouter.js new file mode 100644 index 0000000000..9d8dbe54a2 --- /dev/null +++ b/src/metricsRouter.js @@ -0,0 +1,21 @@ +const KoaRouter = require('@koa/router'); +const logger = require('./logger'); +const stats = require('./util/stats'); + +const metricsRouter = new KoaRouter(); + +const enableStats = process.env.ENABLE_STATS !== 'false'; + +if (enableStats) { + metricsRouter.get('/metrics', async (ctx) => { + try { + await stats.metricsController(ctx); + } catch (error) { + logger.error(error); + ctx.status = 400; + ctx.body = error.message; + } + }); +} + +module.exports = { metricsRouter }; diff --git a/src/middleware.js b/src/middleware.js index 0536bccba2..615c79e85b 100644 --- a/src/middleware.js +++ b/src/middleware.js @@ -1,24 +1,9 @@ -const prometheusClient = require('prom-client'); -// const gcStats = require("prometheus-gc-stats"); - -const prometheusRegistry = new prometheusClient.Registry(); -prometheusClient.collectDefaultMetrics({ register: prometheusRegistry }); - -// const startGcStats = gcStats(prometheusRegistry); // gcStats() would have the same effect in this case -// startGcStats(); +const stats = require('./util/stats'); function durationMiddleware() { - const httpRequestDurationSummary = new prometheusClient.Summary({ - name: 'http_request_duration_summary_seconds', - help: 'Summary of HTTP requests duration in seconds', - labelNames: ['method', 'route', 'code'], - percentiles: [0.01, 0.1, 0.9, 0.99], - }); - - prometheusRegistry.registerMetric(httpRequestDurationSummary); - return async (ctx, next) => { - const end = httpRequestDurationSummary.startTimer(); + const startTime = new Date(); + await next(); const labels = { @@ -27,16 +12,14 @@ function durationMiddleware() { // eslint-disable-next-line no-underscore-dangle route: ctx._matchedRoute, }; - end(labels); + stats.timing('http_request_duration', startTime, labels); }; } -function addPrometheusMiddleware(app) { +function addStatMiddleware(app) { app.use(durationMiddleware()); } module.exports = { - addPrometheusMiddleware, - durationMiddleware, - prometheusRegistry, + addStatMiddleware, }; diff --git a/src/util/cluster.js b/src/util/cluster.js index 2b1e89a855..4aad63a307 100644 --- a/src/util/cluster.js +++ b/src/util/cluster.js @@ -4,6 +4,7 @@ const logger = require('../logger'); const { logProcessInfo } = require('./utils'); const numWorkers = parseInt(process.env.NUM_PROCS || '1', 10); +const metricsPort = parseInt(process.env.METRICS_PORT || '9091', 10); function finalFunction() { logger.error(`Worker (pid: ${process.pid}) was gracefully shutdown`); @@ -19,10 +20,15 @@ function shutdownWorkers() { }); } -function start(port, app) { +function start(port, app, metricsApp) { if (cluster.isMaster) { logger.info(`Master (pid: ${process.pid}) has started`); + // HTTP server for exposing metrics + if (process.env.STATS_CLIENT === 'prometheus') { + metricsApp.listen(metricsPort); + } + // Fork workers. for (let i = 0; i < numWorkers; i += 1) { cluster.fork(); diff --git a/src/util/customTransformer-faas.js b/src/util/customTransformer-faas.js index ad1bc1c653..af050a5d03 100644 --- a/src/util/customTransformer-faas.js +++ b/src/util/customTransformer-faas.js @@ -1,9 +1,14 @@ const { v4: uuidv4 } = require('uuid'); const crypto = require('crypto'); const NodeCache = require('node-cache'); -const stats = require('./stats'); const { getMetadata } = require('../v0/util'); -const { setupFaasFunction, executeFaasFunction, FAAS_AST_FN_NAME, FAAS_AST_VID } = require('./openfaas'); +const stats = require('./stats'); +const { + setupFaasFunction, + executeFaasFunction, + FAAS_AST_FN_NAME, + FAAS_AST_VID, +} = require('./openfaas'); const { getLibraryCodeV1 } = require('./customTransforrmationsStore-v1'); const libVersionIdsCache = new NodeCache(); @@ -16,15 +21,22 @@ function generateFunctionName(userTransformation, libraryVersionIds, testMode) { return funcName.substring(0, 63).toLowerCase(); } - const ids = [userTransformation.workspaceId, userTransformation.versionId].concat((libraryVersionIds || []).sort()); + const ids = [userTransformation.workspaceId, userTransformation.versionId].concat( + (libraryVersionIds || []).sort(), + ); const hash = crypto.createHash('md5').update(`${ids}`).digest('hex'); - return `fn-${userTransformation.workspaceId}-${hash}` - .substring(0, 63) - .toLowerCase(); + return `fn-${userTransformation.workspaceId}-${hash}`.substring(0, 63).toLowerCase(); } -async function extractRelevantLibraryVersionIdsForVersionId(functionName, code, versionId, libraryVersionIds, prepopulatedImports, testMode) { +async function extractRelevantLibraryVersionIdsForVersionId( + functionName, + code, + versionId, + libraryVersionIds, + prepopulatedImports, + testMode, +) { if (functionName === FAAS_AST_FN_NAME || versionId == FAAS_AST_VID) return []; const cachedLvids = libVersionIdsCache.get(functionName); @@ -35,15 +47,18 @@ async function extractRelevantLibraryVersionIdsForVersionId(functionName, code, (libraryVersionIds || []).map(async (libraryVersionId) => getLibraryCodeV1(libraryVersionId)), ); - const codeImports = prepopulatedImports || Object.keys(await require('./customTransformer').extractLibraries( - code, - versionId, - false, - [], - "pythonfaas", - testMode - ) - ); + const codeImports = + prepopulatedImports || + Object.keys( + await require('./customTransformer').extractLibraries( + code, + versionId, + false, + [], + 'pythonfaas', + testMode, + ), + ); const relevantLvids = []; @@ -80,7 +95,8 @@ async function setOpenFaasUserTransform( publish: testWithPublish, testMode, }; - const functionName = pregeneratedFnName || generateFunctionName(userTransformation, libraryVersionIds, testMode); + const functionName = + pregeneratedFnName || generateFunctionName(userTransformation, libraryVersionIds, testMode); const setupTime = new Date(); await setupFaasFunction( @@ -93,7 +109,7 @@ async function setOpenFaasUserTransform( userTransformation.versionId, libraryVersionIds, userTransformation.imports, - testMode + testMode, ), testMode, ); @@ -107,7 +123,12 @@ async function setOpenFaasUserTransform( * In production mode, the function is executed directly * if function is not found, it is deployed and returns retryable error */ -async function runOpenFaasUserTransform(events, userTransformation, libraryVersionIds, testMode = false) { +async function runOpenFaasUserTransform( + events, + userTransformation, + libraryVersionIds, + testMode = false, +) { if (events.length === 0) { throw new Error('Invalid payload. No events'); } @@ -123,7 +144,13 @@ async function runOpenFaasUserTransform(events, userTransformation, libraryVersi // check and deploy faas function if not exists const functionName = generateFunctionName(userTransformation, libraryVersionIds, testMode); if (testMode) { - await setOpenFaasUserTransform(userTransformation, libraryVersionIds, true, functionName, testMode); + await setOpenFaasUserTransform( + userTransformation, + libraryVersionIds, + true, + functionName, + testMode, + ); } const invokeTime = new Date(); @@ -138,7 +165,7 @@ async function runOpenFaasUserTransform(events, userTransformation, libraryVersi userTransformation.versionId, libraryVersionIds, userTransformation.imports, - testMode + testMode, ), testMode, ); diff --git a/src/util/customTransformer-lambda.js b/src/util/customTransformer-lambda.js index f4a9ca34a5..7a1970cc7a 100644 --- a/src/util/customTransformer-lambda.js +++ b/src/util/customTransformer-lambda.js @@ -1,7 +1,7 @@ -const stats = require('./stats'); const { getMetadata } = require('../v0/util'); const { invokeLambda, setupLambda } = require('./lambda'); const { LOG_DEF_CODE } = require('./lambda/utils'); +const stats = require('./stats'); async function runLambdaUserTransform(events, userTransformation, testMode = false) { if (events.length === 0) { diff --git a/src/util/customTransformer-v1.js b/src/util/customTransformer-v1.js index 329cca140a..fced5b0a21 100644 --- a/src/util/customTransformer-v1.js +++ b/src/util/customTransformer-v1.js @@ -1,9 +1,9 @@ const ivm = require('isolated-vm'); -const stats = require('./stats'); const { getFactory } = require('./ivmFactory'); const { getMetadata } = require('../v0/util'); const logger = require('../logger'); +const stats = require('./stats'); const userTransformTimeout = parseInt(process.env.USER_TRANSFORM_TIMEOUT || '600000', 10); @@ -96,6 +96,7 @@ async function userTransformHandlerV1( stats.timing('run_time', invokeTime, tags); const isolateEndWallTime = calculateMsFromIvmTime(isolatevm.isolate.wallTime); const isolateEndCPUTime = calculateMsFromIvmTime(isolatevm.isolate.cpuTime); + stats.timing('isolate_wall_time', isolateEndWallTime - isolateStartWallTime, tags); stats.timing('isolate_cpu_time', isolateEndCPUTime - isolateStartCPUTime, tags); diff --git a/src/util/customTransformer.js b/src/util/customTransformer.js index a738f0d163..c98aee131c 100644 --- a/src/util/customTransformer.js +++ b/src/util/customTransformer.js @@ -3,9 +3,9 @@ const { compileUserLibrary } = require('../util/ivmFactory'); const fetch = require('node-fetch'); const { getTransformationCode } = require('./customTransforrmationsStore'); const { getTransformationCodeV1 } = require('./customTransforrmationsStore-v1'); -const stats = require('./stats'); const { UserTransformHandlerFactory } = require('./customTransformerFactory'); const { parserForImport } = require('./parser'); +const stats = require('./stats'); const ISOLATE_VM_MEMORY = parseInt(process.env.ISOLATE_VM_MEMORY || '128', 10); diff --git a/src/util/customTransforrmationsStore-v1.js b/src/util/customTransforrmationsStore-v1.js index 894981d889..3263049b6f 100644 --- a/src/util/customTransforrmationsStore-v1.js +++ b/src/util/customTransforrmationsStore-v1.js @@ -1,7 +1,7 @@ const { fetchWithProxy } = require('./fetch'); const logger = require('../logger'); -const stats = require('./stats'); const { responseStatusHandler } = require('./utils'); +const stats = require('./stats'); const transformationCache = {}; const libraryCache = {}; @@ -20,7 +20,7 @@ async function getTransformationCodeV1(versionId) { const transformation = transformationCache[versionId]; if (transformation) return transformation; const tags = { - transformerVersionId: versionId, + versionId, version: 1, }; try { @@ -29,14 +29,14 @@ async function getTransformationCodeV1(versionId) { const response = await fetchWithProxy(url); responseStatusHandler(response.status, 'Transformation', versionId, url); - stats.increment('get_transformation_code.success', tags); - stats.timing('get_transformation_code', startTime, tags); + stats.increment('get_transformation_code', { success: 'true', ...tags }); + stats.timing('get_transformation_code_time', startTime, tags); const myJson = await response.json(); transformationCache[versionId] = myJson; return myJson; } catch (error) { logger.error(error); - stats.increment('get_transformation_code.error', tags); + stats.increment('get_transformation_code', { success: 'false', ...tags }); throw error; } } @@ -54,14 +54,14 @@ async function getLibraryCodeV1(versionId) { const response = await fetchWithProxy(url); responseStatusHandler(response.status, 'Transformation Library', versionId, url); - stats.increment('get_libraries_code.success', tags); - stats.timing('get_libraries_code', startTime, tags); + stats.increment('get_libraries_code', { success: 'true', ...tags }); + stats.timing('get_libraries_code_time', startTime, tags); const myJson = await response.json(); libraryCache[versionId] = myJson; return myJson; } catch (error) { logger.error(error); - stats.increment('get_libraries_code.error', tags); + stats.increment('get_libraries_code', { success: 'false', ...tags }); throw error; } } @@ -81,14 +81,14 @@ async function getRudderLibByImportName(importName) { const response = await fetchWithProxy(url); responseStatusHandler(response.status, 'Rudder Library', importName, url); - stats.increment('get_libraries_code.success', tags); - stats.timing('get_libraries_code', startTime, tags); + stats.increment('get_libraries_code', { success: 'true', ...tags }); + stats.timing('get_libraries_code_time', startTime, tags); const myJson = await response.json(); rudderLibraryCache[importName] = myJson; return myJson; } catch (error) { logger.error(error); - stats.increment('get_libraries_code.error', tags); + stats.increment('get_libraries_code', { success: 'false', ...tags }); throw error; } } diff --git a/src/util/customTransforrmationsStore.js b/src/util/customTransforrmationsStore.js index f5d9995167..46e2b9c9b5 100644 --- a/src/util/customTransforrmationsStore.js +++ b/src/util/customTransforrmationsStore.js @@ -1,8 +1,8 @@ const NodeCache = require('node-cache'); const { fetchWithProxy } = require('./fetch'); const logger = require('../logger'); -const stats = require('./stats'); const { responseStatusHandler } = require('./utils'); +const stats = require('./stats'); const myCache = new NodeCache(); @@ -22,14 +22,14 @@ async function getTransformationCode(versionId) { const response = await fetchWithProxy(url); responseStatusHandler(response.status, 'Transformation', versionId, url); - stats.increment('get_transformation_code.success'); - stats.timing('get_transformation_code', startTime, { versionId }); + stats.increment('get_transformation_code', { versionId, success: 'true' }); + stats.timing('get_transformation_code_time', startTime, { versionId }); const myJson = await response.json(); myCache.set(versionId, myJson); return myJson; } catch (error) { logger.error(error); - stats.increment('get_transformation_code.error', 1, { versionId }); + stats.increment('get_transformation_code', { versionId, success: 'false' }); throw error; } } diff --git a/src/util/ivmFactory.js b/src/util/ivmFactory.js index 93678050fd..acd578cec9 100644 --- a/src/util/ivmFactory.js +++ b/src/util/ivmFactory.js @@ -2,9 +2,9 @@ const ivm = require('isolated-vm'); const fetch = require('node-fetch'); const _ = require('lodash'); -const stats = require('./stats'); const { getLibraryCodeV1, getRudderLibByImportName } = require('./customTransforrmationsStore-v1'); const logger = require('../logger'); +const stats = require('./stats'); const ISOLATE_VM_MEMORY = parseInt(process.env.ISOLATE_VM_MEMORY || '128', 10); const RUDDER_LIBRARY_REGEX = /^@rs\/[A-Za-z]+\/v[0-9]{1,3}$/; @@ -121,7 +121,7 @@ async function createIvm(code, libraryVersionIds, versionId, secrets, testMode) } if (!isObject(transformedOutput)) { return outputEvents.push({error: "returned event from transformEvent(event) is not an object", metadata: eventsMetadata[currMsgId] || {}}); - } + } outputEvents.push({transformedEvent: transformedOutput, metadata: eventsMetadata[currMsgId] || {}}); return; } catch (error) { diff --git a/src/util/lambda/index.js b/src/util/lambda/index.js index 2aba6dbdc6..239ce3d022 100644 --- a/src/util/lambda/index.js +++ b/src/util/lambda/index.js @@ -14,8 +14,8 @@ const { } = require('@aws-sdk/client-lambda'); const { v4: uuidv4 } = require('uuid'); const logger = require('../../logger'); -const stats = require('../stats'); const { isABufferValue, bufferToString, TRANSFORM_WRAPPER_CODE } = require('./utils'); +const stats = require('../stats'); const LAMBDA_MAX_WAIT_TIME = parseInt(process.env.MAX_WAIT_TIME || '30', 10); const LAMBDA_DELAY = parseInt(process.env.DELAY || '2', 10); diff --git a/src/util/prometheus.js b/src/util/prometheus.js new file mode 100644 index 0000000000..1d57912632 --- /dev/null +++ b/src/util/prometheus.js @@ -0,0 +1,660 @@ +const prometheusClient = require('prom-client'); +const logger = require('../logger'); + +const clusterEnabled = process.env.CLUSTER_ENABLED !== 'false'; +const instanceID = process.env.INSTANCE_ID || 'localhost'; +const prefix = 'transformer'; +const defaultLabels = { instanceName: instanceID }; + +function appendPrefix(name) { + return `${prefix}_${name}`; +} + +class Prometheus { + constructor() { + this.prometheusRegistry = new prometheusClient.Registry(); + this.prometheusRegistry.setDefaultLabels(defaultLabels); + prometheusClient.collectDefaultMetrics({ + register: this.prometheusRegistry, + }); + + prometheusClient.AggregatorRegistry.setRegistries(this.prometheusRegistry); + this.aggregatorRegistry = new prometheusClient.AggregatorRegistry(); + + this.createMetrics(); + } + + async metricsController(ctx) { + ctx.status = 200; + if (clusterEnabled) { + ctx.type = this.aggregatorRegistry.contentType; + ctx.body = await this.aggregatorRegistry.clusterMetrics(); + } else { + ctx.type = this.prometheusRegistry.contentType; + ctx.body = await this.prometheusRegistry.metrics(); + } + return ctx.body; + } + + newCounterStat(name, help, labelNames) { + const counter = new prometheusClient.Counter({ + name, + help, + labelNames, + }); + this.prometheusRegistry.registerMetric(counter); + return counter; + } + + newGaugeStat(name, help, labelNames) { + const gauge = new prometheusClient.Gauge({ + name, + help, + labelNames, + }); + this.prometheusRegistry.registerMetric(gauge); + return gauge; + } + + newSummaryStat(name, help, labelNames) { + const summary = new prometheusClient.Summary({ + name, + help, + labelNames, + }); + this.prometheusRegistry.registerMetric(summary); + return summary; + } + + newHistogramStat(name, help, labelNames) { + const histogram = new prometheusClient.Histogram({ + name, + help, + labelNames, + }); + this.prometheusRegistry.registerMetric(histogram); + return histogram; + } + + timing(name, start, tags = {}) { + try { + const metric = this.prometheusRegistry.getSingleMetric(appendPrefix(name)); + if (!metric) { + logger.error(`Prometheus: Timing metric ${name} not found in the registry`); + return; + } + metric.observe(tags, (new Date() - start) / 1000); + } catch (e) { + logger.error(`Prometheus: Timing metric ${name} failed with error ${e}`); + } + } + + histogram(name, value, tags = {}) { + try { + const metric = this.prometheusRegistry.getSingleMetric(appendPrefix(name)); + if (!metric) { + logger.error(`Prometheus: Histogram metric ${name} not found in the registry`); + return; + } + metric.observe(tags, value); + } catch (e) { + logger.error(`Prometheus: Histogram metric ${name} failed with error ${e}`); + } + } + + increment(name, tags = {}) { + this.counter(name, 1, tags); + } + + counter(name, delta, tags = {}) { + try { + const metric = this.prometheusRegistry.getSingleMetric(appendPrefix(name)); + if (!metric) { + logger.error(`Prometheus: Counter metric ${name} not found in the registry`); + return; + } + metric.inc(tags, delta); + } catch (e) { + logger.error(`Prometheus: Counter metric ${name} failed with error ${e}. Value: ${delta}`); + } + } + + gauge(name, value, tags = {}) { + try { + const metric = this.prometheusRegistry.getSingleMetric(appendPrefix(name)); + if (!metric) { + logger.error(`Prometheus: Gauge metric ${name} not found in the registry`); + return; + } + metric.set(tags, value); + } catch (e) { + logger.error(`Prometheus: Gauge metric ${name} failed with error ${e}. Value: ${value}`); + } + } + + createMetrics() { + const metrics = [ + // Counters + { + name: 'user_transform_input_events', + help: 'Number of input events to user transform', + type: 'counter', + labelNames: ['processSessions'], + }, + { + name: 'cdk_live_compare_test_failed', + help: 'cdk_live_compare_test_failed', + type: 'counter', + labelNames: ['destType', 'feature'], + }, + { + name: 'cdk_live_compare_test_success', + help: 'cdk_live_compare_test_success', + type: 'counter', + labelNames: ['destType', 'feature'], + }, + { + name: 'cdk_live_compare_test_errored', + help: 'cdk_live_compare_test_errored', + type: 'counter', + labelNames: ['destType', 'feature'], + }, + { + name: 'hv_violation_type', + help: 'hv_violation_type', + type: 'histogram', + labelNames: ['violationType', 'sourceType', 'destinationType', 'k8_namespace'], + }, + { + name: 'hv_propagated_events', + help: 'hv_propagated_events', + type: 'counter', + labelNames: ['sourceType', 'destinationType', 'k8_namespace'], + }, + { + name: 'hv_errors', + help: 'hv_errors', + type: 'counter', + labelNames: ['sourceType', 'destinationType', 'k8_namespace'], + }, + { + name: 'hv_events_count', + help: 'hv_events_count', + type: 'counter', + labelNames: ['sourceType', 'destinationType', 'k8_namespace'], + }, + { + name: 'user_transform_function_group_size', + help: 'user_transform_function_group_size', + type: 'counter', + labelNames: ['processSessions'], + }, + { + name: 'user_transform_function_input_events', + help: 'user_transform_function_input_events', + type: 'counter', + labelNames: ['processSessions', 'sourceType', 'destinationType', 'k8_namespace'], + }, + { + name: 'user_transform_errors', + help: 'user_transform_errors', + type: 'counter', + labelNames: ['l1', 'l2'], + }, + { + name: 'c2', + help: 'h2', + type: 'counter', + labelNames: [ + 'transformationVersionId', + 'processSessions', + 'sourceType', + 'destinationType', + 'k8_namespace', + ], + }, + { + name: 'dest_transform_requests', + help: 'dest_transform_requests', + type: 'counter', + labelNames: ['destination', 'version', 'sourceType', 'destinationType', 'k8_namespace'], + }, + { + name: 'dest_transform_input_events', + help: 'dest_transform_input_events', + type: 'counter', + labelNames: ['destination', 'version', 'sourceType', 'destinationType', 'k8_namespace'], + }, + { + name: 'dest_transform_output_events', + help: 'dest_transform_output_events', + type: 'counter', + labelNames: ['destination', 'version', 'sourceType', 'destinationType', 'k8_namespace'], + }, + { + name: 'user_transform_requests', + help: 'user_transform_requests', + type: 'counter', + labelNames: ['processSessions'], + }, + { + name: 'user_transform_output_events', + help: 'user_transform_output_events', + type: 'counter', + labelNames: ['processSessions'], + }, + { + name: 'source_transform_requests', + help: 'source_transform_requests', + type: 'counter', + labelNames: ['source', 'version'], + }, + { + name: 'source_transform_input_events', + help: 'source_transform_input_events', + type: 'counter', + labelNames: ['source', 'version'], + }, + { + name: 'source_transform_output_events', + help: 'source_transform_output_events', + type: 'counter', + labelNames: ['source', 'version'], + }, + { + name: 'tf_proxy_dest_resp_count', + help: 'tf_proxy_dest_resp_count', + type: 'counter', + labelNames: ['destination', 'success'], + }, + { + name: 'marketo_bulk_upload_upload_file_jobs', + help: 'marketo_bulk_upload_upload_file_jobs', + type: 'counter', + labelNames: ['success'], + }, + { + name: 'create_zip_error', + help: 'create_zip_error', + type: 'counter', + labelNames: ['fileName'], + }, + { + name: 'delete_zip_error', + help: 'delete_zip_error', + type: 'counter', + labelNames: ['functionName'], + }, + { + name: 'hv_metrics', + help: 'hv_metrics', + type: 'counter', + labelNames: [ + 'destination', + 'version', + 'sourceType', + 'destinationType', + 'k8_namespace', + 'dropped', + 'violationType', + ], + }, + { + name: 'events_into_vm', + help: 'events_into_vm', + type: 'counter', + labelNames: [ + 'transformerVersionId', + 'version', + 'sourceType', + 'destinationType', + 'k8_namespace', + ], + }, + { + name: 'missing_handle', + help: 'missing_handle', + type: 'counter', + labelNames: [ + 'transformerVersionId', + 'language', + 'sourceType', + 'destinationType', + 'k8_namespace', + ], + }, + { + name: 'proxy_test_error', + help: 'proxy_test_error', + type: 'counter', + labelNames: ['destination'], + }, + { + name: 'proxy_test_payload_match', + help: 'proxy_test_payload_match', + type: 'counter', + labelNames: ['destination'], + }, + { + name: 'tf_proxy_err_count', + help: 'tf_proxy_err_count', + type: 'counter', + labelNames: ['destination'], + }, + { + name: 'tf_proxy_resp_handler_count', + help: 'tf_proxy_resp_handler_count', + type: 'counter', + labelNames: ['destination'], + }, + { + name: 'tf_proxy_proc_ax_response_count', + help: 'tf_proxy_proc_ax_response_count', + type: 'counter', + labelNames: ['destination'], + }, + { + name: 'tf_proxy_dest_req_count', + help: 'tf_proxy_dest_req_count', + type: 'counter', + labelNames: ['destination'], + }, + { + name: 'source_transform_errors', + help: 'source_transform_errors', + type: 'counter', + labelNames: ['source', 'version'], + }, + { + name: 'marketo_bulk_upload_get_job_status', + help: 'marketo_bulk_upload_get_job_status', + type: 'counter', + labelNames: ['status', 'state'], + }, + { + name: 'marketo_bulk_upload_upload_file', + help: 'marketo_bulk_upload_upload_file', + type: 'counter', + labelNames: ['status', 'state'], + }, + { + name: 'marketo_bulk_upload_polling', + help: 'marketo_bulk_upload_polling', + type: 'counter', + labelNames: ['status', 'state'], + }, + { + name: 'marketo_fetch_token', + help: 'marketo_fetch_token', + type: 'counter', + labelNames: ['status'], + }, + { name: 'marketo_activity', help: 'marketo_activity', type: 'counter', labelNames: [] }, + { + name: 'marketo_lead_lookup', + help: 'marketo_lead_lookup', + type: 'counter', + labelNames: ['type', 'action'], + }, + { + name: 'dest_transform_invalid_dynamicConfig_count', + help: 'dest_transform_invalid_dynamicConfig_count', + type: 'counter', + labelNames: ['destinationType', 'destinationId'], + }, + { + name: 'shopify_client_side_identifier_event', + help: 'shopify_client_side_identifier_event', + type: 'counter', + labelNames: ['writeKey', 'timestamp'], + }, + { + name: 'shopify_server_side_identifier_event', + help: 'shopify_server_side_identifier_event', + type: 'counter', + labelNames: ['writeKey', 'timestamp'], + }, + { + name: 'fb_pixel_timestamp_error', + help: 'fb_pixel_timestamp_error', + type: 'counter', + labelNames: ['destinationId'], + }, + { + name: 'get_eventSchema_error', + help: 'get_eventSchema_error', + type: 'counter', + labelNames: [], + }, + { + name: 'get_tracking_plan_error', + help: 'get_tracking_plan_error', + type: 'counter', + labelNames: [], + }, + + // Gauges + { + name: 'v0_transformation_time', + help: 'v0_transformation_time', + type: 'gauge', + labelNames: ['destType', 'feature'], + }, + { + name: 'cdk_transformation_time', + help: 'cdk_transformation_time', + type: 'gauge', + labelNames: ['destType', 'feature'], + }, + + // Histograms + { + name: 'http_request_duration', + help: 'Summary of HTTP requests duration in seconds', + type: 'histogram', + labelNames: ['method', 'route', 'code'], + }, + { + name: 'hv_request_size', + help: 'hv_request_size', + type: 'histogram', + labelNames: ['sourceType', 'destinationType', 'k8_namespace'], + }, + { + name: 'hv_request_latency', + help: 'hv_request_latency', + type: 'histogram', + labelNames: ['sourceType', 'destinationType', 'k8_namespace'], + }, + { + name: 'cdk_events_latency', + help: 'cdk_events_latency', + type: 'histogram', + labelNames: ['destination', 'sourceType', 'destinationType', 'k8_namespace'], + }, + { + name: 'hv_event_latency', + help: 'hv_event_latency', + type: 'histogram', + labelNames: ['sourceType', 'destinationType', 'k8_namespace'], + }, + { + name: 'dest_transform_request_latency', + help: 'dest_transform_request_latency', + type: 'histogram', + labelNames: ['destination', 'version', 'sourceType', 'destinationType', 'k8_namespace'], + }, + { + name: 'user_transform_request_latency', + help: 'user_transform_request_latency', + type: 'histogram', + labelNames: ['processSessions'], + }, + { + name: 'user_transform_function_latency', + help: 'user_transform_function_latency', + type: 'histogram', + labelNames: [ + 'transformationVersionId', + 'processSessions', + 'sourceType', + 'destinationType', + 'k8_namespace', + ], + }, + { + name: 'source_transform_request_latency', + help: 'source_transform_request_latency', + type: 'histogram', + labelNames: ['source', 'version'], + }, + { + name: 'transformer_proxy_time', + help: 'transformer_proxy_time', + type: 'histogram', + labelNames: ['destination'], + }, + { + name: 'transformer_total_proxy_latency', + help: 'transformer_total_proxy_latency', + type: 'histogram', + labelNames: ['destination', 'version'], + }, + { + name: 'creation_time', + help: 'creation_time', + type: 'histogram', + labelNames: ['transformerVersionId', 'language', 'identifier', 'publish', 'testMode'], + }, + { + name: 'run_time', + help: 'run_time', + type: 'histogram', + labelNames: [ + 'transformerVersionId', + 'language', + 'identifier', + 'publish', + 'testMode', + 'sourceType', + 'destinationType', + 'k8_namespace', + ], + }, + { name: 'get_tracking_plan', help: 'get_tracking_plan', type: 'histogram', labelNames: [] }, + { name: 'createivm_duration', help: 'createivm_duration', type: 'histogram', labelNames: [] }, + { + name: 'fetchV2_call_duration', + help: 'fetchV2_call_duration', + type: 'histogram', + labelNames: ['versionId'], + }, + { + name: 'fetch_call_duration', + help: 'fetch_call_duration', + type: 'histogram', + labelNames: ['versionId'], + }, + { + name: 'get_transformation_code_time', + help: 'get_transformation_code_time', + type: 'histogram', + labelNames: ['versionId', 'version'], + }, + { + name: 'get_transformation_code', + help: 'get_transformation_code', + type: 'histogram', + labelNames: ['versionId', 'version', 'success'], + }, + { + name: 'get_libraries_code_time', + help: 'get_libraries_code_time', + type: 'histogram', + labelNames: ['libraryVersionId', 'versionId', 'type'], + }, + { + name: 'get_libraries_code', + help: 'get_libraries_code', + type: 'histogram', + labelNames: ['libraryVersionId', 'version', 'type', 'success'], + }, + { + name: 'isolate_cpu_time', + help: 'isolate_cpu_time', + type: 'histogram', + labelNames: [ + 'transformerVersionId', + 'version', + 'sourceType', + 'destinationType', + 'k8_namespace', + ], + }, + { + name: 'isolate_wall_time', + help: 'isolate_wall_time', + type: 'histogram', + labelNames: [ + 'transformerVersionId', + 'version', + 'sourceType', + 'destinationType', + 'k8_namespace', + ], + }, + { + name: 'lambda_test_time', + help: 'lambda_test_time', + type: 'histogram', + labelNames: ['transformerVersionId', 'language', 'publish'], + }, + { + name: 'lambda_invoke_time', + help: 'lambda_invoke_time', + type: 'histogram', + labelNames: [ + 'transformerVersionId', + 'language', + 'sourceType', + 'destinationType', + 'k8_namespace', + ], + }, + { + name: 'marketo_bulk_upload_process_time', + help: 'marketo_bulk_upload_process_time', + type: 'histogram', + labelNames: ['action'], + }, + { + name: 'marketo_bulk_upload_upload_file_size', + help: 'marketo_bulk_upload_upload_file_size', + type: 'histogram', + labelNames: [], + }, + ]; + + metrics.forEach((metric) => { + try { + if (metric.type === 'counter') { + this.newCounterStat(appendPrefix(metric.name), metric.help, metric.labelNames); + } else if (metric.type === 'gauge') { + this.newGaugeStat(appendPrefix(metric.name), metric.help, metric.labelNames); + } else if (metric.type === 'histogram') { + this.newHistogramStat(appendPrefix(metric.name), metric.help, metric.labelNames); + } else { + logger.error( + `Prometheus: Metric creation failed. Name: ${metric.name}. Invalid type: ${metric.type}`, + ); + } + } catch (e) { + logger.error(`Prometheus: Metric creation failed. Name: ${metric.name}. Error ${e}`); + } + }); + } +} + +module.exports = { + Prometheus, +}; diff --git a/src/util/stats.js b/src/util/stats.js index bb0a984f8d..c7adf23bf3 100644 --- a/src/util/stats.js +++ b/src/util/stats.js @@ -1,48 +1,87 @@ -const SDC = require('statsd-client'); +const statsd = require('./statsd'); +const prometheus = require('./prometheus'); +const logger = require('../logger'); const enableStats = process.env.ENABLE_STATS !== 'false'; -const statsServerHost = process.env.STATSD_SERVER_HOST || 'localhost'; -const statsServerPort = parseInt(process.env.STATSD_SERVER_PORT || '8125', 10); -const instanceID = process.env.INSTANCE_ID || 'localhost'; - -const statsdClient = new SDC({ - host: statsServerHost, - port: statsServerPort, - prefix: 'transformer', - tags: { - instanceName: instanceID, - }, -}); +const statsClientType = process.env.STATS_CLIENT || 'statsd'; + +let statsClient; +let dontSendStats; +function init() { + if (!enableStats) { + return; + } + + if (statsClientType === 'statsd') { + statsClient = new statsd.Statsd(); + logger.info('created statsd client'); + } else if (statsClientType === 'prometheus') { + statsClient = new prometheus.Prometheus(); + logger.info('created prometheus client'); + } else { + logger.info("Invalid stats client type. Valid values are 'statsd' and 'prometheus'."); + } + + dontSendStats = !enableStats || !statsClient; +} // Sends the diff between current time and start as the stat const timing = (name, start, tags = {}) => { - if (enableStats) { - statsdClient.timing(name, start, tags); + if (dontSendStats) { + return; } -}; -const increment = (name, delta = 1, tags = {}) => { - if (enableStats) { - statsdClient.increment(name, delta, tags); - } + statsClient.timing(name, start, tags); }; -const decrement = (name, delta = -1, tags = {}) => { - if (enableStats) { - statsdClient.decrement(name, delta, tags); +const increment = (name, tags = {}) => { + if (dontSendStats) { + return; } + + statsClient.increment(name, tags); }; const counter = (name, delta, tags = {}) => { - if (enableStats) { - statsdClient.counter(name, delta, tags); + if (dontSendStats) { + return; } + + statsClient.counter(name, delta, tags); }; const gauge = (name, value, tags = {}) => { - if (enableStats) { - statsdClient.gauge(name, value, tags); + if (dontSendStats) { + return; + } + + statsClient.gauge(name, value, tags); +}; + +const histogram = (name, value, tags = {}) => { + if (dontSendStats) { + return; } + + statsClient.histogram(name, value, tags); }; -module.exports = { timing, increment, decrement, counter, gauge }; +async function metricsController(ctx) { + if (dontSendStats) { + ctx.status = 404; + ctx.body = `Not supported`; + return; + } + + if (statsClientType === 'prometheus') { + await statsClient.metricsController(ctx); + return; + } + + ctx.status = 404; + ctx.body = `Not supported`; +} + +init(); + +module.exports = { init, timing, increment, counter, gauge, histogram, metricsController }; diff --git a/src/util/statsd.js b/src/util/statsd.js new file mode 100644 index 0000000000..a32a6f6f30 --- /dev/null +++ b/src/util/statsd.js @@ -0,0 +1,41 @@ +const SDC = require('statsd-client'); + +const statsServerHost = process.env.STATSD_SERVER_HOST || 'localhost'; +const statsServerPort = parseInt(process.env.STATSD_SERVER_PORT || '8125', 10); +const instanceID = process.env.INSTANCE_ID || 'localhost'; + +class Statsd { + constructor() { + this.statsdClient = new SDC({ + host: statsServerHost, + port: statsServerPort, + prefix: 'transformer', + tags: { + instanceName: instanceID, + }, + }); + } + + // Sends the diff between current time and start as the stat + timing(name, start, tags = {}) { + this.statsdClient.timing(name, start, tags); + } + + increment(name, tags = {}) { + this.statsdClient.increment(name, 1, tags); + } + + counter(name, delta, tags = {}) { + this.statsdClient.counter(name, delta, tags); + } + + gauge(name, value, tags = {}) { + this.statsdClient.gauge(name, value, tags); + } + + histogram(name, value, tags = {}) { + this.statsdClient.histogram(name, value, tags); + } +} + +module.exports = { Statsd }; diff --git a/src/util/trackingPlan.js b/src/util/trackingPlan.js index 1eb81b58d4..0c03820308 100644 --- a/src/util/trackingPlan.js +++ b/src/util/trackingPlan.js @@ -1,8 +1,8 @@ const NodeCache = require('node-cache'); const { fetchWithProxy } = require('./fetch'); const logger = require('../logger'); -const stats = require('./stats'); const { responseStatusHandler } = require('./utils'); +const stats = require('./stats'); const tpCache = new NodeCache(); const CONFIG_BACKEND_URL = process.env.CONFIG_BACKEND_URL || 'https://api.rudderlabs.com'; @@ -28,13 +28,14 @@ async function getTrackingPlan(tpId, version, workspaceId) { const response = await fetchWithProxy(url); responseStatusHandler(response.status, 'Tracking plan', tpId, url); + stats.timing('get_tracking_plan', startTime); const myJson = await response.json(); tpCache.set(`${tpId}::${version}`, myJson); return myJson; } catch (error) { logger.error(`Failed during trackingPlan fetch : ${error}`); - stats.increment('get_tracking_plan.error'); + stats.increment('get_tracking_plan_error'); throw error; } } @@ -71,7 +72,7 @@ async function getEventSchema(tpId, tpVersion, eventType, eventName, workspaceId return eventSchema; } catch (error) { logger.info(`Failed during eventSchema fetch : ${JSON.stringify(error)}`); - stats.increment('get_eventSchema.error'); + stats.increment('get_eventSchema_error'); throw error; } } diff --git a/src/v0/destinations/customerio/data/customerIoGroup.json b/src/v0/destinations/customerio/data/customerIoGroup.json index c812f98922..6d61960372 100644 --- a/src/v0/destinations/customerio/data/customerIoGroup.json +++ b/src/v0/destinations/customerio/data/customerIoGroup.json @@ -3,13 +3,17 @@ "destKey": "object_id", "sourceKeys": "groupId", "sourceFromGenericMap": true, + "metadata": { + "type": "toString" + }, "required": true }, { "destKey": "object_type_id", "sourceKeys": "traits.objectTypeId", "metadata": { - "defaultValue": "1" + "defaultValue": "1", + "type": "toString" }, "required": false }, @@ -17,6 +21,9 @@ "destKey": "userId", "sourceKeys": "userIdOnly", "sourceFromGenericMap": true, + "metadata": { + "type": "toString" + }, "required": false }, { diff --git a/src/v0/destinations/facebook_pixel/networkHandler.js b/src/v0/destinations/facebook_pixel/networkHandler.js index e4541ef991..7af1f0ab36 100644 --- a/src/v0/destinations/facebook_pixel/networkHandler.js +++ b/src/v0/destinations/facebook_pixel/networkHandler.js @@ -125,4 +125,5 @@ const networkHandler = function () { module.exports = { networkHandler, + errorResponseHandler, }; diff --git a/src/v0/destinations/facebook_pixel/transform.js b/src/v0/destinations/facebook_pixel/transform.js index 7d9cfab1f1..f253adc9ef 100644 --- a/src/v0/destinations/facebook_pixel/transform.js +++ b/src/v0/destinations/facebook_pixel/transform.js @@ -437,7 +437,7 @@ const processEvent = (message, destination) => { const deltaMin = Math.ceil(moment.duration(start.diff(current)).asMinutes()); if (deltaDay > 7 || deltaMin > 1) { // TODO: Remove after testing in mirror transformer - stats.increment('fb_pixel_timestamp_error', 1, { + stats.increment('fb_pixel_timestamp_error', { destinationId: destination.ID, }); throw new InstrumentationError( diff --git a/src/v0/destinations/fb/networkHandler.js b/src/v0/destinations/fb/networkHandler.js new file mode 100644 index 0000000000..b79b2ab493 --- /dev/null +++ b/src/v0/destinations/fb/networkHandler.js @@ -0,0 +1,25 @@ +const { processAxiosResponse } = require('../../../adapters/utils/networkUtils'); +const { errorResponseHandler } = require('../facebook_pixel/networkHandler'); +const { prepareProxyRequest, proxyRequest } = require('../../../adapters/network'); + +const destResponseHandler = (destinationResponse) => { + errorResponseHandler(destinationResponse); + return { + destinationResponse: destinationResponse.response, + message: 'Request Processed Successfully', + status: destinationResponse.status, + }; +}; + +class networkHandler { + constructor() { + this.prepareProxyRequest = prepareProxyRequest; + this.proxy = proxyRequest; + this.processAxiosResponse = processAxiosResponse; + this.responseHandler = destResponseHandler; + } +} + +module.exports = { + networkHandler, +}; diff --git a/src/v0/destinations/fb_custom_audience/transform.js b/src/v0/destinations/fb_custom_audience/transform.js index 2d26c55452..4f2bf077a3 100644 --- a/src/v0/destinations/fb_custom_audience/transform.js +++ b/src/v0/destinations/fb_custom_audience/transform.js @@ -13,6 +13,7 @@ const { isDefinedAndNotNull, flattenMap, handleRtTfSingleEventError, + getDestinationExternalIDInfoForRetl, } = require('../../util'); const { @@ -279,6 +280,7 @@ const processEvent = (message, destination) => { const respList = []; const toSendEvents = []; let wrappedResponse = {}; + let { userSchema } = destination.Config; const { isHashRequired, audienceId, maxUserCount } = destination.Config; if (!message.type) { @@ -292,13 +294,16 @@ const processEvent = (message, destination) => { if (message.type.toLowerCase() !== 'audiencelist') { throw new InstrumentationError(` ${message.type} call is not supported `); } - const operationAudienceId = audienceId; - + let operationAudienceId = audienceId; + const mappedToDestination = get(message, MappedToDestinationKey); + if (!operationAudienceId && mappedToDestination) { + const { objectType } = getDestinationExternalIDInfoForRetl(message, 'FB_CUSTOM_AUDIENCE'); + operationAudienceId = objectType; + } if (!isDefinedAndNotNullAndNotEmpty(operationAudienceId)) { throw new ConfigurationError('Audience ID is a mandatory field'); } - const mappedToDestination = get(message, MappedToDestinationKey); // If mapped to destination, use the mapped fields instead of destination userschema if (mappedToDestination) { userSchema = getSchemaForEventMappedToDest(message); diff --git a/src/v0/destinations/marketo/transform.js b/src/v0/destinations/marketo/transform.js index 94fc9ae7d6..4dea58df82 100644 --- a/src/v0/destinations/marketo/transform.js +++ b/src/v0/destinations/marketo/transform.js @@ -71,10 +71,10 @@ const getAuthToken = async (formattedDestination) => ); const data = marketoResponseHandler(clientResponse, 'During fetching auth token'); if (data) { - stats.increment(FETCH_TOKEN_METRIC, 1, { status: 'success' }); + stats.increment(FETCH_TOKEN_METRIC, { status: 'success' }); return { value: data.access_token, age: data.expires_in }; } - stats.increment(FETCH_TOKEN_METRIC, 1, { status: 'failed' }); + stats.increment(FETCH_TOKEN_METRIC, { status: 'failed' }); return null; }); @@ -96,7 +96,7 @@ const getAuthToken = async (formattedDestination) => const createOrUpdateLead = async (formattedDestination, token, userId, anonymousId) => userIdLeadCache.get(userId || anonymousId, async () => { const attribute = userId ? { userId } : { anonymousId }; - stats.increment(LEAD_LOOKUP_METRIC, 1, { + stats.increment(LEAD_LOOKUP_METRIC, { type: 'userid', action: 'create', }); @@ -138,7 +138,7 @@ const createOrUpdateLead = async (formattedDestination, token, userId, anonymous // ------------------------ const lookupLeadUsingEmail = async (formattedDestination, token, email) => emailLeadCache.get(email, async () => { - stats.increment(LEAD_LOOKUP_METRIC, 1, { type: 'email', action: 'fetch' }); + stats.increment(LEAD_LOOKUP_METRIC, { type: 'email', action: 'fetch' }); const clientResponse = await sendGetRequest( `https://${formattedDestination.accountId}.mktorest.com/rest/v1/leads.json`, // `https://httpstat.us/200`, @@ -170,7 +170,7 @@ const lookupLeadUsingEmail = async (formattedDestination, token, email) => // ------------------------ const lookupLeadUsingId = async (formattedDestination, token, userId, anonymousId) => userIdLeadCache.get(userId || anonymousId, async () => { - stats.increment(LEAD_LOOKUP_METRIC, 1, { type: 'userId', action: 'fetch' }); + stats.increment(LEAD_LOOKUP_METRIC, { type: 'userId', action: 'fetch' }); const clientResponse = await sendGetRequest( `https://${formattedDestination.accountId}.mktorest.com/rest/v1/leads.json`, { @@ -400,7 +400,7 @@ const processTrack = async (message, formattedDestination, token) => { }; // metric collection - stats.increment(ACTIVITY_METRIC, 1); + stats.increment(ACTIVITY_METRIC); return { endPoint: `https://${accountId}.mktorest.com/rest/v1/activities/external.json`, diff --git a/src/v0/destinations/marketo_bulk_upload/fetchJobStatus.js b/src/v0/destinations/marketo_bulk_upload/fetchJobStatus.js index 4988c64860..0eeecb3aac 100644 --- a/src/v0/destinations/marketo_bulk_upload/fetchJobStatus.js +++ b/src/v0/destinations/marketo_bulk_upload/fetchJobStatus.js @@ -8,13 +8,13 @@ const { JOB_STATUS_ACTIVITY, } = require('./util'); const { httpGET } = require('../../../adapters/network'); -const stats = require('../../../util/stats'); const { AbortedError, RetryableError, ThrottledError, PlatformError, } = require('../../util/errorTypes'); +const stats = require('../../../util/stats'); const getFailedJobStatus = async (event) => { const { config, importId } = event; @@ -33,21 +33,17 @@ const getFailedJobStatus = async (event) => { const resp = await httpGET(failedLeadUrl, requestOptions); const endTime = Date.now(); const requestTime = endTime - startTime; - stats.gauge('marketo_bulk_upload_fetch_job_time', requestTime, { - integration: 'Marketo_bulk_upload', - }); + + stats.gauge('marketo_bulk_upload_fetch_job_time', requestTime); if (resp.success) { if (resp.response && resp.response.data) { - stats.increment(JOB_STATUS_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(JOB_STATUS_ACTIVITY, { status: 200, state: 'Success', }); return resp.response; } - stats.increment(JOB_STATUS_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', - + stats.increment(JOB_STATUS_ACTIVITY, { status: 400, state: 'Abortable', }); @@ -58,23 +54,20 @@ const getFailedJobStatus = async (event) => { ABORTABLE_CODES.includes(resp.response.code) || (resp.response.code >= 400 && resp.response.code <= 499) ) { - stats.increment(JOB_STATUS_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(JOB_STATUS_ACTIVITY, { status: 400, state: 'Abortable', }); throw new AbortedError(resp.response.code, 400, resp); } else if (RETRYABLE_CODES.includes(resp.response.code)) { - stats.increment(JOB_STATUS_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(JOB_STATUS_ACTIVITY, { status: 500, state: 'Retryable', }); throw new RetryableError(resp.response.code, 500, resp); } else if (resp.response.response) { if (ABORTABLE_CODES.includes(resp.response.response.status)) { - stats.increment(JOB_STATUS_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(JOB_STATUS_ACTIVITY, { status: 400, state: 'Abortable', }); @@ -84,8 +77,7 @@ const getFailedJobStatus = async (event) => { resp, ); } else if (THROTTLED_CODES.includes(resp.response.response.status)) { - stats.increment(JOB_STATUS_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(JOB_STATUS_ACTIVITY, { status: 500, state: 'Retryable', }); @@ -94,8 +86,7 @@ const getFailedJobStatus = async (event) => { resp, ); } - stats.increment(JOB_STATUS_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(JOB_STATUS_ACTIVITY, { status: 500, state: 'Retryable', }); @@ -105,15 +96,13 @@ const getFailedJobStatus = async (event) => { resp, ); } - stats.increment(JOB_STATUS_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(JOB_STATUS_ACTIVITY, { status: 400, state: 'Abortable', }); throw new AbortedError('Could not fetch failure job status', 400, resp); } - stats.increment(JOB_STATUS_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(JOB_STATUS_ACTIVITY, { status: 400, state: 'Abortable', }); @@ -137,20 +126,16 @@ const getWarningJobStatus = async (event) => { const resp = await httpGET(warningJobStatusUrl, requestOptions); const endTime = Date.now(); const requestTime = endTime - startTime; - stats.gauge('marketo_bulk_upload_fetch_job_time', requestTime, { - integration: 'Marketo_bulk_upload', - }); + stats.gauge('marketo_bulk_upload_fetch_job_time', requestTime); if (resp.success) { if (resp.response && resp.response.data) { - stats.increment(JOB_STATUS_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(JOB_STATUS_ACTIVITY, { status: 200, state: 'Success', }); return resp.response; } - stats.increment(JOB_STATUS_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(JOB_STATUS_ACTIVITY, { status: 400, state: 'Abortable', }); @@ -161,23 +146,20 @@ const getWarningJobStatus = async (event) => { ABORTABLE_CODES.includes(resp.response.code) || (resp.response.code >= 400 && resp.response.code <= 499) ) { - stats.increment(JOB_STATUS_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(JOB_STATUS_ACTIVITY, { status: 400, state: 'Abortable', }); throw new AbortedError(resp.response.code, 400, resp); } else if (RETRYABLE_CODES.includes(resp.response.code)) { - stats.increment(JOB_STATUS_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(JOB_STATUS_ACTIVITY, { status: 500, state: 'Retryable', }); throw new RetryableError(resp.response.code, 500, resp); } else if (resp.response.response) { if (ABORTABLE_CODES.includes(resp.response.response.status)) { - stats.increment(JOB_STATUS_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(JOB_STATUS_ACTIVITY, { status: 400, state: 'Abortable', }); @@ -187,8 +169,7 @@ const getWarningJobStatus = async (event) => { resp, ); } else if (THROTTLED_CODES.includes(resp.response.response.status)) { - stats.increment(JOB_STATUS_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(JOB_STATUS_ACTIVITY, { status: 500, state: 'Retryable', }); @@ -197,8 +178,8 @@ const getWarningJobStatus = async (event) => { resp, ); } - stats.increment(JOB_STATUS_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', + + stats.increment(JOB_STATUS_ACTIVITY, { status: 500, state: 'Retryable', }); @@ -208,15 +189,13 @@ const getWarningJobStatus = async (event) => { resp, ); } - stats.increment(JOB_STATUS_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(JOB_STATUS_ACTIVITY, { status: 400, state: 'Abortable', }); throw new AbortedError('Could not fetch warning job status', 400, resp); } - stats.increment(JOB_STATUS_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(JOB_STATUS_ACTIVITY, { status: 400, state: 'Abortable', }); @@ -295,9 +274,7 @@ const responseHandler = async (event, type) => { const succeededKeys = successfulJobIdsArr; const endTime = Date.now(); const requestTime = endTime - startTime; - stats.gauge('marketo_bulk_upload_fetch_job_create_response_time', requestTime, { - integration: 'Marketo_bulk_upload', - }); + stats.gauge('marketo_bulk_upload_fetch_job_create_response_time', requestTime); const response = { statusCode: 200, metadata: { diff --git a/src/v0/destinations/marketo_bulk_upload/fileUpload.js b/src/v0/destinations/marketo_bulk_upload/fileUpload.js index 3b50359cf7..0841e23e57 100644 --- a/src/v0/destinations/marketo_bulk_upload/fileUpload.js +++ b/src/v0/destinations/marketo_bulk_upload/fileUpload.js @@ -15,7 +15,6 @@ const { isDefinedAndNotNullAndNotEmpty, } = require('../../util'); const { httpPOST, httpGET } = require('../../../adapters/network'); -const stats = require('../../../util/stats'); const { RetryableError, AbortedError, @@ -25,6 +24,7 @@ const { } = require('../../util/errorTypes'); const tags = require('../../util/tags'); const { getDynamicErrorType } = require('../../../adapters/utils/networkUtils'); +const stats = require('../../../util/stats'); const fetchFieldSchema = async (config) => { let fieldArr = []; @@ -150,9 +150,7 @@ const getFileData = async (inputEvents, config, fieldSchemaNames) => { csv.push(headerArr.toString()); endTime = Date.now(); requestTime = endTime - startTime; - stats.gauge('marketo_bulk_upload_create_header_time', requestTime, { - integration: 'Marketo_bulk_upload', - }); + stats.gauge('marketo_bulk_upload_create_header_time', requestTime); const unsuccessfulJobs = []; const successfulJobs = []; const MARKETO_FILE_PATH = getMarketoFilePath(); @@ -172,9 +170,7 @@ const getFileData = async (inputEvents, config, fieldSchemaNames) => { }); endTime = Date.now(); requestTime = endTime - startTime; - stats.gauge('marketo_bulk_upload_create_csvloop_time', requestTime, { - integration: 'Marketo_bulk_upload', - }); + stats.gauge('marketo_bulk_upload_create_csvloop_time', requestTime); const fileSize = Buffer.from(csv.join('\n')).length; if (csv.length > 1) { startTime = Date.now(); @@ -183,12 +179,8 @@ const getFileData = async (inputEvents, config, fieldSchemaNames) => { fs.unlinkSync(MARKETO_FILE_PATH); endTime = Date.now(); requestTime = endTime - startTime; - stats.gauge('marketo_bulk_upload_create_file_time', requestTime, { - integration: 'Marketo_bulk_upload', - }); - stats.gauge('marketo_bulk_upload_upload_file_size', fileSize, { - integration: 'Marketo_bulk_upload', - }); + stats.gauge('marketo_bulk_upload_create_file_time', requestTime); + stats.gauge('marketo_bulk_upload_upload_file_size', fileSize); return { readStream, successfulJobs, unsuccessfulJobs }; } @@ -229,15 +221,11 @@ const getImportID = async (input, config, fieldSchemaNames, accessToken) => { ); const endTime = Date.now(); const requestTime = endTime - startTime; - stats.gauge('marketo_bulk_upload_upload_file_succJobs', successfulJobs.length, { - integration: 'Marketo_bulk_upload', - }); - stats.gauge('marketo_bulk_upload_upload_file_unsuccJobs', unsuccessfulJobs.length, { - integration: 'Marketo_bulk_upload', - }); + stats.gauge('marketo_bulk_upload_upload_file_succJobs', successfulJobs.length); + stats.gauge('marketo_bulk_upload_upload_file_unsuccJobs', unsuccessfulJobs.length); if (resp.success) { /** - * + * { "requestId": "d01f#15d672f8560", "result": [ @@ -258,11 +246,9 @@ const getImportID = async (input, config, fieldSchemaNames, accessToken) => { resp.response.data.result[0].importId ) { const { importId } = await resp.response.data.result[0]; - stats.gauge('marketo_bulk_upload_upload_file_time', requestTime, { - integration: 'Marketo_bulk_upload', - }); - stats.increment(UPLOAD_FILE, 1, { - integration: 'Marketo_bulk_upload', + stats.gauge('marketo_bulk_upload_upload_file_time', requestTime); + + stats.increment(UPLOAD_FILE, { status: 200, state: 'Success', }); @@ -274,8 +260,7 @@ const getImportID = async (input, config, fieldSchemaNames, accessToken) => { resp.response.data.errors[0].message === 'There are 10 imports currently being processed. Please try again later' ) { - stats.increment(UPLOAD_FILE, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(UPLOAD_FILE, { status: 500, state: 'Retryable', }); @@ -292,8 +277,7 @@ const getImportID = async (input, config, fieldSchemaNames, accessToken) => { ABORTABLE_CODES.indexOf(resp.response.data.errors[0].code)) ) { if (resp.response.data.errors[0].message === 'Empty file') { - stats.increment(UPLOAD_FILE, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(UPLOAD_FILE, { status: 500, state: 'Retryable', }); @@ -303,8 +287,8 @@ const getImportID = async (input, config, fieldSchemaNames, accessToken) => { { successfulJobs, unsuccessfulJobs }, ); } - stats.increment(UPLOAD_FILE, 1, { - integration: 'Marketo_bulk_upload', + + stats.increment(UPLOAD_FILE, { status: 400, state: 'Abortable', }); @@ -314,8 +298,7 @@ const getImportID = async (input, config, fieldSchemaNames, accessToken) => { { successfulJobs, unsuccessfulJobs }, ); } else if (THROTTLED_CODES.indexOf(resp.response.data.errors[0].code)) { - stats.increment(UPLOAD_FILE, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(UPLOAD_FILE, { status: 500, state: 'Retryable', }); @@ -324,8 +307,7 @@ const getImportID = async (input, config, fieldSchemaNames, accessToken) => { unsuccessfulJobs, }); } - stats.increment(UPLOAD_FILE, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(UPLOAD_FILE, { status: 500, state: 'Retryable', }); @@ -339,8 +321,8 @@ const getImportID = async (input, config, fieldSchemaNames, accessToken) => { } return { successfulJobs, unsuccessfulJobs }; } catch (err) { - stats.increment(UPLOAD_FILE, 1, { - integration: 'Marketo_bulk_upload', + // TODO check the tags + stats.increment(UPLOAD_FILE, { status: err.response?.status || 400, errorMessage: err.message || 'Error during uploading file', }); @@ -379,8 +361,8 @@ const responseHandler = async (input, config) => { response.metadata = { successfulJobs, unsuccessfulJobs, csvHeader }; return response; } - stats.increment(UPLOAD_FILE, 1, { - integration: 'Marketo_bulk_upload', + + stats.increment(UPLOAD_FILE, { status: 500, state: 'Retryable', }); diff --git a/src/v0/destinations/marketo_bulk_upload/poll.js b/src/v0/destinations/marketo_bulk_upload/poll.js index 405de6c200..6111d3a76f 100644 --- a/src/v0/destinations/marketo_bulk_upload/poll.js +++ b/src/v0/destinations/marketo_bulk_upload/poll.js @@ -23,8 +23,7 @@ const getPollStatus = async (event) => { const requestTime = endTime - startTime; if (pollStatus.success) { if (pollStatus.response && pollStatus.response.data.success) { - stats.increment(POLL_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(POLL_ACTIVITY, { requestTime, status: 200, state: 'Success', @@ -53,8 +52,7 @@ const getPollStatus = async (event) => { pollStatus.response.data.errors[0].code <= 1077) || ABORTABLE_CODES.includes(pollStatus.response.data.errors[0].code)) ) { - stats.increment(POLL_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(POLL_ACTIVITY, { requestTime, status: 400, state: 'Abortable', @@ -65,8 +63,7 @@ const getPollStatus = async (event) => { pollStatus, ); } else if (THROTTLED_CODES.includes(pollStatus.response.data.errors[0].code)) { - stats.increment(POLL_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(POLL_ACTIVITY, { requestTime, status: 500, state: 'Retryable', @@ -76,8 +73,7 @@ const getPollStatus = async (event) => { pollStatus, ); } - stats.increment(POLL_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(POLL_ACTIVITY, { requestTime, status: 500, state: 'Retryable', @@ -89,8 +85,7 @@ const getPollStatus = async (event) => { ); } } - stats.increment(POLL_ACTIVITY, 1, { - integration: 'Marketo_bulk_upload', + stats.increment(POLL_ACTIVITY, { requestTime, status: 400, state: 'Abortable', @@ -110,7 +105,7 @@ const responseHandler = async (event) => { let errorResponse; // Server expects : /** - * + * * { "success": true, "statusCode": 200, @@ -118,7 +113,7 @@ const responseHandler = async (event) => { "failedJobsURL": "", // transformer URL "hasWarnings": false, "warningJobsURL": "", // transformer URL - } // Succesful Upload + } // Succesful Upload { "success": false, "statusCode": 400, diff --git a/src/v0/sources/shopify/transform.js b/src/v0/sources/shopify/transform.js index cdc47a6439..0caa34cc20 100644 --- a/src/v0/sources/shopify/transform.js +++ b/src/v0/sources/shopify/transform.js @@ -1,6 +1,5 @@ const _ = require('lodash'); const get = require('get-value'); -const stats = require('../../../util/stats'); const { getShopifyTopic, createPropertiesForEcomEvent, @@ -11,6 +10,7 @@ const { const { removeUndefinedAndNullValues } = require('../../util'); const Message = require('../message'); const { EventType } = require('../../../constants'); +const stats = require('../../../util/stats'); const { INTEGERATION, MAPPING_CATEGORIES, @@ -161,7 +161,7 @@ const processEvent = (inputEvent) => { } message = removeUndefinedAndNullValues(message); - stats.increment('shopify_server_side_identifier_event', 1, { + stats.increment('shopify_server_side_identifier_event', { writeKey: inputEvent.query_parameters?.writeKey?.[0], timestamp: Date.now(), }); @@ -169,7 +169,7 @@ const processEvent = (inputEvent) => { }; const isIdentifierEvent = (event) => { if (event?.event === 'rudderIdentifier') { - stats.increment('shopify_client_side_identifier_event', 1, { + stats.increment('shopify_client_side_identifier_event', { writeKey: event.query_parameters?.writeKey?.[0], timestamp: Date.now(), }); diff --git a/src/v0/util/index.js b/src/v0/util/index.js index 1147ca86da..c4a438ad4e 100644 --- a/src/v0/util/index.js +++ b/src/v0/util/index.js @@ -1467,7 +1467,7 @@ function getValidDynamicFormConfig( (element[keyRight] || element[keyRight] === ''), ); if (res.length < attributeArray.length) { - stats.increment('dest_transform_invalid_dynamicConfig_count', 1, { + stats.increment('dest_transform_invalid_dynamicConfig_count', { destinationType, destinationId, }); diff --git a/src/versionedRouter.js b/src/versionedRouter.js index 7994db0587..1e69d04dc2 100644 --- a/src/versionedRouter.js +++ b/src/versionedRouter.js @@ -27,7 +27,6 @@ const networkHandlerFactory = require('./adapters/networkHandlerFactory'); const profilingRouter = require('./routes/profiling'); const destProxyRoutes = require('./routes/destinationProxy'); const eventValidator = require('./util/eventValidation'); -const { prometheusRegistry } = require('./middleware'); const { getIntegrations } = require('./routes/utils'); const { setupUserTransformHandler, validateCode } = require('./util/customTransformer'); const { CommonUtils } = require('./util/common'); @@ -128,6 +127,7 @@ async function compareWithCdkV2(destType, inputArr, feature, v0Result, v0Time) { const cdkResult = await getCdkV2Result(destType, inputArr[0], feature); const diff = process.hrtime(startTime); const cdkTime = diff[0] * NS_PER_SEC + diff[1]; + stats.gauge('v0_transformation_time', v0Time, { destType, feature, @@ -231,7 +231,7 @@ async function handleDest(ctx, version, destination) { const metaTags = events && events.length > 0 && events[0].metadata ? getMetadata(events[0].metadata) : {}; - stats.increment('dest_transform_input_events', events.length, { + stats.counter('dest_transform_input_events', events.length, { destination, version, ...metaTags, @@ -325,7 +325,7 @@ async function handleDest(ctx, version, destination) { ...metaTags, }); logger.debug(`[DT] Output events: ${JSON.stringify(respList)}`); - stats.increment('dest_transform_output_events', respList.length, { + stats.counter('dest_transform_output_events', respList.length, { destination, version, ...metaTags, @@ -408,7 +408,7 @@ async function handleValidation(ctx) { stats.counter('hv_events_count', events.length, { ...metaTags, }); - stats.counter('hv_request_size', requestSize, { + stats.histogram('hv_request_size', requestSize, { ...metaTags, }); stats.timing('hv_request_latency', requestStartTime, { @@ -561,12 +561,13 @@ if (startDestTransformer) { ctx.request.body && ctx.request.body.length > 0 && ctx.request.body[0].metadata ? getMetadata(ctx.request.body[0].metadata) : {}; + stats.timing('dest_transform_request_latency', startTime, { destination, version, ...metaTags, }); - stats.increment('dest_transform_requests', 1, { + stats.increment('dest_transform_requests', { destination, version, ...metaTags, @@ -583,11 +584,12 @@ if (startDestTransformer) { ctx.request.body && ctx.request.body.length > 0 && ctx.request.body[0].metadata ? getMetadata(ctx.request.body[0].metadata) : {}; + stats.timing('dest_transform_request_latency', startTime, { destination, ...metaTags, }); - stats.increment('dest_transform_requests', 1, { + stats.increment('dest_transform_requests', { destination, version, ...metaTags, @@ -769,10 +771,11 @@ if (startDestTransformer) { ctx.body = transformedEvents; ctx.status = ctxStatusCode; ctx.set('apiVersion', API_VERSION); + stats.timing('user_transform_request_latency', startTime, { processSessions, }); - stats.counter('user_transform_requests', 1, { processSessions }); + stats.increment('user_transform_requests', { processSessions }); stats.counter('user_transform_output_events', transformedEvents.length, { processSessions, }); @@ -867,7 +870,7 @@ async function handleSource(ctx, version, source) { const sourceHandler = getSourceHandler(version, source); const events = ctx.request.body; logger.debug(`[ST] Input source events: ${JSON.stringify(events)}`); - stats.increment('source_transform_input_events', events.length, { + stats.counter('source_transform_input_events', events.length, { source, version, }); @@ -914,6 +917,7 @@ async function handleSource(ctx, version, source) { }; respList.push(resp); + stats.counter('source_transform_errors', events.length, { source, version, @@ -943,11 +947,12 @@ if (startSourceTransformer) { router.post(`/${version}/sources/${source}`, async (ctx) => { const startTime = new Date(); await handleSource(ctx, version, source); + stats.timing('source_transform_request_latency', startTime, { source, version, }); - stats.increment('source_transform_requests', 1, { source, version }); + stats.increment('source_transform_requests', { source, version }); }); }); }); @@ -972,6 +977,7 @@ async function handleProxyRequest(destination, ctx) { }); const startTime = new Date(); const rawProxyResponse = await destNetworkHandler.proxy(destinationRequest); + stats.timing('transformer_proxy_time', startTime, { destination, }); @@ -1039,6 +1045,7 @@ if (transformerProxy) { const startTime = new Date(); ctx.set('apiVersion', API_VERSION); await handleProxyRequest(destination, ctx); + stats.timing('transformer_total_proxy_latency', startTime, { destination, version, @@ -1340,12 +1347,6 @@ const handleDeletionOfUsers = async (ctx) => { return ctx.body; // const { destType } = ctx.request.body; }; -const metricsController = async (ctx) => { - ctx.status = 200; - ctx.type = prometheusRegistry.contentType; - ctx.body = await prometheusRegistry.metrics(); - return ctx.body; -}; router.post('/fileUpload', async (ctx) => { await fileUpload(ctx); @@ -1387,10 +1388,6 @@ router.post(`/deleteUsers`, async (ctx) => { await handleDeletionOfUsers(ctx); }); -router.get('/metrics', async (ctx) => { - await metricsController(ctx); -}); - module.exports = { router, handleDest, diff --git a/test/__mocks__/data/facebook_pixel/proxy_response.json b/test/__mocks__/data/facebook_pixel/proxy_response.json index 1558d47b51..6cf05a2789 100644 --- a/test/__mocks__/data/facebook_pixel/proxy_response.json +++ b/test/__mocks__/data/facebook_pixel/proxy_response.json @@ -50,7 +50,7 @@ "status": 500 } }, - "https://graph.facebook.com/v16.0/1234567891234569/events?access_token=invalid_account_id_valid_access_token": { + "https://graph.facebook.com/v16.0/1234567891234567/events?access_token=invalid_account_id_valid_access_token": { "response": { "data": { "error": { diff --git a/test/__mocks__/data/fb/proxy_response.json b/test/__mocks__/data/fb/proxy_response.json new file mode 100644 index 0000000000..f4ba661b41 --- /dev/null +++ b/test/__mocks__/data/fb/proxy_response.json @@ -0,0 +1,67 @@ +{ + "https://graph.facebook.com/v16.0/RudderFbApp/activities?access_token=my_access_token": { + "data": { + "events_received": 1, + "fbtrace_id": "facebook_trace_id" + }, + "status": 200, + "statusText": "OK" + }, + "https://graph.facebook.com/v16.0/RudderFbApp/activities?access_token=invalid_access_token": { + "response": { + "data": { + "error": { + "message": "The access token could not be decrypted", + "type": "OAuthException", + "code": 190, + "fbtrace_id": "fbpixel_trace_id" + } + }, + "status": 500 + } + }, + "https://graph.facebook.com/v16.0/RudderFbApp/activities?access_token=invalid_timestamp_correct_access_token": { + "response": { + "data": { + "error": { + "message": "Invalid parameter", + "type": "OAuthException", + "code": 100, + "error_subcode": 2804003, + "is_transient": false, + "error_user_title": "Event Timestamp Too Old", + "error_user_msg": "The timestamp for this event is too far in the past. Events need to be sent from your server within 7 days of when they occurred. Enter a timestamp that has occurred within the last 7 days.", + "fbtrace_id": "A6UyEgg_HdoiRX9duxcBOjb" + } + }, + "status": 400 + } + }, + "https://graph.facebook.com/v16.0/RudderFbApp/activities?access_token=throttled_valid_access_token": { + "response": { + "data": { + "error": { + "message": "User request limit reached", + "type": "OAuthException", + "code": 17, + "fbtrace_id": "facebook_px_trace_id_4" + } + }, + "status": 500 + } + }, + "https://graph.facebook.com/v16.0/RudderFbApp/activities?access_token=invalid_account_id_valid_access_token": { + "response": { + "data": { + "error": { + "message": "Unsupported post request. Object with ID '1234567891234569' does not exist, cannot be loaded due to missing permissions, or does not support this operation. Please read the Graph API documentation at https://developers.facebook.com/docs/graph-api", + "type": "GraphMethodException", + "code": 100, + "error_subcode": 33, + "fbtrace_id": "facebook_px_trace_id_5" + } + }, + "status": 400 + } + } +} diff --git a/test/__mocks__/network.js b/test/__mocks__/network.js index b8bc753bb6..4d7b148747 100644 --- a/test/__mocks__/network.js +++ b/test/__mocks__/network.js @@ -15,7 +15,8 @@ const urlDirectoryMap = { "bigquery.googleapis.com": "bqstream", "pi.pardot.com": "pardot", "googleads.googleapis.com": "google_adwords_remarketing_lists", - "graph.facebook.com": "facebook_pixel", + "graph.facebook.com/v16.0/1234567891234567": "facebook_pixel", + "graph.facebook.com/v16.0/RudderFbApp": "fb", "api.wootric.com": "wootric", "api.mautic.com": "mautic", "adsapi.snapchat.com": "snapchat_custom_audience", diff --git a/test/__tests__/data/customerio_input.json b/test/__tests__/data/customerio_input.json index f6dddc425e..30cbcf0f95 100644 --- a/test/__tests__/data/customerio_input.json +++ b/test/__tests__/data/customerio_input.json @@ -4153,5 +4153,115 @@ "Transformations": [], "IsProcessorEnabled": true } - } + }, + { + "description": "successful group call with userId and groupId as an integer", + "message": { + "type": "group", + "header": { + "content-type": "application/json; charset=utf-8" + }, + "sentAt": "2023-03-28T09:36:49.882Z", + "traits": { + "city": "Frankfurt", + "name": "rudder test", + "state": "Hessen", + "isFake": true, + "address": "Solmsstraße 83", + "country": "DE", + "website": "http://www.rudderstack.com", + "industry": "Waste and recycling", + "postcode": "60486", + "whiteLabel": "rudderlabs", + "maxNbJobBoards": 2, + "organisationId": 306, + "pricingPackage": "packageExpert", + "dateProTrialEnd": "2022-08-31T00:00:00+00:00", + "isProTrialActive": true, + "datetimeRegistration": "2020-07-01T10:23:41+00:00", + "isPersonnelServiceProvider": false + }, + "userId": 432, + "channel": "server", + "context": { + "library": { + "name": "rudder-analytics-php", + "version": "2.0.1", + "consumer": "LibCurl" + } + }, + "groupId": 306, + "rudderId": "f5b46a12-2dab-4e24-a127-7316eed414fc", + "messageId": "7032394c-e813-4737-bf52-622dbcefe849", + "timestamp": "2020-07-01T10:23:41.000Z", + "receivedAt": "2023-03-28T09:36:48.296Z", + "request_ip": "18.195.235.225", + "originalTimestamp": "2023-03-28T09:36:49.882Z" + }, + "destination": { + "ID": "23Mi76khsFhY7bh9ZyRcvR3pHDt", + "Name": "Customer IO Dev", + "DestinationDefinition": { + "ID": "23MgSlHXsPLsiH7SbW7IzCP32fn", + "Name": "CUSTOMERIO", + "DisplayName": "Customer IO", + "Config": { + "destConfig": { + "defaultConfig": [ + "apiKey", + "siteID", + "datacenterEU", + "deviceTokenEventName" + ], + "web": [ + "useNativeSDK", + "blackListedEvents", + "whiteListedEvents" + ] + }, + "excludeKeys": [], + "includeKeys": [ + "apiKey", + "siteID", + "datacenterEU", + "blackListedEvents", + "whiteListedEvents" + ], + "saveDestinationResponse": true, + "secretKeys": [], + "supportedMessageTypes": [ + "identify", + "page", + "screen", + "track" + ], + "supportedSourceTypes": [ + "android", + "ios", + "web", + "unity", + "amp", + "cloud", + "warehouse", + "reactnative", + "flutter", + "cordova" + ], + "supportsVisualMapper": true, + "transformAt": "processor", + "transformAtV1": "processor" + }, + "ResponseRules": {} + }, + "Config": { + "apiKey": "ef32c3f60fb98f39ef35", + "datacenterEU": false, + "deviceTokenEventName": "device_token_registered", + "siteID": "c0efdbd20b9fbe24a7e2" + }, + "Enabled": true, + "Transformations": [], + "IsProcessorEnabled": true + } + } ] \ No newline at end of file diff --git a/test/__tests__/data/customerio_output.json b/test/__tests__/data/customerio_output.json index 56ecbd1088..e1d065567a 100644 --- a/test/__tests__/data/customerio_output.json +++ b/test/__tests__/data/customerio_output.json @@ -1293,5 +1293,57 @@ "files": {}, "userId": "user@1", "statusCode": 200 + }, + { + "version": "1", + "type": "REST", + "method": "POST", + "endpoint": "https://track.customer.io/api/v2/batch", + "headers": { + "Authorization": "Basic YzBlZmRiZDIwYjlmYmUyNGE3ZTI6ZWYzMmMzZjYwZmI5OGYzOWVmMzU=" + }, + "params": {}, + "body": { + "XML": {}, + "FORM": {}, + "JSON": { + "type": "object", + "action": "identify", + "attributes": { + "city": "Frankfurt", + "name": "rudder test", + "state": "Hessen", + "isFake": true, + "address": "Solmsstraße 83", + "country": "DE", + "website": "http://www.rudderstack.com", + "industry": "Waste and recycling", + "postcode": "60486", + "whiteLabel": "rudderlabs", + "maxNbJobBoards": 2, + "organisationId": 306, + "pricingPackage": "packageExpert", + "dateProTrialEnd": "2022-08-31T00:00:00+00:00", + "isProTrialActive": true, + "datetimeRegistration": "2020-07-01T10:23:41+00:00", + "isPersonnelServiceProvider": false + }, + "identifiers": { + "object_id": "306", + "object_type_id": "1" + }, + "cio_relationships": [ + { + "identifiers": { + "id": "432" + } + } + ] + }, + "JSON_ARRAY": {} + }, + "files": {}, + "userId": 432, + "statusCode": 200 } ] \ No newline at end of file diff --git a/test/__tests__/data/facebook_pixel_proxy_input.json b/test/__tests__/data/facebook_pixel_proxy_input.json index e8d2cbc4be..5db2d29b5b 100644 --- a/test/__tests__/data/facebook_pixel_proxy_input.json +++ b/test/__tests__/data/facebook_pixel_proxy_input.json @@ -115,7 +115,7 @@ "userId": "", "headers": {}, "version": "1", - "endpoint": "https://graph.facebook.com/v16.0/1234567891234569/events?access_token=invalid_account_id_valid_access_token" + "endpoint": "https://graph.facebook.com/v16.0/1234567891234567/events?access_token=invalid_account_id_valid_access_token" } } } diff --git a/test/__tests__/data/fb_custom_audience_router_rETL_input.json b/test/__tests__/data/fb_custom_audience_router_rETL_input.json new file mode 100644 index 0000000000..c1bc5ac224 --- /dev/null +++ b/test/__tests__/data/fb_custom_audience_router_rETL_input.json @@ -0,0 +1,112 @@ +[ + { + "message": { + "sentAt" : "2023-03-30 06:42:55.991938402 +0000 UTC", + "userId" : "2MUWghI7u85n91dd1qzGyswpZan-2MUWqbQqvctyfMGqU9QCNadpKNy", + "channel" : "sources", + "messageId" : "4d906837-031d-4d34-b97a-62fdf51b4d3a", + "event" : "Add_Audience", + "context" : { + "destinationFields" : "EMAIL, FN", + "externalId" : [ + { + "type" : "FB_CUSTOM_AUDIENCE-23848494844100489", + "identifierType" : "EMAIL" + } + ], + "mappedToDestination" : "true", + "sources" : { + "job_run_id" : "cgiiurt8um7k7n5dq480", + "task_run_id" : "cgiiurt8um7k7n5dq48g", + "job_id" : "2MUWghI7u85n91dd1qzGyswpZan", + "version" : "895\/merge" + } + }, + "recordId" : "725ad989-6750-4839-b46b-0ddb3b8e5aa2\/1\/10", + "rudderId" : "85c49666-c628-4835-937b-8f1d9ee7a724", + "properties" : { + "listData" : { + "add" : [ + { + "EMAIL" : "dede@gmail.com", + "FN" : "vishwa" + }, + { + "EMAIL" : "fchsjjn@gmail.com", + "FN" : "hskks" + }, + { + "EMAIL" : "fghjnbjk@gmail.com", + "FN" : "ghfry" + }, + { + "EMAIL" : "gvhjkk@gmail.com", + "FN" : "hbcwqe" + }, + { + "EMAIL" : "qsdwert@egf.com", + "FN" : "dsfds" + }, + { + "EMAIL" : "ascscxsaca@com", + "FN" : "scadscdvcda" + }, + { + "EMAIL" : "abc@gmail.com", + "FN" : "subscribed" + }, + { + "EMAIL" : "ddwnkl@gmail.com", + "FN" : "subscribed" + }, + { + "EMAIL" : "subscribed@eewrfrd.com", + "FN" : "pending" + }, + { + "EMAIL" : "acsdvdf@ddfvf.com", + "FN" : "pending" + } + ] + } + }, + "type" : "audienceList", + "anonymousId" : "63228b51-394e-4ca2-97a0-427f6187480b" + }, + "destination": { + "Config": { + "accessToken": "ABC", + "disableFormat": false, + "isHashRequired": true, + "isRaw": false, + "maxUserCount": "50", + "oneTrustCookieCategories": [], + "skipVerify": false, + "subType": "NA", + "type": "NA", + "userSchema": [ "EMAIL" ] + }, + "secretConfig": {}, + "ID": "1mMy5cqbtfuaKZv1IhVQKnBdVwe", + "name": "FB_CUSTOM_AUDIENCE", + "enabled": true, + "workspaceId": "1TSN08muJTZwH8iCDmnnRt1pmLd", + "deleted": false, + "createdAt": "2020-12-30T08:39:32.005Z", + "updatedAt": "2021-02-03T16:22:31.374Z", + "destinationDefinition": { + "id": "1aIXqM806xAVm92nx07YwKbRrO9", + "name": "FB_CUSTOM_AUDIENCE", + "displayName": "FB_CUSTOM_AUDIENCE", + "createdAt": "2020-04-09T09:24:31.794Z", + "updatedAt": "2021-01-11T11:03:28.103Z" + }, + "transformations": [], + "isConnectionEnabled": true, + "isProcessorEnabled": true + }, + "metadata": { + "jobId": 2 + } + } +] \ No newline at end of file diff --git a/test/__tests__/data/fb_custom_audience_router_rETL_output.json b/test/__tests__/data/fb_custom_audience_router_rETL_output.json new file mode 100644 index 0000000000..0a0c86f1a7 --- /dev/null +++ b/test/__tests__/data/fb_custom_audience_router_rETL_output.json @@ -0,0 +1,110 @@ +[ + { + "batchedRequest": { + "version": "1", + "type": "REST", + "method": "POST", + "endpoint": "https://graph.facebook.com/v16.0/23848494844100489/users", + "headers": {}, + "params": { + "access_token": "ABC", + "payload": { + "schema": [ + "EMAIL", + "FN" + ], + "data": [ + [ + "7625cab24612c37df6d2f724721bb38a25095d0295e29b807238ee188b8aca43", + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + ], + [ + "b2b4abadd72190af54305c0d3abf1977fec4935016bb13ff28040d5712318dfd", + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + ], + [ + "c4b007d1c3c9a5d31bd4082237a913e8e0db1767225c2a5ef33be2716df005fa", + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + ], + [ + "94639be1bd9f17c05820164e9d71ef78558f117a9e8bfab43cf8015e08aa0b27", + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + ], + [ + "39b456cfb4bb07f9e6bb18698aa173171ca49c731fccc4790e9ecea808d24ae6", + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + ], + [ + "769f73387add781a481ca08300008a08fb2f1816aaed196137efc2e05976d711", + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + ], + [ + "48ddb93f0b30c475423fe177832912c5bcdce3cc72872f8051627967ef278e08", + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + ], + [ + "da2d431121cd10578fd81f8f80344b06db59ea2d05a7b5d27536c8789ddae8f0", + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + ], + [ + "b100c2ec0718fe6b4805b623aeec6710719d042ceea55f5c8135b010ec1c7b36", + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + ], + [ + "0c1d1b0ba547a742013366d6fbc8f71dd77f566d94e41ed9f828a74b96928161", + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + ] + ] + } + }, + "body": { + "JSON": {}, + "JSON_ARRAY": {}, + "XML": {}, + "FORM": {} + }, + "files": {} + }, + "metadata": [ + { + "jobId": 2 + } + ], + "batched": false, + "statusCode": 200, + "destination": { + "Config": { + "accessToken": "ABC", + "disableFormat": false, + "isHashRequired": true, + "isRaw": false, + "maxUserCount": "50", + "oneTrustCookieCategories": [], + "skipVerify": false, + "subType": "NA", + "type": "NA", + "userSchema": [ + "EMAIL" + ] + }, + "secretConfig": {}, + "ID": "1mMy5cqbtfuaKZv1IhVQKnBdVwe", + "name": "FB_CUSTOM_AUDIENCE", + "enabled": true, + "workspaceId": "1TSN08muJTZwH8iCDmnnRt1pmLd", + "deleted": false, + "createdAt": "2020-12-30T08:39:32.005Z", + "updatedAt": "2021-02-03T16:22:31.374Z", + "destinationDefinition": { + "id": "1aIXqM806xAVm92nx07YwKbRrO9", + "name": "FB_CUSTOM_AUDIENCE", + "displayName": "FB_CUSTOM_AUDIENCE", + "createdAt": "2020-04-09T09:24:31.794Z", + "updatedAt": "2021-01-11T11:03:28.103Z" + }, + "transformations": [], + "isConnectionEnabled": true, + "isProcessorEnabled": true + } + } +] \ No newline at end of file diff --git a/test/__tests__/data/fb_proxy_input.json b/test/__tests__/data/fb_proxy_input.json new file mode 100644 index 0000000000..e4043b46bb --- /dev/null +++ b/test/__tests__/data/fb_proxy_input.json @@ -0,0 +1,166 @@ +[ + { + "request": { + "body": { + "body": { + "XML": {}, + "JSON_ARRAY": {}, + "FORM": { + "event": "CUSTOM_APP_EVENTS", + "advertiser_id": "df16bffa-5c3d-4fbb-9bce-3bab098129a7R", + "ud[em]": "48ddb93f0b30c475423fe177832912c5bcdce3cc72872f8051627967ef278e08", + "ud[fn]": "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08", + "ud[ge]": "62c66a7a5dd70c3146618063c344e531e6d4b59e379808443ce962b3abd63c5a", + "ud[ln]": "3547cb112ac4489af2310c0626cdba6f3097a2ad5a3b42ddd3b59c76c7a079a3", + "ud[ph]": "588211a01b10feacbf7988d97a06e86c18af5259a7f457fd8759b7f7409a7d1f", + "extinfo": "[\"a2\",\"\",\"\",\"\",\"8.1.0\",\"Redmi 6\",\"\",\"\",\"Banglalink\",640,480,\"1.23\",0,0,0,\"Europe/Berlin\"]", + "app_user_id": "c82cbdff-e5be-4009-ac78-cdeea09ab4b1", + "custom_events": "[{\"_logTime\":1567333011693,\"_eventName\":\"spin_result\",\"_valueToSum\":400,\"fb_currency\":\"GBP\",\"additional_bet_index\":0,\"battle_id\":\"N/A\",\"bet_amount\":9,\"bet_level\":1,\"bet_multiplier\":1,\"coin_balance\":9466052,\"current_module_name\":\"CasinoGameModule\",\"days_in_game\":0,\"extra_param\":\"N/A\",\"fb_profile\":\"0\",\"featureGameType\":\"N/A\",\"game_fps\":30,\"game_id\":\"fireEagleBase\",\"game_name\":\"FireEagleSlots\",\"gem_balance\":0,\"graphicsQuality\":\"HD\",\"idfa\":\"2bf99787-33d2-4ae2-a76a-c49672f97252\",\"internetReachability\":\"ReachableViaLocalAreaNetwork\",\"isLowEndDevice\":\"False\",\"is_auto_spin\":\"False\",\"is_turbo\":\"False\",\"isf\":\"False\",\"ishighroller\":\"False\",\"jackpot_win_amount\":90,\"jackpot_win_type\":\"Silver\",\"level\":6,\"lifetime_gem_balance\":0,\"no_of_spin\":1,\"player_total_battles\":0,\"player_total_shields\":0,\"start_date\":\"2019-08-01\",\"total_payments\":0,\"tournament_id\":\"T1561970819\",\"userId\":\"c82cbdff-e5be-4009-ac78-cdeea09ab4b1\",\"versionSessionCount\":2,\"win_amount\":0,\"fb_content_id\":[\"123\",\"345\",\"567\"]}]", + "advertiser_tracking_enabled": "0", + "application_tracking_enabled": "0" + }, + "JSON": {} + }, + "endpoint": "https://graph.facebook.com/v16.0/RudderFbApp/activities?access_token=invalid_access_token", + "files": {}, + "headers": { + "x-forwarded-for": "1.2.3.4" + }, + "method": "POST", + "params": {}, + "statusCode": 200, + "type": "REST", + "userId": "c82cbdff-e5be-4009-ac78-cdeea09ab4b1", + "version": "1" + } + } + }, + { + "request": { + "body": { + "body": { + "XML": {}, + "JSON_ARRAY": {}, + "FORM": { + "event": "CUSTOM_APP_EVENTS", + "advertiser_id": "df16bffa-5c3d-4fbb-9bce-3bab098129a7R", + "ud[em]": "48ddb93f0b30c475423fe177832912c5bcdce3cc72872f8051627967ef278e08", + "ud[fn]": "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08", + "ud[ge]": "62c66a7a5dd70c3146618063c344e531e6d4b59e379808443ce962b3abd63c5a", + "ud[ln]": "3547cb112ac4489af2310c0626cdba6f3097a2ad5a3b42ddd3b59c76c7a079a3", + "ud[ph]": "588211a01b10feacbf7988d97a06e86c18af5259a7f457fd8759b7f7409a7d1f", + "extinfo": "[\"a2\",\"\",\"\",\"\",\"8.1.0\",\"Redmi 6\",\"\",\"\",\"Banglalink\",640,480,\"1.23\",0,0,0,\"Europe/Berlin\"]", + "app_user_id": "c82cbdff-e5be-4009-ac78-cdeea09ab4b1", + "custom_events": "[{\"_logTime\":1567333011693,\"_eventName\":\"spin_result\",\"_valueToSum\":400,\"fb_currency\":\"GBP\",\"additional_bet_index\":0,\"battle_id\":\"N/A\",\"bet_amount\":9,\"bet_level\":1,\"bet_multiplier\":1,\"coin_balance\":9466052,\"current_module_name\":\"CasinoGameModule\",\"days_in_game\":0,\"extra_param\":\"N/A\",\"fb_profile\":\"0\",\"featureGameType\":\"N/A\",\"game_fps\":30,\"game_id\":\"fireEagleBase\",\"game_name\":\"FireEagleSlots\",\"gem_balance\":0,\"graphicsQuality\":\"HD\",\"idfa\":\"2bf99787-33d2-4ae2-a76a-c49672f97252\",\"internetReachability\":\"ReachableViaLocalAreaNetwork\",\"isLowEndDevice\":\"False\",\"is_auto_spin\":\"False\",\"is_turbo\":\"False\",\"isf\":\"False\",\"ishighroller\":\"False\",\"jackpot_win_amount\":90,\"jackpot_win_type\":\"Silver\",\"level\":6,\"lifetime_gem_balance\":0,\"no_of_spin\":1,\"player_total_battles\":0,\"player_total_shields\":0,\"start_date\":\"2019-08-01\",\"total_payments\":0,\"tournament_id\":\"T1561970819\",\"userId\":\"c82cbdff-e5be-4009-ac78-cdeea09ab4b1\",\"versionSessionCount\":2,\"win_amount\":0,\"fb_content_id\":[\"123\",\"345\",\"567\"]}]", + "advertiser_tracking_enabled": "0", + "application_tracking_enabled": "0" + }, + "JSON": {} + }, + "endpoint": "https://graph.facebook.com/v16.0/RudderFbApp/activities?access_token=my_access_token", + "files": {}, + "headers": { + "x-forwarded-for": "1.2.3.4" + }, + "method": "POST", + "params": {}, + "statusCode": 200, + "type": "REST", + "userId": "c82cbdff-e5be-4009-ac78-cdeea09ab4b1", + "version": "1" + } + } + }, + { + "request": { + "body": { + "body": { + "XML": {}, + "JSON_ARRAY": {}, + "FORM": { + "event": "CUSTOM_APP_EVENTS", + "advertiser_id": "df16bffa-5c3d-4fbb-9bce-3bab098129a7R", + "ud[em]": "48ddb93f0b30c475423fe177832912c5bcdce3cc72872f8051627967ef278e08", + "ud[fn]": "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08", + "ud[ge]": "62c66a7a5dd70c3146618063c344e531e6d4b59e379808443ce962b3abd63c5a", + "ud[ln]": "3547cb112ac4489af2310c0626cdba6f3097a2ad5a3b42ddd3b59c76c7a079a3", + "ud[ph]": "588211a01b10feacbf7988d97a06e86c18af5259a7f457fd8759b7f7409a7d1f", + "extinfo": "[\"a2\",\"\",\"\",\"\",\"8.1.0\",\"Redmi 6\",\"\",\"\",\"Banglalink\",640,480,\"1.23\",0,0,0,\"Europe/Berlin\"]", + "app_user_id": "c82cbdff-e5be-4009-ac78-cdeea09ab4b1", + "custom_events": "[{\"_logTime\":1567333011693,\"_eventName\":\"spin_result\",\"_valueToSum\":400,\"fb_currency\":\"GBP\",\"additional_bet_index\":0,\"battle_id\":\"N/A\",\"bet_amount\":9,\"bet_level\":1,\"bet_multiplier\":1,\"coin_balance\":9466052,\"current_module_name\":\"CasinoGameModule\",\"days_in_game\":0,\"extra_param\":\"N/A\",\"fb_profile\":\"0\",\"featureGameType\":\"N/A\",\"game_fps\":30,\"game_id\":\"fireEagleBase\",\"game_name\":\"FireEagleSlots\",\"gem_balance\":0,\"graphicsQuality\":\"HD\",\"idfa\":\"2bf99787-33d2-4ae2-a76a-c49672f97252\",\"internetReachability\":\"ReachableViaLocalAreaNetwork\",\"isLowEndDevice\":\"False\",\"is_auto_spin\":\"False\",\"is_turbo\":\"False\",\"isf\":\"False\",\"ishighroller\":\"False\",\"jackpot_win_amount\":90,\"jackpot_win_type\":\"Silver\",\"level\":6,\"lifetime_gem_balance\":0,\"no_of_spin\":1,\"player_total_battles\":0,\"player_total_shields\":0,\"start_date\":\"2019-08-01\",\"total_payments\":0,\"tournament_id\":\"T1561970819\",\"userId\":\"c82cbdff-e5be-4009-ac78-cdeea09ab4b1\",\"versionSessionCount\":2,\"win_amount\":0,\"fb_content_id\":[\"123\",\"345\",\"567\"]}]", + "advertiser_tracking_enabled": "0", + "application_tracking_enabled": "0" + }, + "JSON": {} + }, + "endpoint": "https://graph.facebook.com/v16.0/1234567891234567/events?access_token=invalid_timestamp_correct_access_token", + "files": {}, + "headers": { + "x-forwarded-for": "1.2.3.4" + }, + "method": "POST", + "params": {}, + "statusCode": 200, + "type": "REST", + "userId": "c82cbdff-e5be-4009-ac78-cdeea09ab4b1", + "version": "1" + } + } + }, + { + "request": { + "body": { + "version": "1", + "type": "REST", + "method": "POST", + "endpoint": "https://graph.facebook.com/v16.0/1234567891234567/events?access_token=throttled_valid_access_token", + "headers": {}, + "params": {}, + "body": { + "JSON": {}, + "XML": {}, + "JSON_ARRAY": {}, + "FORM": { + "extinfo": "[\"a2\",\"\",\"\",\"\",\"8.1.0\",\"Redmi 6\",\"\",\"\",\"Banglalink\",0,100,\"50.00\",0,0,0,\"\"]", + "custom_events": "[{\"_logTime\":1567333011693,\"_eventName\":\"Viewed Screen\",\"fb_description\":\"Main.1233\"}]", + "ud[em]": "48ddb93f0b30c475423fe177832912c5bcdce3cc72872f8051627967ef278e08", + "advertiser_tracking_enabled": "0", + "application_tracking_enabled": "0", + "event": "CUSTOM_APP_EVENTS" + } + }, + "files": {}, + "userId": "c82cbdff-e5be-4009-ac78-cdeea09ab4b1", + "statusCode": 200 + } + } + }, + { + "request": { + "body": { + "version": "1", + "type": "REST", + "method": "POST", + "endpoint": "https://graph.facebook.com/v16.0/1234567891234567/events?access_token=invalid_account_id_valid_access_token", + "headers": {}, + "params": {}, + "body": { + "JSON": {}, + "XML": {}, + "JSON_ARRAY": {}, + "FORM": { + "extinfo": "[\"a2\",\"\",\"\",\"\",\"8.1.0\",\"Redmi 6\",\"\",\"\",\"Banglalink\",0,100,\"50.00\",0,0,0,\"\"]", + "custom_events": "[{\"_logTime\":1567333011693,\"_eventName\":\"Viewed Screen\",\"fb_description\":\"Main.1233\"}]", + "ud[em]": "48ddb93f0b30c475423fe177832912c5bcdce3cc72872f8051627967ef278e08", + "advertiser_tracking_enabled": "0", + "application_tracking_enabled": "0", + "event": "CUSTOM_APP_EVENTS" + } + }, + "files": {}, + "userId": "c82cbdff-e5be-4009-ac78-cdeea09ab4b1", + "statusCode": 200 + } + } + } +] diff --git a/test/__tests__/data/fb_proxy_output.json b/test/__tests__/data/fb_proxy_output.json new file mode 100644 index 0000000000..9c25f36204 --- /dev/null +++ b/test/__tests__/data/fb_proxy_output.json @@ -0,0 +1,109 @@ +[ + { + "output": { + "status": 400, + "message": "Failed with The access token could not be decrypted during response transformation", + "destinationResponse": { + "error": { + "message": "The access token could not be decrypted", + "type": "OAuthException", + "code": 190, + "fbtrace_id": "fbpixel_trace_id" + }, + "status": 500 + }, + "statTags": { + "destType": "FB", + "errorCategory": "network", + "errorType": "aborted", + "feature": "dataDelivery", + "implementation": "native", + "module": "destination" + } + } + }, + { + "output": { + "status": 200, + "message": "Request Processed Successfully", + "destinationResponse": { + "events_received": 1, + "fbtrace_id": "facebook_trace_id" + } + } + }, + { + "output": { + "status": 400, + "message": "Failed with Invalid parameter during response transformation", + "destinationResponse": { + "error": { + "message": "Invalid parameter", + "type": "OAuthException", + "code": 100, + "error_subcode": 2804003, + "is_transient": false, + "error_user_title": "Event Timestamp Too Old", + "error_user_msg": "The timestamp for this event is too far in the past. Events need to be sent from your server within 7 days of when they occurred. Enter a timestamp that has occurred within the last 7 days.", + "fbtrace_id": "A6UyEgg_HdoiRX9duxcBOjb" + }, + "status": 400 + }, + "statTags": { + "destType": "FB", + "errorCategory": "network", + "errorType": "aborted", + "feature": "dataDelivery", + "implementation": "native", + "module": "destination" + } + } + }, + { + "output": { + "status": 429, + "message": "Failed with User request limit reached during response transformation", + "destinationResponse": { + "error": { + "message": "User request limit reached", + "type": "OAuthException", + "code": 17, + "fbtrace_id": "facebook_px_trace_id_4" + }, + "status": 500 + }, + "statTags": { + "destType": "FB", + "errorCategory": "network", + "errorType": "throttled", + "feature": "dataDelivery", + "implementation": "native", + "module": "destination" + } + } + }, + { + "output": { + "status": 400, + "message": "Failed with Unsupported post request. Object with ID '1234567891234569' does not exist, cannot be loaded due to missing permissions, or does not support this operation. Please read the Graph API documentation at https://developers.facebook.com/docs/graph-api during response transformation", + "destinationResponse": { + "error": { + "message": "Unsupported post request. Object with ID '1234567891234569' does not exist, cannot be loaded due to missing permissions, or does not support this operation. Please read the Graph API documentation at https://developers.facebook.com/docs/graph-api", + "type": "GraphMethodException", + "code": 100, + "error_subcode": 33, + "fbtrace_id": "facebook_px_trace_id_5" + }, + "status": 400 + }, + "statTags": { + "destType": "FB", + "errorCategory": "network", + "errorType": "aborted", + "feature": "dataDelivery", + "implementation": "native", + "module": "destination" + } + } + } +] diff --git a/test/__tests__/fb_custom_audience.test.js b/test/__tests__/fb_custom_audience.test.js index a3c68b46c7..6a10641cdf 100644 --- a/test/__tests__/fb_custom_audience.test.js +++ b/test/__tests__/fb_custom_audience.test.js @@ -41,3 +41,27 @@ describe(`${name} Tests`, () => { }); }); }); + +describe("Router Tests for rETL sources", () => { + it("should send events to dest", async () => { + const input = JSON.parse( + fs.readFileSync( + path.resolve( + __dirname, + `data/${integration}_router_rETL_input.json` + ) + ) + ); + const output = JSON.parse( + fs.readFileSync( + path.resolve( + __dirname, + `data/${integration}_router_rETL_output.json` + ) + ) + ); + const actualOutput = await transformer.processRouterDest(input); + console.log(JSON.stringify(actualOutput)) + expect(actualOutput).toEqual(output); + }); +}); diff --git a/test/__tests__/proxy.test.js b/test/__tests__/proxy.test.js index abdfbeb20f..e83201de98 100644 --- a/test/__tests__/proxy.test.js +++ b/test/__tests__/proxy.test.js @@ -10,6 +10,7 @@ const destinations = [ "google_adwords_remarketing_lists", "google_adwords_enhanced_conversions", "facebook_pixel", + "fb", "snapchat_custom_audience", "clevertap", "salesforce",