From 5226b67e29cf20343c66e8da9dbd338851e21697 Mon Sep 17 00:00:00 2001 From: Antoine Arlaud Date: Fri, 8 Nov 2024 17:21:08 +0100 Subject: [PATCH 1/4] chore: basic refactoring --- lib/client/config/configHelpers.ts | 7 +- lib/client/hooks/startup/processHooks.ts | 5 + lib/client/index.ts | 6 +- lib/client/utils/filterLoading.ts | 4 +- lib/common/broker-workload/index.ts | 142 +++++++++++ lib/common/config/config.ts | 7 - lib/common/filter/filter-rules-loading.ts | 3 +- lib/common/relay/forwardWebsocketRequest.ts | 239 +----------------- lib/common/relay/requestsHelper.ts | 16 +- lib/common/relay/responseSenders.ts | 159 ++++++++++++ lib/common/types/options.ts | 15 +- lib/common/utils/correlation-headers.ts | 23 ++ lib/index.ts | 59 ++--- lib/server/index.ts | 16 +- test/functional/cli.test.ts | 4 +- test/functional/client-pool.test.ts | 4 +- test/unit/client/validateConfig.test.ts | 12 + test/unit/connectionsManager/manager.test.ts | 6 +- test/unit/relay-response-body-client.test.ts | 13 +- ...lay-response-body-form-url-encoded.test.ts | 21 +- ...se-body-universal-form-url-encoded.test.ts | 16 +- .../relay-response-body-universal.test.ts | 9 +- test/unit/relay-response-body.test.ts | 16 +- ...-response-headers-form-url-headers.test.ts | 21 +- ...headers-universal-form-url-headers.test.ts | 11 +- .../relay-response-headers-universal.test.ts | 11 +- test/unit/relay-response-headers.test.ts | 11 +- test/unit/remoteConfig.test.ts | 13 +- test/unit/runtime-rules-hotloading.test.ts | 2 +- 29 files changed, 526 insertions(+), 345 deletions(-) create mode 100644 lib/common/broker-workload/index.ts create mode 100644 lib/common/relay/responseSenders.ts create mode 100644 lib/common/utils/correlation-headers.ts diff --git a/lib/client/config/configHelpers.ts b/lib/client/config/configHelpers.ts index 2665fcae5..b36103cd6 100644 --- a/lib/client/config/configHelpers.ts +++ b/lib/client/config/configHelpers.ts @@ -1,12 +1,9 @@ import { ConfigMetadata } from '../types/client'; import { Config } from '../types/config'; import { log as logger } from '../../logs/logger'; -import { - CONFIGURATION, - getConfig, - loadBrokerConfig, -} from '../../common/config/config'; +import { getConfig, loadBrokerConfig } from '../../common/config/config'; import version from '../../common/utils/version'; +import { CONFIGURATION } from '../../common/types/options'; export const reloadConfig = async (clientOpts) => { // Reload config with connection diff --git a/lib/client/hooks/startup/processHooks.ts b/lib/client/hooks/startup/processHooks.ts index 65c038848..42b4f337d 100644 --- a/lib/client/hooks/startup/processHooks.ts +++ b/lib/client/hooks/startup/processHooks.ts @@ -98,6 +98,11 @@ export const processStartUpHooks = async ( } if (commitSigningEnabled(clientOpts.config)) { + if (!clientOpts.filters) { + throw new Error( + 'Unexpected error: No filters found for commit signing rules injection.', + ); + } const commitSigningRules = commitSigningFilterRules(); if (clientOpts['universalBrokerEnabled']) { clientOpts.filters['github'].private?.push(...commitSigningRules); diff --git a/lib/client/index.ts b/lib/client/index.ts index 7c5e1cc79..eae614ed7 100644 --- a/lib/client/index.ts +++ b/lib/client/index.ts @@ -69,6 +69,7 @@ export const main = async (clientOpts: ClientOpts) => { 'https://api.snyk.io'; await validateMinimalConfig(clientOpts); + if (clientOpts.config.universalBrokerEnabled) { const pluginsFolderPath = await findPluginFolder( __dirname ?? process.cwd(), @@ -95,11 +96,10 @@ export const main = async (clientOpts: ClientOpts) => { await retrieveAndLoadFilters(clientOpts); }, ONEDAY); } - const globalIdentifyingMetadata: IdentifyingMetadata = { capabilities: ['post-streams'], clientId: clientOpts.config.brokerClientId, - filters: clientOpts.filters, + filters: clientOpts.filters ?? new Map(), preflightChecks: hookResults.preflightCheckResults, version, clientConfig: getClientConfigMetadata(clientOpts.config), @@ -212,6 +212,6 @@ export const main = async (clientOpts: ClientOpts) => { }; } catch (err) { logger.warn({ err }, `Shutting down client`); - // throw err; + throw err; } }; diff --git a/lib/client/utils/filterLoading.ts b/lib/client/utils/filterLoading.ts index 76ab66844..4dd5f59c3 100644 --- a/lib/client/utils/filterLoading.ts +++ b/lib/client/utils/filterLoading.ts @@ -1,8 +1,7 @@ import filterRulesLoader from '../../common/filter/filter-rules-loading'; -import { CONFIGURATION } from '../../common/config/config'; import { loadAllFilters } from '../../common/filter/filtersAsync'; import { log as logger } from '../../logs/logger'; -import { ClientOpts } from '../../common/types/options'; +import { ClientOpts, CONFIGURATION } from '../../common/types/options'; import { getFilterConfig } from '../config/filters'; export const retrieveAndLoadFilters = async ( @@ -10,6 +9,7 @@ export const retrieveAndLoadFilters = async ( ): Promise => { const globalFilterConfig = getFilterConfig(); const filters = await filterRulesLoader(clientOpts.config as CONFIGURATION); + clientOpts.filters = filters; globalFilterConfig.loadedFilters = loadAllFilters(filters, clientOpts.config); logger.debug('Loading Filters'); }; diff --git a/lib/common/broker-workload/index.ts b/lib/common/broker-workload/index.ts new file mode 100644 index 000000000..eabb58521 --- /dev/null +++ b/lib/common/broker-workload/index.ts @@ -0,0 +1,142 @@ +import { runPreRequestPlugins } from '../../client/brokerClientPlugins/pluginManager'; +import { getFilterConfig } from '../../client/config/filters'; +import { prepareRequestFromFilterResult } from '../relay/prepareRequest'; +import { LOADEDFILTERSET } from '../types/filter'; +import { ExtendedLogContext } from '../types/log'; +import { computeContentLength } from '../utils/content-length'; +import { contentLengthHeader } from '../utils/headers-value-constants'; +import { + incrementWebSocketRequestsTotal, + incrementHttpRequestsTotal, +} from '../utils/metrics'; +import { maskToken, hashToken } from '../utils/token'; +import { log as logger } from '../../logs/logger'; +import { HybridResponseHandler } from '../relay/responseSenders'; +import { getCorrelationDataFromHeaders } from '../utils/correlation-headers'; + +export class brokerWorkload { + options; + connectionIdentifier: string; + websocketConnectionHandler; + constructor( + connectionIdentifier: string, + options, + websocketConnectionHandler, + ) { + this.options = options; + this.connectionIdentifier = connectionIdentifier; + this.websocketConnectionHandler = websocketConnectionHandler; + } + + async handler(payload, websocketResponseHandler) { + if (this.options.config.universalBrokerEnabled) { + payload.connectionIdentifier = this.connectionIdentifier; + } + const correlationHeaders = getCorrelationDataFromHeaders(payload.headers); + + const logContext: ExtendedLogContext = { + url: payload.url, + connectionName: this.websocketConnectionHandler.friendlyName ?? '', + requestMethod: payload.method, + requestHeaders: payload.headers, + streamingID: payload.streamingID, + maskedToken: maskToken(this.connectionIdentifier), + hashedToken: hashToken(this.connectionIdentifier), + transport: + this.websocketConnectionHandler?.socket?.transport?.name ?? 'unknown', + responseMedium: payload.headers['x-broker-ws-response'] + ? 'websocket' + : 'http', + ...correlationHeaders, + }; + + if (!correlationHeaders.requestId) { + // This should be a warning but older clients won't send one + // TODO make this a warning when significant majority of clients are on latest version + logger.debug( + logContext, + 'Header Snyk-Request-Id not included in headers passed through', + ); + } + const responseHandler = new HybridResponseHandler( + { + connectionIdentifier: this.connectionIdentifier, + payloadStreamingId: payload.streamingID, + ...correlationHeaders, + overHttp: payload.headers['x-broker-ws-response'] ? false : true, + }, + this.websocketConnectionHandler, + websocketResponseHandler, + logContext, + ); + + const simplifiedContext = structuredClone(logContext); + delete simplifiedContext.requestHeaders; + logger.info( + simplifiedContext, + `[Websocket Flow] Received request from ${ + process.env.BROKER_TYPE == 'client' ? 'client' : 'server' + }`, + ); + + let filterResponse; + if ( + this.options.config.brokerType == 'client' && + this.options.config.universalBrokerEnabled + ) { + const loadedFilters = getFilterConfig().loadedFilters as Map< + string, + LOADEDFILTERSET + >; + filterResponse = + loadedFilters + .get(this.websocketConnectionHandler.supportedIntegrationType) + ?.private(payload) || false; + } else if (this.options.config.brokerType == 'client') { + const loadedFilters = getFilterConfig().loadedFilters as LOADEDFILTERSET; + filterResponse = loadedFilters.private(payload); + } else { + const loadedFilters = this.options.loadedFilters as LOADEDFILTERSET; + filterResponse = loadedFilters.private(payload); + } + if (!filterResponse) { + incrementWebSocketRequestsTotal(true, 'inbound-request'); + const reason = + '[Websocket Flow][Blocked Request] Does not match any accept rule'; + logContext.error = 'Blocked by filter rules'; + logger.warn(logContext, reason); + return responseHandler.sendResponse({ + status: 401, + body: { + message: 'blocked', + reason, + url: payload.url, + }, + }); + } else { + incrementWebSocketRequestsTotal(false, 'inbound-request'); + const preparedRequest = await prepareRequestFromFilterResult( + filterResponse, + payload, + logContext, + this.options, + this.connectionIdentifier, + this.websocketConnectionHandler?.socketType, + ); + if (this.options.config.universalBrokerEnabled) { + preparedRequest.req = await runPreRequestPlugins( + this.options, + this.connectionIdentifier, + preparedRequest.req, + ); + payload.headers[contentLengthHeader] = computeContentLength(payload); + } + incrementHttpRequestsTotal(false, 'outbound-request'); + await responseHandler.sendDataResponse( + payload.streamingID, + preparedRequest.req, + logContext, + ); + } + } +} diff --git a/lib/common/config/config.ts b/lib/common/config/config.ts index a74586582..84f5f3f41 100644 --- a/lib/common/config/config.ts +++ b/lib/common/config/config.ts @@ -7,16 +7,9 @@ import { log as logger } from '../../logs/logger'; let config: Record = {}; -export interface CONFIG { - supportedBrokerTypes: string[]; - brokerType: 'client' | 'server'; - filterRulesPaths: { [key: string]: string }; -} - export const getConfig = () => { return config; }; -export type CONFIGURATION = CONFIG & Record; export const findProjectRoot = (startDir: string): string | null => { let currentDir = startDir; diff --git a/lib/common/filter/filter-rules-loading.ts b/lib/common/filter/filter-rules-loading.ts index 848fb61fe..be1ab7661 100644 --- a/lib/common/filter/filter-rules-loading.ts +++ b/lib/common/filter/filter-rules-loading.ts @@ -3,10 +3,11 @@ import yaml from 'js-yaml'; import fs from 'fs'; import { log as logger } from '../../logs/logger'; -import { CONFIGURATION, findProjectRoot } from '../config/config'; +import { findProjectRoot } from '../config/config'; import camelcase from 'camelcase'; import { FiltersType, Rule } from '../types/filter'; import { retrieveFilters, isValidURI } from './utils'; +import { CONFIGURATION } from '../types/options'; const SUPPORTED_IAC_EXTENSIONS = ['tf', 'yaml', 'yml', 'tpl', 'json']; const IAC_SCM_ORIGINS = [ diff --git a/lib/common/relay/forwardWebsocketRequest.ts b/lib/common/relay/forwardWebsocketRequest.ts index da87e113b..9820ba9ab 100644 --- a/lib/common/relay/forwardWebsocketRequest.ts +++ b/lib/common/relay/forwardWebsocketRequest.ts @@ -1,25 +1,7 @@ import { RequestPayload } from '../types/http'; -import { ExtendedLogContext } from '../types/log'; -import { hashToken, maskToken } from '../utils/token'; -import { log as logger } from '../../logs/logger'; -import { BrokerServerPostResponseHandler } from '../http/downstream-post-stream-to-server'; -import { - incrementHttpRequestsTotal, - incrementWebSocketRequestsTotal, -} from '../utils/metrics'; import { WebSocketConnection } from '../../client/types/client'; -import { prepareRequestFromFilterResult } from './prepareRequest'; -import { - makeLegacyRequest, - makePostStreamingRequest, - legacyStreaming, -} from './requestsHelper'; -import { LOADEDFILTERSET } from '../types/filter'; import { LoadedClientOpts, LoadedServerOpts } from '../types/options'; -import { runPreRequestPlugins } from '../../client/brokerClientPlugins/pluginManager'; -import { computeContentLength } from '../utils/content-length'; -import { contentLengthHeader } from '../utils/headers-value-constants'; -import { getFilterConfig } from '../../client/config/filters'; +import { brokerWorkload } from '../broker-workload'; export const forwardWebSocketRequest = ( options: LoadedClientOpts | LoadedServerOpts, @@ -32,220 +14,11 @@ export const forwardWebSocketRequest = ( // 5. Send response over websocket conn return (connectionIdentifier) => async (payload: RequestPayload, emit) => { - if (options.config.universalBrokerEnabled) { - payload.connectionIdentifier = connectionIdentifier; - } - const requestId = payload.headers['snyk-request-id']; - const actingOrgPublicId = payload.headers['snyk-acting-org-public-id']; - const actingGroupPublicId = payload.headers['snyk-acting-group-public-id']; - const productLine = payload.headers['snyk-product-line']; - const flow = payload.headers['snyk-flow-name']; - - const logContext: ExtendedLogContext = { - url: payload.url, - connectionName: websocketConnectionHandler.friendlyName ?? '', - requestMethod: payload.method, - requestHeaders: payload.headers, - requestId, - streamingID: payload.streamingID, - maskedToken: maskToken(connectionIdentifier), - hashedToken: hashToken(connectionIdentifier), - transport: - websocketConnectionHandler?.socket?.transport?.name ?? 'unknown', - responseMedium: payload.headers['x-broker-ws-response'] - ? 'websocket' - : 'http', - actingGroupPublicId, - actingOrgPublicId, - flow, - productLine, - }; - - if (!requestId) { - // This should be a warning but older clients won't send one - // TODO make this a warning when significant majority of clients are on latest version - logger.debug( - logContext, - 'Header Snyk-Request-Id not included in headers passed through', - ); - } - - // It clarifies which emit handler is for websockets vs POST response - const websocketEmit = emit; - - const postOverrideEmit = ( - responseData, - isResponseFromRequestModule = false, - ) => { - try { - logContext.requestMethod = ''; - logContext.requestHeaders = {}; - - const postHandler = new BrokerServerPostResponseHandler( - logContext, - options.config, - connectionIdentifier, - options.config.universalBrokerEnabled - ? websocketConnectionHandler?.serverId - : options.config.serverId, - requestId, - websocketConnectionHandler.role, - ); - if (isResponseFromRequestModule) { - logger.debug( - logContext, - '[Websocket Flow] Posting HTTP streaming response back to Broker Server', - ); - postHandler.forwardRequest(responseData, payload.streamingID); - } else { - logger.debug( - logContext, - '[Websocket Flow] Posting HTTP response back to Broker Server', - ); - // Only for responses generated internally in the Broker Client/Server - postHandler.sendData(responseData, payload.streamingID); - } - } catch (err) { - logger.error({ err }, `Error Posting via Emit callback.`); - } - }; - - const legacyOverrideEmit = ( - responseData, - isResponseFromRequestModule = false, - ) => { - if (responseData) { - responseData['headers'] = responseData['headers'] ?? {}; - responseData.headers['snyk-request-id'] = requestId; - responseData.headers['x-broker-ws-response'] = - responseData.headers['x-broker-ws-response'] ?? 'true'; - } - - // Traffic over websockets - if (payload.streamingID) { - if (isResponseFromRequestModule) { - legacyStreaming( - logContext, - responseData, - options.config, - websocketConnectionHandler, - payload.streamingID, - ); - } else { - websocketConnectionHandler?.send( - 'chunk', - payload.streamingID, - '', - false, - { - status: responseData.status, - headers: responseData.headers, - }, - ); - websocketConnectionHandler?.send( - 'chunk', - payload.streamingID, - JSON.stringify(responseData.body), - true, - ); - } - } else { - logger.debug( - { ...logContext, responseData }, - '[Websocket Flow] (Legacy) Sending fixed response over WebSocket connection', - ); - websocketEmit(responseData); - } - }; - - if ( - websocketConnectionHandler?.capabilities?.includes( - 'receive-post-streams', - ) && - !emit - ) { - // Traffic over HTTP Post - emit = postOverrideEmit; - } else { - emit = legacyOverrideEmit; - } - - const simplifiedContext = structuredClone(logContext); - delete simplifiedContext.requestHeaders; - logger.info( - simplifiedContext, - `[Websocket Flow] Received request from ${ - process.env.BROKER_TYPE == 'client' ? 'client' : 'server' - }`, + const workload = new brokerWorkload( + connectionIdentifier, + options, + websocketConnectionHandler, ); - - let filterResponse; - if ( - options.config.brokerType == 'client' && - options.config.universalBrokerEnabled - ) { - const loadedFilters = getFilterConfig().loadedFilters as Map< - string, - LOADEDFILTERSET - >; - filterResponse = - loadedFilters - .get(websocketConnectionHandler.supportedIntegrationType) - ?.private(payload) || false; - } else if (options.config.brokerType == 'client') { - const loadedFilters = getFilterConfig().loadedFilters as LOADEDFILTERSET; - filterResponse = loadedFilters.private(payload); - } else { - const loadedFilters = options.loadedFilters as LOADEDFILTERSET; - filterResponse = loadedFilters.private(payload); - } - if (!filterResponse) { - incrementWebSocketRequestsTotal(true, 'inbound-request'); - const reason = - '[Websocket Flow][Blocked Request] Does not match any accept rule'; - logContext.error = 'Blocked by filter rules'; - logger.warn(logContext, reason); - return emit({ - status: 401, - body: { - message: 'blocked', - reason, - url: payload.url, - }, - }); - } else { - incrementWebSocketRequestsTotal(false, 'inbound-request'); - const preparedRequest = await prepareRequestFromFilterResult( - filterResponse, - payload, - logContext, - options, - connectionIdentifier, - websocketConnectionHandler?.socketType, - ); - if (options.config.universalBrokerEnabled) { - preparedRequest.req = await runPreRequestPlugins( - options, - connectionIdentifier, - preparedRequest.req, - ); - payload.headers[contentLengthHeader] = computeContentLength(payload); - } - if (options.config.brokerType !== 'client') { - preparedRequest.req.headers['x-snyk-broker'] = `${maskToken( - connectionIdentifier, - )}`; - } - - incrementHttpRequestsTotal(false, 'outbound-request'); - payload.streamingID - ? await makePostStreamingRequest(preparedRequest.req, emit, logContext) - : await makeLegacyRequest( - preparedRequest.req, - emit, - logContext, - options, - ); - } + await workload.handler(payload, emit); }; }; diff --git a/lib/common/relay/requestsHelper.ts b/lib/common/relay/requestsHelper.ts index 9d270066c..142357ff8 100644 --- a/lib/common/relay/requestsHelper.ts +++ b/lib/common/relay/requestsHelper.ts @@ -22,7 +22,7 @@ export const makePostStreamingRequest = async ( try { const downstreamRequestIncomingResponse = await makeStreamingRequestToDownstream(req); - emitCallback(downstreamRequestIncomingResponse, true); + emitCallback(downstreamRequestIncomingResponse); } catch (e) { logger.error( { @@ -49,8 +49,8 @@ export const makeLegacyRequest = async ( // Note that the other side of the request will also check the length and will also reject it if it's too large // Set to 20MB even though the server is 21MB because the server looks at the total data travelling through the websocket, // not just the size of the body, so allow 1MB for miscellaneous data (e.g., headers, Primus overhead) - const maxLength = - parseInt(options.config.socketMaxResponseLength) || 20971520; + + const maxLength = parseInt(options.socketMaxResponseLength) || 20971520; if (contentLength && contentLength > maxLength) { const errorMessage = `body size of ${contentLength} is greater than max allowed of ${maxLength} bytes`; logError(logContext, { @@ -67,12 +67,8 @@ export const makeLegacyRequest = async ( } const status = (response && response.statusCode) || 500; - if (options.config.RES_BODY_URL_SUB && isJson(response.headers)) { - const replaced = replaceUrlPartialChunk( - response.body, - null, - options.config, - ); + if (options.RES_BODY_URL_SUB && isJson(response.headers)) { + const replaced = replaceUrlPartialChunk(response.body, null, options); response.body = replaced.newChunk; } if (status > 404) { @@ -85,7 +81,7 @@ export const makeLegacyRequest = async ( `[Websocket Flow][Inbound] Unexpected status code for relayed request`, ); } - logResponse(logContext, status, response, options.config); + logResponse(logContext, status, response, options); emitCallback({ status, body: response.body, headers: response.headers }); } catch (error) { logError(logContext, error); diff --git a/lib/common/relay/responseSenders.ts b/lib/common/relay/responseSenders.ts new file mode 100644 index 000000000..124e0ffa0 --- /dev/null +++ b/lib/common/relay/responseSenders.ts @@ -0,0 +1,159 @@ +import { getConfig } from '../config/config'; +import { BrokerServerPostResponseHandler } from '../http/downstream-post-stream-to-server'; +import { + legacyStreaming, + makeLegacyRequest, + makePostStreamingRequest, +} from './requestsHelper'; +import { log as logger } from '../../logs/logger'; +import { CorrelationHeaders } from '../utils/correlation-headers'; + +export type RequestMetadata = { + connectionIdentifier: string; + payloadStreamingId: string; + overHttp: boolean; +} & CorrelationHeaders; + +export class HybridResponseHandler { + connectionIdentifier; + websocketConnectionHandler; + logContext; + options; + requestMetadata: RequestMetadata; + websocketResponseHandler; + responseHandler; + constructor( + requestMetadata: RequestMetadata, + websocketConnectionHandler, + websocketResponseHandler, + logContext, + ) { + this.logContext = logContext; + this.websocketResponseHandler = websocketResponseHandler; + this.options = getConfig(); + this.connectionIdentifier = requestMetadata.connectionIdentifier; + this.websocketConnectionHandler = websocketConnectionHandler; + this.requestMetadata = requestMetadata; + if ( + this.websocketConnectionHandler?.capabilities?.includes( + 'receive-post-streams', + ) && + !this.websocketResponseHandler + ) { + // Traffic over HTTP Post + this.responseHandler = this.postOverrideEmit; + } else { + this.responseHandler = this.legacyOverrideEmit; + } + } + + sendResponse = (payload) => { + this.responseHandler(payload); + }; + sendDataResponseInternal = (payload) => { + this.responseHandler(payload, this.requestMetadata.overHttp); + }; + + sendDataResponse = async ( + payloadStreamingId, + preparedRequest, + logContext, + ) => { + payloadStreamingId + ? await makePostStreamingRequest( + preparedRequest, + this.sendDataResponseInternal, + logContext, + ) + : await makeLegacyRequest( + preparedRequest, + this.sendResponse, + logContext, + this.options, + ); + }; + + private postOverrideEmit = (responseData, replyOverHttp = false) => { + try { + this.logContext.requestMethod = ''; + this.logContext.requestHeaders = {}; + const postHandler = new BrokerServerPostResponseHandler( + this.logContext, + this.options, + this.connectionIdentifier, + this.options.universalBrokerEnabled + ? this.websocketConnectionHandler?.serverId + : this.options.serverId, + this.requestMetadata.requestId, + this.websocketConnectionHandler.role, + ); + if (replyOverHttp) { + logger.debug( + this.logContext, + '[Websocket Flow] Posting HTTP streaming response back to Broker Server', + ); + postHandler.forwardRequest( + responseData, + this.requestMetadata.payloadStreamingId, + ); + } else { + logger.debug( + this.logContext, + '[Websocket Flow] Posting HTTP response back to Broker Server', + ); + // Only for responses generated internally in the Broker Client/Server + postHandler.sendData( + responseData, + this.requestMetadata.payloadStreamingId, + ); + } + } catch (err) { + logger.error({ err }, `Error Posting via Emit callback.`); + } + }; + + private legacyOverrideEmit = (responseData, streamOverWebsocket = false) => { + if (responseData) { + responseData['headers'] = responseData['headers'] ?? {}; + responseData.headers['snyk-request-id'] = this.requestMetadata.requestId; + responseData.headers['x-broker-ws-response'] = + responseData.headers['x-broker-ws-response'] ?? 'true'; + } + + // Traffic over websockets + if (this.requestMetadata.payloadStreamingId) { + if (streamOverWebsocket) { + legacyStreaming( + this.logContext, + responseData, + this.options, + this.websocketConnectionHandler, + this.requestMetadata.payloadStreamingId, + ); + } else { + this.websocketConnectionHandler?.send( + 'chunk', + this.requestMetadata.payloadStreamingId, + '', + false, + { + status: responseData.status, + headers: responseData.headers, + }, + ); + this.websocketConnectionHandler?.send( + 'chunk', + this.requestMetadata.payloadStreamingId, + JSON.stringify(responseData.body), + true, + ); + } + } else { + logger.debug( + { ...this.logContext, responseData }, + '[Websocket Flow] (Legacy) Sending fixed response over WebSocket connection', + ); + this.websocketResponseHandler(responseData); + } + }; +} diff --git a/lib/common/types/options.ts b/lib/common/types/options.ts index 589df8151..3adb1b707 100644 --- a/lib/common/types/options.ts +++ b/lib/common/types/options.ts @@ -1,9 +1,17 @@ import { FiltersType, LOADEDFILTERSET } from './filter'; +export interface CONFIG { + supportedBrokerTypes: string[]; + brokerType: 'client' | 'server'; + filterRulesPaths: { [key: string]: string }; +} + +export type CONFIGURATION = CONFIG & Record; + export interface ClientOpts { port: number; - config: Record; - filters: FiltersType | Map; + config: CONFIGURATION; + filters?: FiltersType | Map; serverId?: string; connections?: Record; oauth?: { @@ -19,8 +27,7 @@ export interface ClientOpts { export interface ServerOpts { port: number; - config: Record; - filters: FiltersType; + config: CONFIGURATION; } export interface LoadedFiltersSet { loadedFilters?: LOADEDFILTERSET | Map; diff --git a/lib/common/utils/correlation-headers.ts b/lib/common/utils/correlation-headers.ts new file mode 100644 index 000000000..fecb1945d --- /dev/null +++ b/lib/common/utils/correlation-headers.ts @@ -0,0 +1,23 @@ +export interface CorrelationHeaders { + requestId: string; + actingOrgPublicId: string; + actingGroupPublicId: string; + productLine: string; + flow: string; +} +export const getCorrelationDataFromHeaders = ( + headers: Headers, +): CorrelationHeaders => { + const requestId = headers['snyk-request-id']; + const actingOrgPublicId = headers['snyk-acting-org-public-id']; + const actingGroupPublicId = headers['snyk-acting-group-public-id']; + const productLine = headers['snyk-product-line']; + const flow = headers['snyk-flow-name']; + return { + requestId, + actingOrgPublicId, + actingGroupPublicId, + productLine, + flow, + }; +}; diff --git a/lib/index.ts b/lib/index.ts index 71e833a1b..eb00e2d1b 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -1,16 +1,7 @@ import 'clarify'; // clean the stacktraces - -import filterRulesLoader, { - isUniversalFilters, -} from './common/filter/filter-rules-loading'; import { log as logger } from './logs/logger'; -import { - CONFIGURATION, - getConfig, - loadBrokerConfig, -} from './common/config/config'; - -import { FiltersType } from './common/types/filter'; +import { getConfig, loadBrokerConfig } from './common/config/config'; +import { CONFIGURATION } from './common/types/options'; process.on('uncaughtExceptionMonitor', (error, origin) => { logger.error({ error, origin }, 'found unhandled exception'); @@ -48,39 +39,21 @@ export const app = async ({ port = 7341, client = false, config }) => { any > as CONFIGURATION; localConfig.brokerType = method; - const filters = await filterRulesLoader(localConfig); - if (!filters) { - const error = new ReferenceError( - `No Filters found. A Broker requires filters to run. Shutting down.`, - ); - error['code'] = 'MISSING_FILTERS'; - logger.error({ error }, error.message); - throw error; + + if (method == 'client') { + return await ( + await import('./client') + ).main({ + config: localConfig, + port: localConfig.port || port, + }); } else { - if (method == 'client') { - return await ( - await import('./client') - ).main({ - config: localConfig, - port: localConfig.port || port, - filters, - }); - } else { - if (isUniversalFilters(filters)) { - throw new ReferenceError( - 'Unexpected Universal Broker filters for server', - ); - } else { - const classicFilters: FiltersType = filters as FiltersType; - return await ( - await import('./server') - ).main({ - config: localConfig, - port: localConfig.port || port, - filters: classicFilters, - }); - } - } + return await ( + await import('./server') + ).main({ + config: localConfig, + port: localConfig.port || port, + }); } } catch (error: any) { logger.error({ error }, `${error.message}`); diff --git a/lib/server/index.ts b/lib/server/index.ts index 684f07207..e6bb05b09 100644 --- a/lib/server/index.ts +++ b/lib/server/index.ts @@ -11,15 +11,28 @@ import { ServerOpts } from '../common/types/options'; import { overloadHttpRequestWithConnectionDetailsMiddleware } from './routesHandlers/httpRequestHandler'; import { getForwardHttpRequestHandler } from './socketHandlers/initHandlers'; import { loadAllFilters } from '../common/filter/filtersAsync'; +import { FiltersType } from '../common/types/filter'; +import filterRulesLoader from '../common/filter/filter-rules-loading'; export const main = async (serverOpts: ServerOpts) => { logger.info({ version }, 'Broker starting in server mode'); + const filters = await filterRulesLoader(serverOpts.config); + if (!filters) { + const error = new ReferenceError( + `No Filters found. A Broker requires filters to run. Shutting down.`, + ); + error['code'] = 'MISSING_FILTERS'; + logger.error({ error }, error.message); + throw error; + } + const classicFilters: FiltersType = filters as FiltersType; + // start the local webserver to listen for relay requests const { app, server } = webserver(serverOpts.config, serverOpts.port); const loadedServerOpts = { - loadedFilters: loadAllFilters(serverOpts.filters, serverOpts.config), + loadedFilters: loadAllFilters(classicFilters, serverOpts.config), ...serverOpts, }; if (!loadedServerOpts.loadedFilters) { @@ -44,7 +57,6 @@ export const main = async (serverOpts: ServerOpts) => { app.use(applyPrometheusMiddleware()); } app.get('/connection-status/:token', connectionStatusHandler); - app.all( '/broker/:token/*', overloadHttpRequestWithConnectionDetailsMiddleware, diff --git a/test/functional/cli.test.ts b/test/functional/cli.test.ts index cb399a202..f494bf816 100644 --- a/test/functional/cli.test.ts +++ b/test/functional/cli.test.ts @@ -20,7 +20,7 @@ describe('CLI', () => { } catch (err) { expect(err).toEqual( new ReferenceError( - 'No Filters found. A Broker requires filters to run. Shutting down.', + 'BROKER_TOKEN is required to successfully identify itself to the server', ), ); } @@ -41,7 +41,7 @@ describe('CLI', () => { } catch (err) { expect(err).toEqual( new ReferenceError( - 'No Filters found. A Broker requires filters to run. Shutting down.', + 'BROKER_SERVER_URL is required to connect to the broker server', ), ); } diff --git a/test/functional/client-pool.test.ts b/test/functional/client-pool.test.ts index 4625a0775..311734aed 100644 --- a/test/functional/client-pool.test.ts +++ b/test/functional/client-pool.test.ts @@ -62,8 +62,10 @@ describe('correctly handle pool of multiple clients with same BROKER_TOKEN', () }); }); - describe('2nd client', () => { + describe('2 clients', () => { beforeAll(async () => { + const PORT = 9999; + process.env.BROKER_SERVER_URL = `http://localhost:${PORT}`; bcFirst = await createBrokerClient({ brokerServerUrl: `http://localhost:${bs.port}`, brokerToken: '12345', diff --git a/test/unit/client/validateConfig.test.ts b/test/unit/client/validateConfig.test.ts index 0d2f9f609..7ea649e06 100644 --- a/test/unit/client/validateConfig.test.ts +++ b/test/unit/client/validateConfig.test.ts @@ -8,6 +8,9 @@ describe('Client Config validations', () => { config: { brokerToken: '123', brokerServerUrl: 'test', + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'client', }, filters: { public: [], private: [] }, }; @@ -23,6 +26,9 @@ describe('Client Config validations', () => { brokerServerUrl: 'test', clientId: '123', clientSecret: '123', + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'client', }, filters: { public: [], private: [] }, }; @@ -39,6 +45,9 @@ describe('Client Config validations', () => { brokerToken: '123', brokerServerUrl: 'test', universalBrokerEnabled: true, + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'client', }, filters: { public: [], private: [] }, }; @@ -56,6 +65,9 @@ describe('Client Config validations', () => { brokerServerUrl: 'test', universalBrokerEnabled: true, SKIP_REMOTE_CONFIG: true, + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'client', }, filters: { public: [], private: [] }, }; diff --git a/test/unit/connectionsManager/manager.test.ts b/test/unit/connectionsManager/manager.test.ts index 7421e084a..e30cd296f 100644 --- a/test/unit/connectionsManager/manager.test.ts +++ b/test/unit/connectionsManager/manager.test.ts @@ -8,7 +8,11 @@ describe('Connections Manager', () => { const config: LoadedClientOpts = { loadedFilters: new Map(), port: 0, - config: {}, + config: { + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'client', + }, filters: new Map(), }; const globalIdentifyingMetadata: IdentifyingMetadata = { diff --git a/test/unit/relay-response-body-client.test.ts b/test/unit/relay-response-body-client.test.ts index 3aa446886..fa2f8443b 100644 --- a/test/unit/relay-response-body-client.test.ts +++ b/test/unit/relay-response-body-client.test.ts @@ -17,6 +17,7 @@ const mockedFn = makeRequestToDownstream.mockImplementation((data) => { import { forwardWebSocketRequest as relay } from '../../lib/common/relay/forwardWebsocketRequest'; import { + CONFIGURATION, LoadedClientOpts, LoadedServerOpts, } from '../../lib/common/types/options'; @@ -73,10 +74,12 @@ describe('body relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { HOST: 'localhost', PORT: '8001', brokerType: 'client', + supportedBrokerTypes: [], + filterRulesPaths: {}, }; const options: LoadedClientOpts | LoadedServerOpts = { filters: { @@ -137,12 +140,14 @@ describe('body relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { HOST: 'localhost', PORT: '8001', disableBodyVarsSubstitution: true, brokerServerUrl: 'http://localhost:8001', brokerType: 'client', + supportedBrokerTypes: [], + filterRulesPaths: {}, }; const options: LoadedClientOpts | LoadedServerOpts = { @@ -195,12 +200,14 @@ describe('body relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { HOST: 'localhost', PORT: '8001', disableBodyVarsSubstitution: true, brokerServerUrl: 'http://localhost:8001', brokerType: 'client', + supportedBrokerTypes: [], + filterRulesPaths: {}, }; const options: LoadedClientOpts | LoadedServerOpts = { diff --git a/test/unit/relay-response-body-form-url-encoded.test.ts b/test/unit/relay-response-body-form-url-encoded.test.ts index 8fbc5a0c1..9573e0a26 100644 --- a/test/unit/relay-response-body-form-url-encoded.test.ts +++ b/test/unit/relay-response-body-form-url-encoded.test.ts @@ -16,6 +16,7 @@ const mockedFn = makeRequestToDownstream.mockImplementation((data) => { import { forwardWebSocketRequest as relay } from '../../lib/common/relay/forwardWebsocketRequest'; import { + CONFIGURATION, LoadedClientOpts, LoadedServerOpts, } from '../../lib/common/types/options'; @@ -72,9 +73,12 @@ describe('body relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { HOST: 'localhost', PORT: '8001', + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'server', }; const options: LoadedClientOpts | LoadedServerOpts = { filters: { @@ -127,9 +131,12 @@ describe('body relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { HOST: 'localhost', PORT: '8001', + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'server', }; const options: LoadedClientOpts | LoadedServerOpts = { filters: { @@ -179,11 +186,14 @@ describe('body relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { HOST: 'localhost', PORT: '8001', disableBodyVarsSubstitution: true, brokerServerUrl: 'http://localhost:8001', + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'server', }; const options: LoadedClientOpts | LoadedServerOpts = { @@ -231,10 +241,13 @@ describe('body relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { HOST: 'localhost', PORT: '8001', disableBodyVarsSubstitution: true, + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'server', }; const options: LoadedClientOpts | LoadedServerOpts = { filters: { diff --git a/test/unit/relay-response-body-universal-form-url-encoded.test.ts b/test/unit/relay-response-body-universal-form-url-encoded.test.ts index ce206d9d0..1317b1034 100644 --- a/test/unit/relay-response-body-universal-form-url-encoded.test.ts +++ b/test/unit/relay-response-body-universal-form-url-encoded.test.ts @@ -16,6 +16,7 @@ const mockedFn = makeRequestToDownstream.mockImplementation((data) => { import { forwardWebSocketRequest as relay } from '../../lib/common/relay/forwardWebsocketRequest'; import { + CONFIGURATION, LoadedClientOpts, LoadedServerOpts, } from '../../lib/common/types/options'; @@ -72,7 +73,7 @@ describe('body relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { universalBrokerEnabled: true, plugins: new Map(), connections: { @@ -82,6 +83,9 @@ describe('body relay', () => { PORT: '8001', }, }, + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'server', }; const options: LoadedClientOpts | LoadedServerOpts = { filters: { @@ -138,7 +142,7 @@ describe('body relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { universalBrokerEnabled: true, plugins: new Map(), connections: { @@ -150,6 +154,9 @@ describe('body relay', () => { }, disableBodyVarsSubstitution: true, brokerServerUrl: 'http://localhost:8001', + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'server', }; const options: LoadedClientOpts | LoadedServerOpts = { @@ -197,7 +204,7 @@ describe('body relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { universalBrokerEnabled: true, plugins: new Map(), connections: { @@ -207,6 +214,9 @@ describe('body relay', () => { PORT: '8001', }, }, + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'server', }; const options: LoadedClientOpts | LoadedServerOpts = { filters: { diff --git a/test/unit/relay-response-body-universal.test.ts b/test/unit/relay-response-body-universal.test.ts index 2a1162599..aac051ca7 100644 --- a/test/unit/relay-response-body-universal.test.ts +++ b/test/unit/relay-response-body-universal.test.ts @@ -19,6 +19,7 @@ const mockedFn = makeLegacyRequest.mockImplementation((data, emit) => { import { forwardWebSocketRequest as relay } from '../../lib/common/relay/forwardWebsocketRequest'; import { + CONFIGURATION, LoadedClientOpts, LoadedServerOpts, } from '../../lib/common/types/options'; @@ -83,7 +84,7 @@ describe('body relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { universalBrokerEnabled: true, plugins: new Map(), brokerType: 'client', @@ -103,6 +104,8 @@ describe('body relay', () => { }, github: { default: {} }, }, + supportedBrokerTypes: [], + filterRulesPaths: {}, }; loadBrokerConfig(config); @@ -154,7 +157,7 @@ describe('body relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { disableBodyVarsSubstitution: true, universalBrokerEnabled: true, plugins: new Map(), @@ -175,6 +178,8 @@ describe('body relay', () => { }, github: { default: {} }, }, + supportedBrokerTypes: [], + filterRulesPaths: {}, }; loadBrokerConfig(config); diff --git a/test/unit/relay-response-body.test.ts b/test/unit/relay-response-body.test.ts index 43e9d7ef6..8a04f0834 100644 --- a/test/unit/relay-response-body.test.ts +++ b/test/unit/relay-response-body.test.ts @@ -16,6 +16,7 @@ const mockedFn = makeRequestToDownstream.mockImplementation((data) => { import { forwardWebSocketRequest as relay } from '../../lib/common/relay/forwardWebsocketRequest'; import { + CONFIGURATION, LoadedClientOpts, LoadedServerOpts, } from '../../lib/common/types/options'; @@ -72,9 +73,12 @@ describe('body relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { HOST: 'localhost', PORT: '8001', + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'server', }; const options: LoadedClientOpts | LoadedServerOpts = { filters: { @@ -133,11 +137,14 @@ describe('body relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { HOST: 'localhost', PORT: '8001', disableBodyVarsSubstitution: true, brokerServerUrl: 'http://localhost:8001', + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'server', }; const options: LoadedClientOpts | LoadedServerOpts = { @@ -187,11 +194,14 @@ describe('body relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { HOST: 'localhost', PORT: '8001', disableBodyVarsSubstitution: true, brokerServerUrl: 'http://localhost:8001', + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'server', }; const options: LoadedClientOpts | LoadedServerOpts = { diff --git a/test/unit/relay-response-headers-form-url-headers.test.ts b/test/unit/relay-response-headers-form-url-headers.test.ts index 775f56705..ae6d9c3d7 100644 --- a/test/unit/relay-response-headers-form-url-headers.test.ts +++ b/test/unit/relay-response-headers-form-url-headers.test.ts @@ -15,6 +15,7 @@ const mockedFn = makeRequestToDownstream.mockImplementation((data) => { import { forwardWebSocketRequest as relay } from '../../lib/common/relay/forwardWebsocketRequest'; import { + CONFIGURATION, LoadedClientOpts, LoadedServerOpts, } from '../../lib/common/types/options'; @@ -69,9 +70,12 @@ describe('header relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { SECRET_TOKEN: 'very-secret', VALUE: 'some-special-value', + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'server', }; const options: LoadedClientOpts | LoadedServerOpts = { filters: { @@ -124,9 +128,12 @@ describe('header relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { SECRET_TOKEN: 'very-secret', VALUE: 'some-special-value', + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'server', }; const options: LoadedClientOpts | LoadedServerOpts = { filters: { @@ -175,11 +182,14 @@ describe('header relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { SECRET_TOKEN: 'very-secret', VALUE: 'some-special-value', disableHeaderVarsSubstitution: true, brokerServerUrl: 'http://localhost:8001', + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'server', }; const options: LoadedClientOpts | LoadedServerOpts = { @@ -227,10 +237,13 @@ describe('header relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { SECRET_TOKEN: 'very-secret', VALUE: 'some-special-value', disableHeaderVarsSubstitution: true, + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'server', }; const options: LoadedClientOpts | LoadedServerOpts = { filters: { diff --git a/test/unit/relay-response-headers-universal-form-url-headers.test.ts b/test/unit/relay-response-headers-universal-form-url-headers.test.ts index 7310f0790..7faa8fa5b 100644 --- a/test/unit/relay-response-headers-universal-form-url-headers.test.ts +++ b/test/unit/relay-response-headers-universal-form-url-headers.test.ts @@ -15,6 +15,7 @@ const mockedFn = makeRequestToDownstream.mockImplementation((data) => { import { forwardWebSocketRequest as relay } from '../../lib/common/relay/forwardWebsocketRequest'; import { + CONFIGURATION, LoadedClientOpts, LoadedServerOpts, } from '../../lib/common/types/options'; @@ -69,7 +70,7 @@ describe('header relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { universalBrokerEnabled: true, plugins: new Map(), connections: { @@ -79,6 +80,9 @@ describe('header relay', () => { VALUE: 'some-special-value', }, }, + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'server', }; const options: LoadedClientOpts | LoadedServerOpts = { filters: { @@ -135,7 +139,7 @@ describe('header relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { universalBrokerEnabled: true, plugins: new Map(), connections: { @@ -147,6 +151,9 @@ describe('header relay', () => { }, disableHeaderVarsSubstitution: true, brokerServerUrl: 'http://localhost:8001', + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'server', }; const options: LoadedClientOpts | LoadedServerOpts = { diff --git a/test/unit/relay-response-headers-universal.test.ts b/test/unit/relay-response-headers-universal.test.ts index ebdff40af..4ed3f2ada 100644 --- a/test/unit/relay-response-headers-universal.test.ts +++ b/test/unit/relay-response-headers-universal.test.ts @@ -15,6 +15,7 @@ const mockedFn = makeRequestToDownstream.mockImplementation((data) => { import { forwardWebSocketRequest as relay } from '../../lib/common/relay/forwardWebsocketRequest'; import { + CONFIGURATION, LoadedClientOpts, LoadedServerOpts, } from '../../lib/common/types/options'; @@ -69,7 +70,7 @@ describe('header relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { universalBrokerEnabled: true, plugins: new Map(), connections: { @@ -79,6 +80,9 @@ describe('header relay', () => { VALUE: 'some-special-value', }, }, + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'server', }; const options: LoadedClientOpts | LoadedServerOpts = { filters: { @@ -129,7 +133,7 @@ describe('header relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { universalBrokerEnabled: true, plugins: new Map(), connections: { @@ -141,6 +145,9 @@ describe('header relay', () => { }, disableHeaderVarsSubstitution: true, brokerServerUrl: 'http://localhost:8001', + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'server', }; const options: LoadedClientOpts | LoadedServerOpts = { diff --git a/test/unit/relay-response-headers.test.ts b/test/unit/relay-response-headers.test.ts index 7c6a2296b..2d175b66b 100644 --- a/test/unit/relay-response-headers.test.ts +++ b/test/unit/relay-response-headers.test.ts @@ -15,6 +15,7 @@ const mockedFn = makeRequestToDownstream.mockImplementation((data) => { import { forwardWebSocketRequest as relay } from '../../lib/common/relay/forwardWebsocketRequest'; import { + CONFIGURATION, LoadedClientOpts, LoadedServerOpts, } from '../../lib/common/types/options'; @@ -69,9 +70,12 @@ describe('header relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { SECRET_TOKEN: 'very-secret', VALUE: 'some-special-value', + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'server', }; const options: LoadedClientOpts | LoadedServerOpts = { filters: { @@ -120,11 +124,14 @@ describe('header relay', () => { const brokerToken = 'test-broker'; - const config = { + const config: CONFIGURATION = { SECRET_TOKEN: 'very-secret', VALUE: 'some-special-value', disableHeaderVarsSubstitution: true, brokerServerUrl: 'http://localhost:8001', + supportedBrokerTypes: [], + filterRulesPaths: {}, + brokerType: 'server', }; const options: LoadedClientOpts | LoadedServerOpts = { diff --git a/test/unit/remoteConfig.test.ts b/test/unit/remoteConfig.test.ts index 3774d1d92..74838020d 100644 --- a/test/unit/remoteConfig.test.ts +++ b/test/unit/remoteConfig.test.ts @@ -1,7 +1,7 @@ import { unlinkSync, writeFileSync } from 'fs'; import { retrieveConnectionsForDeployment } from '../../lib/client/config/remoteConfig'; import { getConfig, loadBrokerConfig } from '../../lib/common/config/config'; -import { ClientOpts } from '../../lib/common/types/options'; +import { ClientOpts, CONFIGURATION } from '../../lib/common/types/options'; const nock = require('nock'); const universalFilePathLocationForTests = `${__dirname}/../../config.universal.json`; @@ -77,7 +77,7 @@ describe('Remote config helpers', () => { }); it('Retrieve connections from deployment ID', async () => { await loadBrokerConfig(); - let config = getConfig(); + let config = getConfig() as CONFIGURATION; const deploymentId = '67890'; const apiVersion = '2024-04-02~experimental'; @@ -85,6 +85,9 @@ describe('Remote config helpers', () => { config.deploymentId = deploymentId; config.apiVersion = apiVersion; config.API_BASE_URL = apiBaseUrl; + config.supportedBrokerTypes = []; + config.filterRulesPaths = {}; + config.brokerType = 'client'; process.env.SERVICE_ENV = 'universal'; process.env.CLIENT_ID = '123'; process.env.CLIENT_SECRET = '123'; @@ -100,7 +103,7 @@ describe('Remote config helpers', () => { universalFilePathLocationForTests, ); await loadBrokerConfig(); - config = getConfig(); + config = getConfig() as CONFIGURATION; expect(config.connections).toEqual({ 'my github connection': { GITHUB_TOKEN: 'GITHUB_TOKEN_XYZ', @@ -114,7 +117,7 @@ describe('Remote config helpers', () => { it('Retrieve identifier less connection from install ID and deployment ID', async () => { await loadBrokerConfig(); - let config = getConfig(); + let config = getConfig() as CONFIGURATION; const deploymentId = '67891'; const apiVersion = '2024-04-02~experimental'; @@ -137,7 +140,7 @@ describe('Remote config helpers', () => { universalFilePathLocationForTests, ); await loadBrokerConfig(); - config = getConfig(); + config = getConfig() as CONFIGURATION; expect(config.connections).toEqual({ 'my github connection': { diff --git a/test/unit/runtime-rules-hotloading.test.ts b/test/unit/runtime-rules-hotloading.test.ts index 5f7585ae1..cac7af0a6 100644 --- a/test/unit/runtime-rules-hotloading.test.ts +++ b/test/unit/runtime-rules-hotloading.test.ts @@ -1,10 +1,10 @@ import path from 'path'; import loadFilterRules from '../../lib/common/filter/filter-rules-loading'; -import { CONFIGURATION } from '../../lib/common/config/config'; import camelcase from 'camelcase'; import { FiltersType } from '../../lib/common/types/filter'; const nock = require('nock'); import fs from 'fs'; +import { CONFIGURATION } from '../../lib/common/types/options'; const scmRulesToTest = [ 'azure-repos', From cd8169a0e0e9ba0430c701aee6453c85fabcbe11 Mon Sep 17 00:00:00 2001 From: Antoine Arlaud Date: Mon, 11 Nov 2024 18:28:27 +0100 Subject: [PATCH 2/4] chore: splitting up broker workload from hybrid layer --- lib/{common => }/broker-workload/index.ts | 108 +++++++++++------ lib/broker-workload/request-filtering.ts | 26 +++++ lib/common/relay/forwardWebsocketRequest.ts | 2 +- lib/common/relay/responseSenders.ts | 110 ++++++++++++------ .../relay-response-body-universal.test.ts | 67 ++++++----- 5 files changed, 207 insertions(+), 106 deletions(-) rename lib/{common => }/broker-workload/index.ts (54%) create mode 100644 lib/broker-workload/request-filtering.ts diff --git a/lib/common/broker-workload/index.ts b/lib/broker-workload/index.ts similarity index 54% rename from lib/common/broker-workload/index.ts rename to lib/broker-workload/index.ts index eabb58521..77c742b7f 100644 --- a/lib/common/broker-workload/index.ts +++ b/lib/broker-workload/index.ts @@ -1,18 +1,22 @@ -import { runPreRequestPlugins } from '../../client/brokerClientPlugins/pluginManager'; -import { getFilterConfig } from '../../client/config/filters'; -import { prepareRequestFromFilterResult } from '../relay/prepareRequest'; -import { LOADEDFILTERSET } from '../types/filter'; -import { ExtendedLogContext } from '../types/log'; -import { computeContentLength } from '../utils/content-length'; -import { contentLengthHeader } from '../utils/headers-value-constants'; +import { runPreRequestPlugins } from '../client/brokerClientPlugins/pluginManager'; +import { prepareRequestFromFilterResult } from '../common/relay/prepareRequest'; +import { ExtendedLogContext } from '../common/types/log'; +import { computeContentLength } from '../common/utils/content-length'; +import { contentLengthHeader } from '../common/utils/headers-value-constants'; import { incrementWebSocketRequestsTotal, incrementHttpRequestsTotal, -} from '../utils/metrics'; -import { maskToken, hashToken } from '../utils/token'; -import { log as logger } from '../../logs/logger'; -import { HybridResponseHandler } from '../relay/responseSenders'; -import { getCorrelationDataFromHeaders } from '../utils/correlation-headers'; +} from '../common/utils/metrics'; +import { maskToken, hashToken } from '../common/utils/token'; +import { log as logger } from '../logs/logger'; +import { HybridResponseHandler } from '../common/relay/responseSenders'; +import { getCorrelationDataFromHeaders } from '../common/utils/correlation-headers'; +import { filterRequest } from './request-filtering'; +import { + makeRequestToDownstream, + makeStreamingRequestToDownstream, +} from '../common/http/request'; +import { logError } from '../logs/log'; export class brokerWorkload { options; @@ -58,12 +62,13 @@ export class brokerWorkload { 'Header Snyk-Request-Id not included in headers passed through', ); } + const responseHandler = new HybridResponseHandler( { connectionIdentifier: this.connectionIdentifier, payloadStreamingId: payload.streamingID, ...correlationHeaders, - overHttp: payload.headers['x-broker-ws-response'] ? false : true, + // overHttp: payload.headers['x-broker-ws-response'] ? false : true, }, this.websocketConnectionHandler, websocketResponseHandler, @@ -79,32 +84,18 @@ export class brokerWorkload { }`, ); - let filterResponse; - if ( - this.options.config.brokerType == 'client' && - this.options.config.universalBrokerEnabled - ) { - const loadedFilters = getFilterConfig().loadedFilters as Map< - string, - LOADEDFILTERSET - >; - filterResponse = - loadedFilters - .get(this.websocketConnectionHandler.supportedIntegrationType) - ?.private(payload) || false; - } else if (this.options.config.brokerType == 'client') { - const loadedFilters = getFilterConfig().loadedFilters as LOADEDFILTERSET; - filterResponse = loadedFilters.private(payload); - } else { - const loadedFilters = this.options.loadedFilters as LOADEDFILTERSET; - filterResponse = loadedFilters.private(payload); - } + const filterResponse = filterRequest( + payload, + this.options, + this.websocketConnectionHandler, + ); if (!filterResponse) { incrementWebSocketRequestsTotal(true, 'inbound-request'); const reason = '[Websocket Flow][Blocked Request] Does not match any accept rule'; logContext.error = 'Blocked by filter rules'; logger.warn(logContext, reason); + // TODO: need to type the response object return responseHandler.sendResponse({ status: 401, body: { @@ -131,12 +122,53 @@ export class brokerWorkload { ); payload.headers[contentLengthHeader] = computeContentLength(payload); } + if (this.options.config.brokerType !== 'client') { + preparedRequest.req.headers['x-snyk-broker'] = `${maskToken( + this.connectionIdentifier, + )}`; + } incrementHttpRequestsTotal(false, 'outbound-request'); - await responseHandler.sendDataResponse( - payload.streamingID, - preparedRequest.req, - logContext, - ); + + if (payload.streamingID) { + // indicates server supports streaming + try { + const downstreamRequestIncomingResponse = + await makeStreamingRequestToDownstream(preparedRequest.req); + responseHandler.streamDataResponse(downstreamRequestIncomingResponse); + } catch (e) { + logger.error( + { + ...logContext, + error: e, + stackTrace: new Error('stacktrace generator').stack, + }, + '[Downstream] Caught error making streaming request to downstream ', + ); + } + } else { + // here if request against server had header x-broker-ws-response:true + try { + const response = await makeRequestToDownstream(preparedRequest.req); + const status = (response && response.statusCode) || 500; + if (status > 404) { + logger.warn( + { + statusCode: response.statusCode, + responseHeaders: response.headers, + url: preparedRequest.req.url, + }, + `[Websocket Flow][Inbound] Unexpected status code for relayed request`, + ); + } + responseHandler.sendDataResponse(response, logContext); + } catch (error) { + logError(logContext, error); + return responseHandler.sendResponse({ + status: 500, + body: error, + }); + } + } } } } diff --git a/lib/broker-workload/request-filtering.ts b/lib/broker-workload/request-filtering.ts new file mode 100644 index 000000000..ede07b60c --- /dev/null +++ b/lib/broker-workload/request-filtering.ts @@ -0,0 +1,26 @@ +import { getFilterConfig } from '../client/config/filters'; +import { LOADEDFILTERSET } from '../common/types/filter'; + +export const filterRequest = (payload, options, websocketConnectionHandler) => { + let filterResponse; + if ( + options.config.brokerType == 'client' && + options.config.universalBrokerEnabled + ) { + const loadedFilters = getFilterConfig().loadedFilters as Map< + string, + LOADEDFILTERSET + >; + filterResponse = + loadedFilters + .get(websocketConnectionHandler.supportedIntegrationType) + ?.private(payload) || false; + } else if (options.config.brokerType == 'client') { + const loadedFilters = getFilterConfig().loadedFilters as LOADEDFILTERSET; + filterResponse = loadedFilters.private(payload); + } else { + const loadedFilters = options.loadedFilters as LOADEDFILTERSET; + filterResponse = loadedFilters.private(payload); + } + return filterResponse; +}; diff --git a/lib/common/relay/forwardWebsocketRequest.ts b/lib/common/relay/forwardWebsocketRequest.ts index 9820ba9ab..aa10b97e5 100644 --- a/lib/common/relay/forwardWebsocketRequest.ts +++ b/lib/common/relay/forwardWebsocketRequest.ts @@ -1,7 +1,7 @@ import { RequestPayload } from '../types/http'; import { WebSocketConnection } from '../../client/types/client'; import { LoadedClientOpts, LoadedServerOpts } from '../types/options'; -import { brokerWorkload } from '../broker-workload'; +import { brokerWorkload } from '../../broker-workload'; export const forwardWebSocketRequest = ( options: LoadedClientOpts | LoadedServerOpts, diff --git a/lib/common/relay/responseSenders.ts b/lib/common/relay/responseSenders.ts index 124e0ffa0..3c580a9b0 100644 --- a/lib/common/relay/responseSenders.ts +++ b/lib/common/relay/responseSenders.ts @@ -1,19 +1,26 @@ import { getConfig } from '../config/config'; import { BrokerServerPostResponseHandler } from '../http/downstream-post-stream-to-server'; -import { - legacyStreaming, - makeLegacyRequest, - makePostStreamingRequest, -} from './requestsHelper'; +import { legacyStreaming } from './requestsHelper'; import { log as logger } from '../../logs/logger'; import { CorrelationHeaders } from '../utils/correlation-headers'; +import { IncomingMessage } from 'node:http'; +import { logError, logResponse } from '../../logs/log'; +import { isJson } from '../utils/json'; +import { replaceUrlPartialChunk } from '../utils/replace-vars'; export type RequestMetadata = { connectionIdentifier: string; payloadStreamingId: string; - overHttp: boolean; + // streamResponse: boolean; } & CorrelationHeaders; +export interface HybridResponse { + status: number; + body?: any; + headers?: any; + errorType?: any; + originalBodySize?: any; +} export class HybridResponseHandler { connectionIdentifier; websocketConnectionHandler; @@ -34,46 +41,76 @@ export class HybridResponseHandler { this.connectionIdentifier = requestMetadata.connectionIdentifier; this.websocketConnectionHandler = websocketConnectionHandler; this.requestMetadata = requestMetadata; + // WebsocketResponseHandler provided means WS response expected + // header x-broker-ws-response:true used on server side if ( this.websocketConnectionHandler?.capabilities?.includes( 'receive-post-streams', ) && !this.websocketResponseHandler ) { - // Traffic over HTTP Post - this.responseHandler = this.postOverrideEmit; + // Response Traffic over HTTP Post + this.responseHandler = this.postDataResponseHandler; } else { - this.responseHandler = this.legacyOverrideEmit; + // Response Traffic over WS + this.responseHandler = this.websocketDataResponseHandler; } } - sendResponse = (payload) => { - this.responseHandler(payload); + // POST Data back without streaming + sendResponse = (payload: HybridResponse) => { + this.responseHandler(payload, false); }; - sendDataResponseInternal = (payload) => { - this.responseHandler(payload, this.requestMetadata.overHttp); + + // POST Data back with streaming + streamDataResponse = (payload: IncomingMessage) => { + this.responseHandler(payload, true); }; - sendDataResponse = async ( - payloadStreamingId, - preparedRequest, - logContext, - ) => { - payloadStreamingId - ? await makePostStreamingRequest( - preparedRequest, - this.sendDataResponseInternal, - logContext, - ) - : await makeLegacyRequest( - preparedRequest, - this.sendResponse, - logContext, - this.options, - ); + sendDataResponse = (response, logContext) => { + const contentLength = response.body.length; + // Note that the other side of the request will also check the length and will also reject it if it's too large + // Set to 20MB even though the server is 21MB because the server looks at the total data travelling through the websocket, + // not just the size of the body, so allow 1MB for miscellaneous data (e.g., headers, Primus overhead) + + const maxLength = + parseInt(this.options.socketMaxResponseLength) || 20971520; + if (contentLength && contentLength > maxLength) { + const errorMessage = `body size of ${contentLength} is greater than max allowed of ${maxLength} bytes`; + logError(logContext, { + errorMessage, + }); + return this.sendResponse({ + status: 502, + errorType: 'BODY_TOO_LARGE', + originalBodySize: contentLength, + body: { + message: errorMessage, + }, + }); + } + + if (this.options.RES_BODY_URL_SUB && isJson(response.headers)) { + const replaced = replaceUrlPartialChunk( + response.body, + null, + this.options, + ); + response.body = replaced.newChunk; + } + const status = (response && response.statusCode) || 500; + logResponse(logContext, status, response, this.options); + this.sendResponse({ + status, + body: response.body, + headers: response.headers, + }); }; - private postOverrideEmit = (responseData, replyOverHttp = false) => { + private postDataResponseHandler = ( + responseData, + streamingRequestData = false, + ) => { try { this.logContext.requestMethod = ''; this.logContext.requestHeaders = {}; @@ -87,7 +124,8 @@ export class HybridResponseHandler { this.requestMetadata.requestId, this.websocketConnectionHandler.role, ); - if (replyOverHttp) { + if (streamingRequestData) { + // POST Streaming logger.debug( this.logContext, '[Websocket Flow] Posting HTTP streaming response back to Broker Server', @@ -97,6 +135,7 @@ export class HybridResponseHandler { this.requestMetadata.payloadStreamingId, ); } else { + // POST Non Streaming logger.debug( this.logContext, '[Websocket Flow] Posting HTTP response back to Broker Server', @@ -112,7 +151,10 @@ export class HybridResponseHandler { } }; - private legacyOverrideEmit = (responseData, streamOverWebsocket = false) => { + private websocketDataResponseHandler = ( + responseData, + streamOverWebsocket = false, + ) => { if (responseData) { responseData['headers'] = responseData['headers'] ?? {}; responseData.headers['snyk-request-id'] = this.requestMetadata.requestId; @@ -122,6 +164,7 @@ export class HybridResponseHandler { // Traffic over websockets if (this.requestMetadata.payloadStreamingId) { + // Most likely not in use today if (streamOverWebsocket) { legacyStreaming( this.logContext, @@ -149,6 +192,7 @@ export class HybridResponseHandler { ); } } else { + // Used if x-broker-ws-response:true header on server side logger.debug( { ...this.logContext, responseData }, '[Websocket Flow] (Legacy) Sending fixed response over WebSocket connection', diff --git a/test/unit/relay-response-body-universal.test.ts b/test/unit/relay-response-body-universal.test.ts index aac051ca7..a50aa76b2 100644 --- a/test/unit/relay-response-body-universal.test.ts +++ b/test/unit/relay-response-body-universal.test.ts @@ -1,21 +1,10 @@ const PORT = 8001; process.env.BROKER_SERVER_URL = `http://localhost:${PORT}`; -jest.mock('../../lib/common/relay/requestsHelper'); import { Role, WebSocketConnection } from '../../lib/client/types/client'; import { loadBrokerConfig } from '../../lib/common/config/config'; import { loadAllFilters } from '../../lib/common/filter/filtersAsync'; -import { makeLegacyRequest } from '../../lib/common/relay/requestsHelper'; - -// eslint-disable-next-line @typescript-eslint/ban-ts-comment -// @ts-ignore -// eslint-disable-next-line @typescript-eslint/no-unused-vars -const mockedFn = makeLegacyRequest.mockImplementation((data, emit) => { - emit(); - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore - return data; -}); +const nock = require('nock'); import { forwardWebSocketRequest as relay } from '../../lib/common/relay/forwardWebsocketRequest'; import { @@ -59,6 +48,7 @@ dummyLoadedFilters['github'] = { { method: 'any', url: '/*', + origin: 'http://test', }, ], public: [ @@ -70,16 +60,25 @@ dummyLoadedFilters['github'] = { }; describe('body relay', () => { + beforeAll(() => { + nock(`http://test`) + .persist() + .post(/./) + .reply((_url, body) => { + const response = body; + return [200, response]; + }); + }); beforeEach(() => { - jest.clearAllMocks(); + // jest.clearAllMocks(); }); afterAll(() => { delete process.env.BROKER_SERVER_URL; - jest.clearAllMocks(); + // jest.clearAllMocks(); }); - it('relay swaps body values found in BROKER_VAR_SUB', () => { + it('relay swaps body values found in BROKER_VAR_SUB', async () => { expect.hasAssertions(); const brokerToken = 'test-broker'; @@ -132,27 +131,27 @@ describe('body relay', () => { BROKER_VAR_SUB: ['url'], url: '${HOST}:${PORT}/webhook', }; - route( + let response; + await route( { url: '/', method: 'POST', - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore + body: Buffer.from(JSON.stringify(body)), headers: {}, }, - () => { - expect(makeLegacyRequest).toHaveBeenCalledTimes(1); - const arg = mockedFn.mock.calls[0][0]; - const url = JSON.parse(arg.body).url; - expect(url).toEqual( - `http://${config.connections.myconn.HOST2}:${config.connections.myconn.PORT}/webhook`, - ); + (responseCallback) => { + response = responseCallback; }, ); + expect(response).toBeDefined(); + expect(response.status).toEqual(200); + expect(JSON.parse(response.body).url).toEqual( + `http://${config.connections.myconn.HOST2}:${config.connections.myconn.PORT}/webhook`, + ); }); - it('relay does NOT swaps body values found in BROKER_VAR_SUB if disable substition true', () => { + it('relay does NOT swaps body values found in BROKER_VAR_SUB if disable substition true', async () => { expect.hasAssertions(); const brokerToken = 'test-broker'; @@ -203,21 +202,21 @@ describe('body relay', () => { BROKER_VAR_SUB: ['url'], url: '${HOST}:${PORT}/webhook', }; - route( + let response; + await route( { url: '/', method: 'POST', - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore + body: Buffer.from(JSON.stringify(body)), headers: {}, }, - () => { - expect(makeLegacyRequest).toHaveBeenCalledTimes(1); - const arg = mockedFn.mock.calls[0][0]; - const url = JSON.parse(arg.body).url; - expect(url).toEqual('${HOST}:${PORT}/webhook'); + (responseCallback) => { + response = responseCallback; }, ); + expect(response).toBeDefined(); + expect(response.status).toEqual(200); + expect(JSON.parse(response.body).url).toEqual('${HOST}:${PORT}/webhook'); }); }); From a4ae29f5cfd1ac7163883c74e79ea433e3f63ad0 Mon Sep 17 00:00:00 2001 From: Antoine Arlaud Date: Tue, 12 Nov 2024 13:50:52 +0100 Subject: [PATCH 3/4] chore: start separating sdk and workload --- lib/broker-workload/clientRequest.ts | 43 ++++ lib/broker-workload/index.ts | 6 +- ...quest-filtering.ts => requestFiltering.ts} | 28 +++ lib/client/auth/oauth.ts | 2 +- .../abstractBrokerPlugin.ts | 2 +- .../plugins/githubServerAppAuth.ts | 2 +- .../checks/config/brokerClientUrlCheck.ts | 2 +- .../checks/config/brokerClientVersionCheck.ts | 2 +- lib/client/checks/http/http-executor.ts | 2 +- lib/client/config/remoteConfig.ts | 2 +- lib/client/dispatcher/client/api.ts | 2 +- lib/client/utils/connectionValidation.ts | 2 +- lib/client/utils/credentials.ts | 2 +- lib/common/filter/utils.ts | 2 +- .../relay/LegacyStreamResponseHandler.ts | 9 +- lib/common/relay/forwardHttpRequest.ts | 225 +----------------- .../relay/forwardHttpRequestOverHttp.ts | 30 +-- lib/hybrid-sdk/clientRequestHelpers.ts | 202 ++++++++++++++++ lib/{common => hybrid-sdk}/http/axios.ts | 0 .../http/downstream-post-stream-to-server.ts | 6 +- .../http/patch-https-request-for-proxying.ts | 2 +- lib/{common => hybrid-sdk}/http/request.ts | 4 +- .../http/server-post-stream-handler.ts | 5 +- lib/{common => hybrid-sdk}/http/utils.ts | 0 .../relay => hybrid-sdk}/requestsHelper.ts | 12 +- .../relay => hybrid-sdk}/responseSenders.ts | 20 +- lib/hybrid-sdk/types.ts | 7 + lib/server/infra/dispatcher.ts | 2 +- .../routesHandlers/connectionStatusHandler.ts | 2 +- .../routesHandlers/httpRequestHandler.ts | 2 +- .../routesHandlers/postResponseHandler.ts | 3 +- test/unit/http.test.ts | 2 +- test/unit/proxying-decision.test.ts | 10 +- test/unit/relay-response-body-client.test.ts | 4 +- ...lay-response-body-form-url-encoded.test.ts | 4 +- ...se-body-universal-form-url-encoded.test.ts | 4 +- test/unit/relay-response-body.test.ts | 4 +- ...-response-headers-form-url-headers.test.ts | 4 +- ...headers-universal-form-url-headers.test.ts | 4 +- .../relay-response-headers-universal.test.ts | 4 +- test/unit/relay-response-headers.test.ts | 4 +- 41 files changed, 358 insertions(+), 316 deletions(-) create mode 100644 lib/broker-workload/clientRequest.ts rename lib/broker-workload/{request-filtering.ts => requestFiltering.ts} (51%) create mode 100644 lib/hybrid-sdk/clientRequestHelpers.ts rename lib/{common => hybrid-sdk}/http/axios.ts (100%) rename lib/{common => hybrid-sdk}/http/downstream-post-stream-to-server.ts (98%) rename lib/{common => hybrid-sdk}/http/patch-https-request-for-proxying.ts (97%) rename lib/{common => hybrid-sdk}/http/request.ts (98%) rename lib/{common => hybrid-sdk}/http/server-post-stream-handler.ts (93%) rename lib/{common => hybrid-sdk}/http/utils.ts (100%) rename lib/{common/relay => hybrid-sdk}/requestsHelper.ts (92%) rename lib/{common/relay => hybrid-sdk}/responseSenders.ts (91%) create mode 100644 lib/hybrid-sdk/types.ts diff --git a/lib/broker-workload/clientRequest.ts b/lib/broker-workload/clientRequest.ts new file mode 100644 index 000000000..28d38c3fd --- /dev/null +++ b/lib/broker-workload/clientRequest.ts @@ -0,0 +1,43 @@ +import { Request, Response } from 'express'; +import { HybridClientRequestHandler } from '../hybrid-sdk/clientRequestHelpers'; +import { incrementHttpRequestsTotal } from '../common/utils/metrics'; +import { filterClientRequest } from './requestFiltering'; +import { log as logger } from '../logs/logger'; + +export class BrokerClientRequestWorkload { + req: Request; + res: Response; + options; + constructor(req, res, options) { + this.req = req; + this.res = res; + this.options = options; + } + + async handler() { + const hybridClientRequestHandler = new HybridClientRequestHandler( + this.req, + this.res, + ); + const filterResponse = filterClientRequest( + this.req, + this.options, + this.res.locals.websocket, + ); + + if (!filterResponse) { + incrementHttpRequestsTotal(true, 'inbound-request'); + const reason = + 'Request does not match any accept rule, blocking HTTP request'; + hybridClientRequestHandler.logContext.error = 'blocked'; + logger.warn(hybridClientRequestHandler.logContext, reason); + // TODO: respect request headers, block according to content-type + return this.res + .status(401) + .send({ message: 'blocked', reason, url: this.req.url }); + } else { + hybridClientRequestHandler.makeRequest(filterResponse); + incrementHttpRequestsTotal(false, 'inbound-request'); + } + } +} diff --git a/lib/broker-workload/index.ts b/lib/broker-workload/index.ts index 77c742b7f..e186104b0 100644 --- a/lib/broker-workload/index.ts +++ b/lib/broker-workload/index.ts @@ -9,13 +9,13 @@ import { } from '../common/utils/metrics'; import { maskToken, hashToken } from '../common/utils/token'; import { log as logger } from '../logs/logger'; -import { HybridResponseHandler } from '../common/relay/responseSenders'; +import { HybridResponseHandler } from '../hybrid-sdk/responseSenders'; import { getCorrelationDataFromHeaders } from '../common/utils/correlation-headers'; -import { filterRequest } from './request-filtering'; +import { filterRequest } from './requestFiltering'; import { makeRequestToDownstream, makeStreamingRequestToDownstream, -} from '../common/http/request'; +} from '../hybrid-sdk/http/request'; import { logError } from '../logs/log'; export class brokerWorkload { diff --git a/lib/broker-workload/request-filtering.ts b/lib/broker-workload/requestFiltering.ts similarity index 51% rename from lib/broker-workload/request-filtering.ts rename to lib/broker-workload/requestFiltering.ts index ede07b60c..432d5703b 100644 --- a/lib/broker-workload/request-filtering.ts +++ b/lib/broker-workload/requestFiltering.ts @@ -24,3 +24,31 @@ export const filterRequest = (payload, options, websocketConnectionHandler) => { } return filterResponse; }; + +export const filterClientRequest = ( + payload, + options, + websocketConnectionHandler, +) => { + let filterResponse; + if ( + options.config.brokerType == 'client' && + options.config.universalBrokerEnabled + ) { + const loadedFilters = getFilterConfig().loadedFilters as Map< + string, + LOADEDFILTERSET + >; + filterResponse = + loadedFilters + .get(websocketConnectionHandler.supportedIntegrationType) // The chosen type is determined by websocket connect middlwr + ?.public(payload) || false; + } else if (options.config.brokerType == 'client') { + const loadedFilters = getFilterConfig().loadedFilters as LOADEDFILTERSET; + filterResponse = loadedFilters.public(payload); + } else { + const loadedFilters = options.loadedFilters as LOADEDFILTERSET; + filterResponse = loadedFilters.public(payload); + } + return filterResponse; +}; diff --git a/lib/client/auth/oauth.ts b/lib/client/auth/oauth.ts index df1379434..0a9cbd36f 100644 --- a/lib/client/auth/oauth.ts +++ b/lib/client/auth/oauth.ts @@ -1,4 +1,4 @@ -import { makeRequestToDownstream } from '../../common/http/request'; +import { makeRequestToDownstream } from '../../hybrid-sdk/http/request'; import { PostFilterPreparedRequest } from '../../common/relay/prepareRequest'; import { log as logger } from '../../logs/logger'; interface tokenExchangeResponse { diff --git a/lib/client/brokerClientPlugins/abstractBrokerPlugin.ts b/lib/client/brokerClientPlugins/abstractBrokerPlugin.ts index 95fd380d5..559e53204 100644 --- a/lib/client/brokerClientPlugins/abstractBrokerPlugin.ts +++ b/lib/client/brokerClientPlugins/abstractBrokerPlugin.ts @@ -2,7 +2,7 @@ import { getPluginsConfig } from '../../common/config/pluginsConfig'; import { HttpResponse, makeRequestToDownstream, -} from '../../common/http/request'; +} from '../../hybrid-sdk/http/request'; import { PostFilterPreparedRequest } from '../../common/relay/prepareRequest'; import { log as logger } from '../../logs/logger'; diff --git a/lib/client/brokerClientPlugins/plugins/githubServerAppAuth.ts b/lib/client/brokerClientPlugins/plugins/githubServerAppAuth.ts index 23a942131..a793a26e5 100644 --- a/lib/client/brokerClientPlugins/plugins/githubServerAppAuth.ts +++ b/lib/client/brokerClientPlugins/plugins/githubServerAppAuth.ts @@ -4,7 +4,7 @@ import BrokerPlugin from '../abstractBrokerPlugin'; import { createPrivateKey } from 'node:crypto'; import { sign } from 'jsonwebtoken'; import { PostFilterPreparedRequest } from '../../../common/relay/prepareRequest'; -import { makeRequestToDownstream } from '../../../common/http/request'; +import { makeRequestToDownstream } from '../../../hybrid-sdk/http/request'; import { maskSCMToken } from '../../../common/utils/token'; import { getPluginsConfig } from '../../../common/config/pluginsConfig'; import { replace } from '../../../common/utils/replace-vars'; diff --git a/lib/client/checks/config/brokerClientUrlCheck.ts b/lib/client/checks/config/brokerClientUrlCheck.ts index 8b5a8de43..d41a4d5aa 100644 --- a/lib/client/checks/config/brokerClientUrlCheck.ts +++ b/lib/client/checks/config/brokerClientUrlCheck.ts @@ -5,7 +5,7 @@ import { urlContainsProtocol } from '../../../common/utils/urlValidator'; import { HttpResponse, makeSingleRawRequestToDownstream, -} from '../../../common/http/request'; +} from '../../../hybrid-sdk/http/request'; import { retry } from '../../retry/exponential-backoff'; import version from '../../../common/utils/version'; diff --git a/lib/client/checks/config/brokerClientVersionCheck.ts b/lib/client/checks/config/brokerClientVersionCheck.ts index eacde1447..1d67f16cb 100644 --- a/lib/client/checks/config/brokerClientVersionCheck.ts +++ b/lib/client/checks/config/brokerClientVersionCheck.ts @@ -1,4 +1,4 @@ -import { makeSingleRawRequestToDownstream } from '../../../common/http/request'; +import { makeSingleRawRequestToDownstream } from '../../../hybrid-sdk/http/request'; import { PostFilterPreparedRequest } from '../../../common/relay/prepareRequest'; import version from '../../../common/utils/version'; import { CheckOptions, CheckResult } from '../types'; diff --git a/lib/client/checks/http/http-executor.ts b/lib/client/checks/http/http-executor.ts index 677dcf4f9..b3335282a 100644 --- a/lib/client/checks/http/http-executor.ts +++ b/lib/client/checks/http/http-executor.ts @@ -5,7 +5,7 @@ import type { CheckId, CheckResult, CheckStatus } from '../types'; import { HttpResponse, makeSingleRawRequestToDownstream, -} from '../../../common/http/request'; +} from '../../../hybrid-sdk/http/request'; export async function executeHttpRequest( checkOptions: { diff --git a/lib/client/config/remoteConfig.ts b/lib/client/config/remoteConfig.ts index 9189437dd..1d723ac95 100644 --- a/lib/client/config/remoteConfig.ts +++ b/lib/client/config/remoteConfig.ts @@ -1,5 +1,5 @@ import { readFileSync, writeFileSync } from 'fs'; -import { makeRequestToDownstream } from '../../common/http/request'; +import { makeRequestToDownstream } from '../../hybrid-sdk/http/request'; import { PostFilterPreparedRequest } from '../../common/relay/prepareRequest'; import { ClientOpts } from '../../common/types/options'; import { BrokerConnectionApiResponse } from '../types/api'; diff --git a/lib/client/dispatcher/client/api.ts b/lib/client/dispatcher/client/api.ts index 3120c107b..37784bd09 100644 --- a/lib/client/dispatcher/client/api.ts +++ b/lib/client/dispatcher/client/api.ts @@ -1,4 +1,4 @@ -import { makeRequestToDownstream } from '../../../common/http/request'; +import { makeRequestToDownstream } from '../../../hybrid-sdk/http/request'; import { PostFilterPreparedRequest } from '../../../common/relay/prepareRequest'; import { log as logger } from '../../../logs/logger'; import { diff --git a/lib/client/utils/connectionValidation.ts b/lib/client/utils/connectionValidation.ts index a2f9fcf8e..d7ae494e8 100644 --- a/lib/client/utils/connectionValidation.ts +++ b/lib/client/utils/connectionValidation.ts @@ -1,4 +1,4 @@ -import { makeSingleRawRequestToDownstream } from '../../common/http/request'; +import { makeSingleRawRequestToDownstream } from '../../hybrid-sdk/http/request'; import { PostFilterPreparedRequest } from '../../common/relay/prepareRequest'; import version from '../../common/utils/version'; import { ConnectionConfig } from '../types/config'; diff --git a/lib/client/utils/credentials.ts b/lib/client/utils/credentials.ts index aeb4f2502..43ba4ee71 100644 --- a/lib/client/utils/credentials.ts +++ b/lib/client/utils/credentials.ts @@ -1,7 +1,7 @@ import { log as logger } from '../../logs/logger'; import version from '../../common/utils/version'; import { sanitise } from '../../logs/logger'; -import { makeRequestToDownstream } from '../../common/http/request'; +import { makeRequestToDownstream } from '../../hybrid-sdk/http/request'; import { isJson } from '../../common/utils/json'; const credsFromHeader = (s) => { diff --git a/lib/common/filter/utils.ts b/lib/common/filter/utils.ts index 2753a4dc5..10156d938 100644 --- a/lib/common/filter/utils.ts +++ b/lib/common/filter/utils.ts @@ -1,6 +1,6 @@ import path from 'node:path'; import fs from 'fs'; -import { makeSingleRawRequestToDownstream } from '../http/request'; +import { makeSingleRawRequestToDownstream } from '../../hybrid-sdk/http/request'; import { PostFilterPreparedRequest } from '../relay/prepareRequest'; import version from '../utils/version'; import { findProjectRoot } from '../config/config'; diff --git a/lib/common/relay/LegacyStreamResponseHandler.ts b/lib/common/relay/LegacyStreamResponseHandler.ts index 457e01e8b..9e92902ab 100644 --- a/lib/common/relay/LegacyStreamResponseHandler.ts +++ b/lib/common/relay/LegacyStreamResponseHandler.ts @@ -1,10 +1,9 @@ -import { log as logger } from '../../logs/logger'; -import { observeResponseSize } from '../utils/metrics'; - import { - StreamResponse, streamsStore, -} from '../http/server-post-stream-handler'; + StreamResponse, +} from '../../hybrid-sdk/http/server-post-stream-handler'; +import { log as logger } from '../../logs/logger'; +import { observeResponseSize } from '../utils/metrics'; /** * @deprecated Deprecated in favour of {@link StreamResponseHandler} */ diff --git a/lib/common/relay/forwardHttpRequest.ts b/lib/common/relay/forwardHttpRequest.ts index 2cd10a5de..0b45fb5ec 100644 --- a/lib/common/relay/forwardHttpRequest.ts +++ b/lib/common/relay/forwardHttpRequest.ts @@ -1,22 +1,7 @@ import { Request, Response } from 'express'; -import undefsafe from 'undefsafe'; -import { v4 as uuid } from 'uuid'; - -import { log as logger } from '../../logs/logger'; -import stream from 'stream'; -import { - incrementHttpRequestsTotal, - incrementUnableToSizeResponse, - incrementWebSocketRequestsTotal, - observeResponseSize, -} from '../utils/metrics'; - -import { streamsStore } from '../http/server-post-stream-handler'; -import { ExtendedLogContext } from '../types/log'; import { LoadedClientOpts, LoadedServerOpts } from '../types/options'; -import { LOADEDFILTERSET } from '../types/filter'; -import { getFilterConfig } from '../../client/config/filters'; +import { BrokerClientRequestWorkload } from '../../broker-workload/clientRequest'; // 1. Request coming in over HTTP conn (logged) // 2. Filter for rule match (log and block if no match) @@ -26,210 +11,8 @@ import { getFilterConfig } from '../../client/config/filters'; export const forwardHttpRequest = ( options: LoadedClientOpts | LoadedServerOpts, ) => { - // const filters = loadFilters(filterRules); - - return (req: Request, res: Response) => { - // If this is the server, we should receive a Snyk-Request-Id header from upstream - // If this is the client, we will have to generate one - req.headers['snyk-request-id'] ||= uuid(); - const responseWantedOverWs = req.headers['x-broker-ws-response'] - ? true - : false; - const logContext: ExtendedLogContext = { - url: req.url, - requestMethod: req.method, - requestHeaders: req.headers, - requestId: - req.headers['snyk-request-id'] && - Array.isArray(req.headers['snyk-request-id']) - ? req.headers['snyk-request-id'].join(',') - : req.headers['snyk-request-id'] || '', - maskedToken: req['maskedToken'], - hashedToken: req['hashedToken'], - actingOrgPublicId: req.headers['snyk-acting-org-public-id'] as string, - actingGroupPublicId: req.headers['snyk-acting-group-public-id'] as string, - productLine: req.headers['snyk-product-line'] as string, - flow: req.headers['snyk-flow-name'] as string, - }; - - const simplifiedContext = logContext; - delete simplifiedContext.requestHeaders; - logger.info(simplifiedContext, '[HTTP Flow] Received request'); - let filterResponse; - if ( - options.config.brokerType == 'client' && - options.config.universalBrokerEnabled - ) { - const loadedFilters = getFilterConfig().loadedFilters as Map< - string, - LOADEDFILTERSET - >; - filterResponse = - loadedFilters - .get(res.locals.websocket.supportedIntegrationType) // The chosen type is determined by websocket connect middlwr - ?.public(req) || false; - } else if (options.config.brokerType == 'client') { - const loadedFilters = getFilterConfig().loadedFilters as LOADEDFILTERSET; - filterResponse = loadedFilters.public(req); - } else { - const loadedFilters = options.loadedFilters as LOADEDFILTERSET; - filterResponse = loadedFilters.public(req); - } - - const makeWebsocketRequestWithStreamingResponse = (result) => { - req.url = result.url; - logContext.ioUrl = result.url; - - const streamingID = uuid(); - const streamBuffer = new stream.PassThrough({ highWaterMark: 1048576 }); - streamBuffer.on('error', (error) => { - // This may be a duplicate error, as the most likely cause of this is the POST handler calling destroy. - logger.error( - { - ...logContext, - error, - stackTrace: new Error('stacktrace generator').stack, - }, - '[HTTP Flow][Relay] Error piping POST response stream through to HTTP response', - ); - res.destroy(error); - }); - logContext.streamingID = streamingID; - logger.debug( - logContext, - '[HTTP Flow][Relay] Sending request over websocket connection expecting POST stream response', - ); - - streamsStore.set(streamingID, { - response: res, - streamBuffer, - streamSize: 0, - }); - streamBuffer.pipe(res); - const simplifiedContextWithStreamingID = simplifiedContext; - simplifiedContextWithStreamingID['streamingID'] = streamingID; - logger.info( - simplifiedContextWithStreamingID, - '[HTTP Flow] Brokering request through WS', - ); - res.locals.websocket.send('request', { - url: req.url, - method: req.method, - body: req.body, - headers: req.headers, - streamingID, - }); - incrementWebSocketRequestsTotal(false, 'outbound-request'); - return; - }; - const makeWebsocketRequestWithWebsocketResponse = (result) => { - req.url = result.url; - logContext.ioUrl = result.url; - logger.debug( - logContext, - '[HTTP Flow][Relay] Sending request over websocket connection expecting Websocket response', - ); - - logger.info( - simplifiedContext, - '[HTTP Flow] Brokering request through WS', - ); - // relay the http request over the websocket, handle websocket response - res.locals.websocket.send( - 'request', - { - url: req.url, - method: req.method, - body: req.body, - headers: req.headers, - streamingID: '', - }, - (ioResponse) => { - logContext.responseStatus = ioResponse.status; - logContext.responseHeaders = ioResponse.headers; - logContext.responseBodyType = typeof ioResponse.body; - - const logMsg = - '[HTTP Flow][Relay] Return response from Websocket back to HTTP connection'; - if (ioResponse.status <= 299) { - logger.debug(logContext, logMsg); - let responseBodyString = ''; - if (typeof ioResponse.body === 'string') { - responseBodyString = ioResponse.body; - } else if (typeof ioResponse.body === 'object') { - responseBodyString = JSON.stringify(ioResponse.body); - } - if (responseBodyString) { - const responseBodyBytes = Buffer.byteLength( - responseBodyString, - 'utf-8', - ); - observeResponseSize({ - bytes: responseBodyBytes, - isStreaming: false, - }); - } else { - // fallback metric to let us know if we're recording all response sizes - // we expect to remove this should it report 0 - incrementUnableToSizeResponse(); - } - } else { - const errorLogMsg = - '[HTTP Flow][Relay] Non 2xx response from Websocket back to HTTP connection'; - logContext.ioErrorType = ioResponse.errorType; - logContext.ioOriginalBodySize = ioResponse.originalBodySize; - logger.warn(logContext, errorLogMsg); - } - - const httpResponse = res - .status(ioResponse.status) - .set(ioResponse.headers); - - const encodingType = undefsafe( - ioResponse, - 'headers.transfer-encoding', - ); - try { - // keep chunked http requests without content-length header - if (encodingType === 'chunked') { - httpResponse.write(ioResponse.body); - httpResponse.end(); - } else { - httpResponse.send(ioResponse.body); - } - } catch (err) { - logger.error( - { - ...logContext, - encodingType, - err, - stackTrace: new Error('stacktrace generator').stack, - }, - '[HTTP Flow][Relay] Error forwarding response from Web Socket to HTTP connection', - ); - } - }, - ); - incrementWebSocketRequestsTotal(false, 'outbound-request'); - }; - if (!filterResponse) { - incrementHttpRequestsTotal(true, 'inbound-request'); - const reason = - 'Request does not match any accept rule, blocking HTTP request'; - logContext.error = 'blocked'; - logger.warn(logContext, reason); - // TODO: respect request headers, block according to content-type - return res.status(401).send({ message: 'blocked', reason, url: req.url }); - } else { - incrementHttpRequestsTotal(false, 'inbound-request'); - if ( - res?.locals?.capabilities?.includes('post-streams') && - !responseWantedOverWs - ) { - makeWebsocketRequestWithStreamingResponse(filterResponse); - } else { - makeWebsocketRequestWithWebsocketResponse(filterResponse); - } - } + return async (req: Request, res: Response) => { + const workload = new BrokerClientRequestWorkload(req, res, options); + await workload.handler(); }; }; diff --git a/lib/common/relay/forwardHttpRequestOverHttp.ts b/lib/common/relay/forwardHttpRequestOverHttp.ts index e5c999918..a07eebbe4 100644 --- a/lib/common/relay/forwardHttpRequestOverHttp.ts +++ b/lib/common/relay/forwardHttpRequestOverHttp.ts @@ -5,11 +5,10 @@ import { log as logger } from '../../logs/logger'; import { incrementHttpRequestsTotal } from '../utils/metrics'; import { ExtendedLogContext } from '../types/log'; -import { makeRequestToDownstream } from '../http/request'; +import { makeRequestToDownstream } from '../../hybrid-sdk/http/request'; import { maskToken } from '../utils/token'; import { LoadedClientOpts, LoadedServerOpts } from '../types/options'; -import { LOADEDFILTERSET } from '../types/filter'; -import { getFilterConfig } from '../../client/config/filters'; +import { filterClientRequest } from '../../broker-workload/requestFiltering'; // 1. Request coming in over HTTP conn (logged) // 2. Filter for rule match (log and block if no match) @@ -42,26 +41,11 @@ export const forwardHttpRequestOverHttp = ( const simplifiedContext = logContext; delete simplifiedContext.requestHeaders; logger.info(simplifiedContext, '[HTTP Flow] Received request'); - let filterResponse; - if ( - options.config.brokerType == 'client' && - options.config.universalBrokerEnabled - ) { - const loadedFilters = getFilterConfig().loadedFilters as Map< - string, - LOADEDFILTERSET - >; - filterResponse = - loadedFilters - .get(res.locals.websocket.supportedIntegrationType) // The chosen type is determined by websocket connect middlwr - ?.public(req) || false; - } else if (options.config.brokerType == 'client') { - const loadedFilters = getFilterConfig().loadedFilters as LOADEDFILTERSET; - filterResponse = loadedFilters.public(req); - } else { - const loadedFilters = options.loadedFilters as LOADEDFILTERSET; - filterResponse = loadedFilters.public(req); - } + const filterResponse = filterClientRequest( + req, + options, + res.locals.websocket, + ); if (!filterResponse) { incrementHttpRequestsTotal(true, 'inbound-request'); diff --git a/lib/hybrid-sdk/clientRequestHelpers.ts b/lib/hybrid-sdk/clientRequestHelpers.ts new file mode 100644 index 000000000..18596d03a --- /dev/null +++ b/lib/hybrid-sdk/clientRequestHelpers.ts @@ -0,0 +1,202 @@ +import { Request, Response } from 'express'; +import { log as logger } from '../logs/logger'; +import { getConfig } from '../common/config/config'; + +import { + incrementWebSocketRequestsTotal, + observeResponseSize, + incrementUnableToSizeResponse, +} from '../common/utils/metrics'; +import undefsafe from 'undefsafe'; +import { ExtendedLogContext } from '../common/types/log'; +import { v4 as uuid } from 'uuid'; +import stream from 'stream'; +import { streamsStore } from './http/server-post-stream-handler'; + +export class HybridClientRequestHandler { + logContext: ExtendedLogContext; + simplifiedContext: ExtendedLogContext; + options; + req: Request; + res: Response; + responseWantedOverWs: boolean; + + constructor(req: Request, res: Response) { + this.req = req; + this.res = res; + this.options = getConfig(); + + this.req.headers['snyk-request-id'] ||= uuid(); + this.responseWantedOverWs = req.headers['x-broker-ws-response'] + ? true + : false; + this.logContext = { + url: this.req.url, + requestMethod: this.req.method, + requestHeaders: this.req.headers, + requestId: + this.req.headers['snyk-request-id'] && + Array.isArray(this.req.headers['snyk-request-id']) + ? this.req.headers['snyk-request-id'].join(',') + : this.req.headers['snyk-request-id'] || '', + maskedToken: this.req['maskedToken'], + hashedToken: this.req['hashedToken'], + actingOrgPublicId: this.req.headers[ + 'snyk-acting-org-public-id' + ] as string, + actingGroupPublicId: this.req.headers[ + 'snyk-acting-group-public-id' + ] as string, + productLine: this.req.headers['snyk-product-line'] as string, + flow: this.req.headers['snyk-flow-name'] as string, + }; + + this.simplifiedContext = this.logContext; + delete this.simplifiedContext.requestHeaders; + logger.info(this.simplifiedContext, '[HTTP Flow] Received request'); + } + private makeWebsocketRequestWithStreamingResponse(result) { + this.req.url = result.url; + this.logContext.ioUrl = result.url; + + const streamingID = uuid(); + const streamBuffer = new stream.PassThrough({ highWaterMark: 1048576 }); + streamBuffer.on('error', (error) => { + // This may be a duplicate error, as the most likely cause of this is the POST handler calling destroy. + logger.error( + { + ...this.logContext, + error, + stackTrace: new Error('stacktrace generator').stack, + }, + '[HTTP Flow][Relay] Error piping POST response stream through to HTTP response', + ); + this.res.destroy(error); + }); + this.logContext.streamingID = streamingID; + logger.debug( + this.logContext, + '[HTTP Flow][Relay] Sending request over websocket connection expecting POST stream response', + ); + + streamsStore.set(streamingID, { + response: this.res, + streamBuffer, + streamSize: 0, + }); + streamBuffer.pipe(this.res); + const simplifiedContextWithStreamingID = this.simplifiedContext; + simplifiedContextWithStreamingID['streamingID'] = streamingID; + logger.info( + simplifiedContextWithStreamingID, + '[HTTP Flow] Brokering request through WS', + ); + this.res.locals.websocket.send('request', { + url: this.req.url, + method: this.req.method, + body: this.req.body, + headers: this.req.headers, + streamingID, + }); + incrementWebSocketRequestsTotal(false, 'outbound-request'); + return; + } + private makeWebsocketRequestWithWebsocketResponse(result) { + this.req.url = result.url; + this.logContext.ioUrl = result.url; + logger.debug( + this.logContext, + '[HTTP Flow][Relay] Sending request over websocket connection expecting Websocket response', + ); + + logger.info( + this.simplifiedContext, + '[HTTP Flow] Brokering request through WS', + ); + // relay the http request over the websocket, handle websocket response + this.res.locals.websocket.send( + 'request', + { + url: this.req.url, + method: this.req.method, + body: this.req.body, + headers: this.req.headers, + streamingID: '', + }, + (ioResponse) => { + this.logContext.responseStatus = ioResponse.status; + this.logContext.responseHeaders = ioResponse.headers; + this.logContext.responseBodyType = typeof ioResponse.body; + + const logMsg = + '[HTTP Flow][Relay] Return response from Websocket back to HTTP connection'; + if (ioResponse.status <= 299) { + logger.debug(this.logContext, logMsg); + let responseBodyString = ''; + if (typeof ioResponse.body === 'string') { + responseBodyString = ioResponse.body; + } else if (typeof ioResponse.body === 'object') { + responseBodyString = JSON.stringify(ioResponse.body); + } + if (responseBodyString) { + const responseBodyBytes = Buffer.byteLength( + responseBodyString, + 'utf-8', + ); + observeResponseSize({ + bytes: responseBodyBytes, + isStreaming: false, + }); + } else { + // fallback metric to let us know if we're recording all response sizes + // we expect to remove this should it report 0 + incrementUnableToSizeResponse(); + } + } else { + const errorLogMsg = + '[HTTP Flow][Relay] Non 2xx response from Websocket back to HTTP connection'; + this.logContext.ioErrorType = ioResponse.errorType; + this.logContext.ioOriginalBodySize = ioResponse.originalBodySize; + logger.warn(this.logContext, errorLogMsg); + } + + const httpResponse = this.res + .status(ioResponse.status) + .set(ioResponse.headers); + + const encodingType = undefsafe(ioResponse, 'headers.transfer-encoding'); + try { + // keep chunked http requests without content-length header + if (encodingType === 'chunked') { + httpResponse.write(ioResponse.body); + httpResponse.end(); + } else { + httpResponse.send(ioResponse.body); + } + } catch (err) { + logger.error( + { + ...this.logContext, + encodingType, + err, + stackTrace: new Error('stacktrace generator').stack, + }, + '[HTTP Flow][Relay] Error forwarding response from Web Socket to HTTP connection', + ); + } + }, + ); + incrementWebSocketRequestsTotal(false, 'outbound-request'); + } + + makeRequest(filterResponse) { + if ( + this.res?.locals?.capabilities?.includes('post-streams') && + !this.responseWantedOverWs + ) { + this.makeWebsocketRequestWithStreamingResponse(filterResponse); + } else { + this.makeWebsocketRequestWithWebsocketResponse(filterResponse); + } + } +} diff --git a/lib/common/http/axios.ts b/lib/hybrid-sdk/http/axios.ts similarity index 100% rename from lib/common/http/axios.ts rename to lib/hybrid-sdk/http/axios.ts diff --git a/lib/common/http/downstream-post-stream-to-server.ts b/lib/hybrid-sdk/http/downstream-post-stream-to-server.ts similarity index 98% rename from lib/common/http/downstream-post-stream-to-server.ts rename to lib/hybrid-sdk/http/downstream-post-stream-to-server.ts index a5d2dc418..780c87eb6 100644 --- a/lib/common/http/downstream-post-stream-to-server.ts +++ b/lib/hybrid-sdk/http/downstream-post-stream-to-server.ts @@ -1,14 +1,14 @@ import { log as logger } from '../../logs/logger'; import stream from 'stream'; import { pipeline } from 'stream/promises'; -import { replaceUrlPartialChunk } from '../utils/replace-vars'; -import version from '../utils/version'; +import { replaceUrlPartialChunk } from '../../common/utils/replace-vars'; +import version from '../../common/utils/version'; import { getProxyForUrl } from 'proxy-from-env'; import { bootstrap } from 'global-agent'; import https from 'https'; import http from 'http'; -import { getConfig } from '../config/config'; +import { getConfig } from '../../common/config/config'; const BROKER_CONTENT_TYPE = 'application/vnd.broker.stream+octet-stream'; diff --git a/lib/common/http/patch-https-request-for-proxying.ts b/lib/hybrid-sdk/http/patch-https-request-for-proxying.ts similarity index 97% rename from lib/common/http/patch-https-request-for-proxying.ts rename to lib/hybrid-sdk/http/patch-https-request-for-proxying.ts index 65e0ed6e0..e3e9d09e6 100644 --- a/lib/common/http/patch-https-request-for-proxying.ts +++ b/lib/hybrid-sdk/http/patch-https-request-for-proxying.ts @@ -8,8 +8,8 @@ import url from 'url'; import tunnel from 'tunnel'; import https from 'https'; +import { loadBrokerConfig, getConfig } from '../../common/config/config'; -import { getConfig, loadBrokerConfig } from '../config/config'; loadBrokerConfig(); const brokerServer = url.parse(getConfig().brokerServerUrl || ''); brokerServer.port = diff --git a/lib/common/http/request.ts b/lib/hybrid-sdk/http/request.ts similarity index 98% rename from lib/common/http/request.ts rename to lib/hybrid-sdk/http/request.ts index 2d8e55b6b..de6d94502 100644 --- a/lib/common/http/request.ts +++ b/lib/hybrid-sdk/http/request.ts @@ -3,8 +3,8 @@ import https from 'https'; import { getProxyForUrl } from 'proxy-from-env'; import { bootstrap } from 'global-agent'; import { log as logger } from '../../logs/logger'; -import { PostFilterPreparedRequest } from '../relay/prepareRequest'; -import { getConfig } from '../config/config'; +import { PostFilterPreparedRequest } from '../../common/relay/prepareRequest'; +import { getConfig } from '../../common/config/config'; import { switchToInsecure } from './utils'; export interface HttpResponse { headers: Object; diff --git a/lib/common/http/server-post-stream-handler.ts b/lib/hybrid-sdk/http/server-post-stream-handler.ts similarity index 93% rename from lib/common/http/server-post-stream-handler.ts rename to lib/hybrid-sdk/http/server-post-stream-handler.ts index e50731a82..397c86ee9 100644 --- a/lib/common/http/server-post-stream-handler.ts +++ b/lib/hybrid-sdk/http/server-post-stream-handler.ts @@ -1,8 +1,9 @@ import stream from 'stream'; -import { observeResponseSize } from '../utils/metrics'; + import { Response } from 'express'; import NodeCache from 'node-cache'; -import { getConfig } from '../config/config'; +import { getConfig } from '../../common/config/config'; +import { observeResponseSize } from '../../common/utils/metrics'; export const streamsStore = new NodeCache({ stdTTL: parseInt(getConfig().cacheExpiry) || 3600, // 1 hour diff --git a/lib/common/http/utils.ts b/lib/hybrid-sdk/http/utils.ts similarity index 100% rename from lib/common/http/utils.ts rename to lib/hybrid-sdk/http/utils.ts diff --git a/lib/common/relay/requestsHelper.ts b/lib/hybrid-sdk/requestsHelper.ts similarity index 92% rename from lib/common/relay/requestsHelper.ts rename to lib/hybrid-sdk/requestsHelper.ts index 142357ff8..a29f1330a 100644 --- a/lib/common/relay/requestsHelper.ts +++ b/lib/hybrid-sdk/requestsHelper.ts @@ -1,12 +1,12 @@ import { makeRequestToDownstream, makeStreamingRequestToDownstream, -} from '../http/request'; -import { PostFilterPreparedRequest } from './prepareRequest'; -import { log as logger } from '../../logs/logger'; -import { logError, logResponse } from '../../logs/log'; -import { isJson } from '../utils/json'; -import { replaceUrlPartialChunk } from '../utils/replace-vars'; +} from './http/request'; +import { PostFilterPreparedRequest } from '../common/relay/prepareRequest'; +import { log as logger } from '../logs/logger'; +import { logError, logResponse } from '../logs/log'; +import { isJson } from '../common/utils/json'; +import { replaceUrlPartialChunk } from '../common/utils/replace-vars'; export const makePostStreamingRequest = async ( req: PostFilterPreparedRequest, diff --git a/lib/common/relay/responseSenders.ts b/lib/hybrid-sdk/responseSenders.ts similarity index 91% rename from lib/common/relay/responseSenders.ts rename to lib/hybrid-sdk/responseSenders.ts index 3c580a9b0..23ac6a8e3 100644 --- a/lib/common/relay/responseSenders.ts +++ b/lib/hybrid-sdk/responseSenders.ts @@ -1,18 +1,12 @@ -import { getConfig } from '../config/config'; -import { BrokerServerPostResponseHandler } from '../http/downstream-post-stream-to-server'; +import { getConfig } from '../common/config/config'; +import { BrokerServerPostResponseHandler } from './http/downstream-post-stream-to-server'; import { legacyStreaming } from './requestsHelper'; -import { log as logger } from '../../logs/logger'; -import { CorrelationHeaders } from '../utils/correlation-headers'; +import { log as logger } from '../logs/logger'; import { IncomingMessage } from 'node:http'; -import { logError, logResponse } from '../../logs/log'; -import { isJson } from '../utils/json'; -import { replaceUrlPartialChunk } from '../utils/replace-vars'; - -export type RequestMetadata = { - connectionIdentifier: string; - payloadStreamingId: string; - // streamResponse: boolean; -} & CorrelationHeaders; +import { logError, logResponse } from '../logs/log'; +import { isJson } from '../common/utils/json'; +import { replaceUrlPartialChunk } from '../common/utils/replace-vars'; +import { RequestMetadata } from './types'; export interface HybridResponse { status: number; diff --git a/lib/hybrid-sdk/types.ts b/lib/hybrid-sdk/types.ts new file mode 100644 index 000000000..138abf7bd --- /dev/null +++ b/lib/hybrid-sdk/types.ts @@ -0,0 +1,7 @@ +import { CorrelationHeaders } from '../common/utils/correlation-headers'; + +export type RequestMetadata = { + connectionIdentifier: string; + payloadStreamingId: string; + // streamResponse: boolean; +} & CorrelationHeaders; diff --git a/lib/server/infra/dispatcher.ts b/lib/server/infra/dispatcher.ts index 7d9080dc0..5114c7591 100644 --- a/lib/server/infra/dispatcher.ts +++ b/lib/server/infra/dispatcher.ts @@ -1,7 +1,7 @@ import { hashToken } from '../../common/utils/token'; import { log as logger } from '../../logs/logger'; import { getConfig } from '../../common/config/config'; -import { axiosInstance } from '../../common/http/axios'; +import { axiosInstance } from '../../hybrid-sdk/http/axios'; import { v4 as uuid } from 'uuid'; class DispatcherClient { diff --git a/lib/server/routesHandlers/connectionStatusHandler.ts b/lib/server/routesHandlers/connectionStatusHandler.ts index 756efee6a..50529bffa 100644 --- a/lib/server/routesHandlers/connectionStatusHandler.ts +++ b/lib/server/routesHandlers/connectionStatusHandler.ts @@ -4,7 +4,7 @@ import { getSocketConnections } from '../socket'; import { log as logger } from '../../logs/logger'; import { hostname } from 'node:os'; import { PostFilterPreparedRequest } from '../../common/relay/prepareRequest'; -import { makeStreamingRequestToDownstream } from '../../common/http/request'; +import { makeStreamingRequestToDownstream } from '../../hybrid-sdk/http/request'; export const connectionStatusHandler = async (req: Request, res: Response) => { const token = req.params.token; diff --git a/lib/server/routesHandlers/httpRequestHandler.ts b/lib/server/routesHandlers/httpRequestHandler.ts index 336aa58fd..699d53dc0 100644 --- a/lib/server/routesHandlers/httpRequestHandler.ts +++ b/lib/server/routesHandlers/httpRequestHandler.ts @@ -4,7 +4,7 @@ import { getDesensitizedToken } from '../utils/token'; import { getSocketConnections } from '../socket'; import { incrementHttpRequestsTotal } from '../../common/utils/metrics'; import { hostname } from 'node:os'; -import { makeStreamingRequestToDownstream } from '../../common/http/request'; +import { makeStreamingRequestToDownstream } from '../../hybrid-sdk/http/request'; import { PostFilterPreparedRequest } from '../../common/relay/prepareRequest'; import { URL, URLSearchParams } from 'node:url'; diff --git a/lib/server/routesHandlers/postResponseHandler.ts b/lib/server/routesHandlers/postResponseHandler.ts index 0b9500b9c..b6cc9684e 100644 --- a/lib/server/routesHandlers/postResponseHandler.ts +++ b/lib/server/routesHandlers/postResponseHandler.ts @@ -1,8 +1,9 @@ import { Request, Response } from 'express'; -import { StreamResponseHandler } from '../../common/http/server-post-stream-handler'; + import { log as logger } from '../../logs/logger'; import { getDesensitizedToken } from '../utils/token'; import { incrementHttpRequestsTotal } from '../../common/utils/metrics'; +import { StreamResponseHandler } from '../../hybrid-sdk/http/server-post-stream-handler'; export const handlePostResponse = (req: Request, res: Response) => { incrementHttpRequestsTotal(false, 'data-response'); diff --git a/test/unit/http.test.ts b/test/unit/http.test.ts index de1352035..bb4a8ee86 100644 --- a/test/unit/http.test.ts +++ b/test/unit/http.test.ts @@ -3,7 +3,7 @@ import { makeRequestToDownstream, makeSingleRawRequestToDownstream, makeStreamingRequestToDownstream, -} from '../../lib/common/http/request'; +} from '../../lib/hybrid-sdk/http/request'; import http from 'http'; const nock = require('nock'); diff --git a/test/unit/proxying-decision.test.ts b/test/unit/proxying-decision.test.ts index e4dd04a3b..83281d36a 100644 --- a/test/unit/proxying-decision.test.ts +++ b/test/unit/proxying-decision.test.ts @@ -18,7 +18,7 @@ describe('Proxy decision', () => { // loaded now, for config to be reloaded after env vars const { shouldProxy, - } = require('../../lib/common/http/patch-https-request-for-proxying'); + } = require('../../lib/hybrid-sdk/http/patch-https-request-for-proxying'); expect(shouldProxy(parse('https://broker.snyk.io'))).toEqual(false); }); @@ -29,7 +29,7 @@ describe('Proxy decision', () => { // loaded now, for config to be reloaded after env vars const { shouldProxy, - } = require('../../lib/common/http/patch-https-request-for-proxying'); + } = require('../../lib/hybrid-sdk/http/patch-https-request-for-proxying'); const url = parse('http://symbolics.com'); expect(shouldProxy(url)).toEqual(false); expect(shouldProxy(parse('https://shambhala.org/'))).toEqual(true); @@ -42,7 +42,7 @@ describe('Proxy decision', () => { // loaded now, for config to be reloaded after env vars const { shouldProxy, - } = require('../../lib/common/http/patch-https-request-for-proxying'); + } = require('../../lib/hybrid-sdk/http/patch-https-request-for-proxying'); const url = parse('http://symbolics.com'); expect(shouldProxy(url)).toEqual(false); expect(shouldProxy(parse('https://shambhala.org/'))).toEqual(true); @@ -55,7 +55,7 @@ describe('Proxy decision', () => { // loaded now, for config to be reloaded after env vars const { shouldProxy, - } = require('../../lib/common/http/patch-https-request-for-proxying'); + } = require('../../lib/hybrid-sdk/http/patch-https-request-for-proxying'); expect(shouldProxy(parse('http://symbolics.com/?hello'))).toEqual(false); }); @@ -66,7 +66,7 @@ describe('Proxy decision', () => { const { shouldProxy, - } = require('../../lib/common/http/patch-https-request-for-proxying'); + } = require('../../lib/hybrid-sdk/http/patch-https-request-for-proxying'); expect(shouldProxy(parse(`https://wiki.c2.com`))).toEqual(false); expect(shouldProxy(parse(`https://symbolics.com`))).toEqual(false); diff --git a/test/unit/relay-response-body-client.test.ts b/test/unit/relay-response-body-client.test.ts index fa2f8443b..ceae11eae 100644 --- a/test/unit/relay-response-body-client.test.ts +++ b/test/unit/relay-response-body-client.test.ts @@ -1,10 +1,10 @@ const PORT = 8001; process.env.BROKER_SERVER_URL = `http://localhost:${PORT}`; -jest.mock('../../lib/common/http/request'); +jest.mock('../../lib/hybrid-sdk/http/request'); import { setFilterConfig } from '../../lib/client/config/filters'; import { Role, WebSocketConnection } from '../../lib/client/types/client'; -import { makeRequestToDownstream } from '../../lib/common/http/request'; +import { makeRequestToDownstream } from '../../lib/hybrid-sdk/http/request'; // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore diff --git a/test/unit/relay-response-body-form-url-encoded.test.ts b/test/unit/relay-response-body-form-url-encoded.test.ts index 9573e0a26..52ff312aa 100644 --- a/test/unit/relay-response-body-form-url-encoded.test.ts +++ b/test/unit/relay-response-body-form-url-encoded.test.ts @@ -1,9 +1,9 @@ const PORT = 8001; process.env.BROKER_SERVER_URL = `http://localhost:${PORT}`; -jest.mock('../../lib/common/http/request'); +jest.mock('../../lib/hybrid-sdk/http/request'); import { Role, WebSocketConnection } from '../../lib/client/types/client'; -import { makeRequestToDownstream } from '../../lib/common/http/request'; +import { makeRequestToDownstream } from '../../lib/hybrid-sdk/http/request'; // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore diff --git a/test/unit/relay-response-body-universal-form-url-encoded.test.ts b/test/unit/relay-response-body-universal-form-url-encoded.test.ts index 1317b1034..04a24ca6c 100644 --- a/test/unit/relay-response-body-universal-form-url-encoded.test.ts +++ b/test/unit/relay-response-body-universal-form-url-encoded.test.ts @@ -1,9 +1,9 @@ const PORT = 8001; process.env.BROKER_SERVER_URL = `http://localhost:${PORT}`; -jest.mock('../../lib/common/http/request'); +jest.mock('../../lib/hybrid-sdk/http/request'); import { Role, WebSocketConnection } from '../../lib/client/types/client'; -import { makeRequestToDownstream } from '../../lib/common/http/request'; +import { makeRequestToDownstream } from '../../lib/hybrid-sdk/http/request'; // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore diff --git a/test/unit/relay-response-body.test.ts b/test/unit/relay-response-body.test.ts index 8a04f0834..bbb7edfd0 100644 --- a/test/unit/relay-response-body.test.ts +++ b/test/unit/relay-response-body.test.ts @@ -1,9 +1,9 @@ const PORT = 8001; process.env.BROKER_SERVER_URL = `http://localhost:${PORT}`; -jest.mock('../../lib/common/http/request'); +jest.mock('../../lib/hybrid-sdk/http/request'); import { Role, WebSocketConnection } from '../../lib/client/types/client'; -import { makeRequestToDownstream } from '../../lib/common/http/request'; +import { makeRequestToDownstream } from '../../lib/hybrid-sdk/http/request'; // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore diff --git a/test/unit/relay-response-headers-form-url-headers.test.ts b/test/unit/relay-response-headers-form-url-headers.test.ts index ae6d9c3d7..eb516a5a2 100644 --- a/test/unit/relay-response-headers-form-url-headers.test.ts +++ b/test/unit/relay-response-headers-form-url-headers.test.ts @@ -1,8 +1,8 @@ const PORT = 8001; process.env.BROKER_SERVER_URL = `http://localhost:${PORT}`; -jest.mock('../../lib/common/http/request'); +jest.mock('../../lib/hybrid-sdk/http/request'); import { Role, WebSocketConnection } from '../../lib/client/types/client'; -import { makeRequestToDownstream } from '../../lib/common/http/request'; +import { makeRequestToDownstream } from '../../lib/hybrid-sdk/http/request'; // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore diff --git a/test/unit/relay-response-headers-universal-form-url-headers.test.ts b/test/unit/relay-response-headers-universal-form-url-headers.test.ts index 7faa8fa5b..c4aa9817a 100644 --- a/test/unit/relay-response-headers-universal-form-url-headers.test.ts +++ b/test/unit/relay-response-headers-universal-form-url-headers.test.ts @@ -1,8 +1,8 @@ const PORT = 8001; process.env.BROKER_SERVER_URL = `http://localhost:${PORT}`; -jest.mock('../../lib/common/http/request'); +jest.mock('../../lib/hybrid-sdk/http/request'); import { Role, WebSocketConnection } from '../../lib/client/types/client'; -import { makeRequestToDownstream } from '../../lib/common/http/request'; +import { makeRequestToDownstream } from '../../lib/hybrid-sdk/http/request'; // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore diff --git a/test/unit/relay-response-headers-universal.test.ts b/test/unit/relay-response-headers-universal.test.ts index 4ed3f2ada..34be20a1a 100644 --- a/test/unit/relay-response-headers-universal.test.ts +++ b/test/unit/relay-response-headers-universal.test.ts @@ -1,8 +1,8 @@ const PORT = 8001; process.env.BROKER_SERVER_URL = `http://localhost:${PORT}`; -jest.mock('../../lib/common/http/request'); +jest.mock('../../lib/hybrid-sdk/http/request'); import { Role, WebSocketConnection } from '../../lib/client/types/client'; -import { makeRequestToDownstream } from '../../lib/common/http/request'; +import { makeRequestToDownstream } from '../../lib/hybrid-sdk/http/request'; // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore diff --git a/test/unit/relay-response-headers.test.ts b/test/unit/relay-response-headers.test.ts index 2d175b66b..0f31a9843 100644 --- a/test/unit/relay-response-headers.test.ts +++ b/test/unit/relay-response-headers.test.ts @@ -1,8 +1,8 @@ const PORT = 8001; process.env.BROKER_SERVER_URL = `http://localhost:${PORT}`; -jest.mock('../../lib/common/http/request'); +jest.mock('../../lib/hybrid-sdk/http/request'); import { Role, WebSocketConnection } from '../../lib/client/types/client'; -import { makeRequestToDownstream } from '../../lib/common/http/request'; +import { makeRequestToDownstream } from '../../lib/hybrid-sdk/http/request'; // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore From dd73a037d2338d5f3207a341e5254c8985db615b Mon Sep 17 00:00:00 2001 From: Antoine Arlaud Date: Tue, 12 Nov 2024 14:34:24 +0100 Subject: [PATCH 4/4] chore: consolidate fwd http request ws vs api fix: proper casing for Class name --- lib/broker-workload/clientRequest.ts | 7 +- lib/broker-workload/index.ts | 2 +- lib/client/index.ts | 6 +- lib/client/socketHandlers/chunkHandler.ts | 2 +- lib/common/relay/forwardHttpRequest.ts | 3 +- .../relay/forwardHttpRequestOverHttp.ts | 98 ------------------- lib/common/relay/forwardWebsocketRequest.ts | 4 +- .../LegacyStreamResponseHandler.ts | 6 +- lib/hybrid-sdk/clientRequestHelpers.ts | 60 +++++++++--- lib/hybrid-sdk/websocket/socket.ts | 0 lib/server/socketHandlers/identifyHandler.ts | 2 +- 11 files changed, 65 insertions(+), 125 deletions(-) delete mode 100644 lib/common/relay/forwardHttpRequestOverHttp.ts rename lib/{common/relay => hybrid-sdk}/LegacyStreamResponseHandler.ts (87%) create mode 100644 lib/hybrid-sdk/websocket/socket.ts diff --git a/lib/broker-workload/clientRequest.ts b/lib/broker-workload/clientRequest.ts index 28d38c3fd..5a263605e 100644 --- a/lib/broker-workload/clientRequest.ts +++ b/lib/broker-workload/clientRequest.ts @@ -14,7 +14,7 @@ export class BrokerClientRequestWorkload { this.options = options; } - async handler() { + async handler(makeRequestOverHttp = false) { const hybridClientRequestHandler = new HybridClientRequestHandler( this.req, this.res, @@ -36,7 +36,10 @@ export class BrokerClientRequestWorkload { .status(401) .send({ message: 'blocked', reason, url: this.req.url }); } else { - hybridClientRequestHandler.makeRequest(filterResponse); + hybridClientRequestHandler.makeRequest( + filterResponse, + makeRequestOverHttp, + ); incrementHttpRequestsTotal(false, 'inbound-request'); } } diff --git a/lib/broker-workload/index.ts b/lib/broker-workload/index.ts index e186104b0..f4c3478b9 100644 --- a/lib/broker-workload/index.ts +++ b/lib/broker-workload/index.ts @@ -18,7 +18,7 @@ import { } from '../hybrid-sdk/http/request'; import { logError } from '../logs/log'; -export class brokerWorkload { +export class BrokerWorkload { options; connectionIdentifier: string; websocketConnectionHandler; diff --git a/lib/client/index.ts b/lib/client/index.ts index eae614ed7..66f7aed7f 100644 --- a/lib/client/index.ts +++ b/lib/client/index.ts @@ -20,7 +20,6 @@ import { processStartUpHooks, validateMinimalConfig, } from './hooks/startup/processHooks'; -import { forwardHttpRequestOverHttp } from '../common/relay/forwardHttpRequestOverHttp'; import { isWebsocketConnOpen } from './utils/socketHelpers'; import { ClientOpts } from '../common/types/options'; import { websocketConnectionSelectorMiddleware } from './routesHandler/websocketConnectionMiddlewares'; @@ -126,10 +125,7 @@ export const main = async (clientOpts: ClientOpts) => { // start the local webserver to listen for relay requests const { app, server } = webserver(clientOpts.config, clientOpts.port); const httpToWsForwarder = forwardHttpRequest(clientOpts); - const httpToAPIForwarder = forwardHttpRequestOverHttp( - clientOpts, - clientOpts.config, - ); + const httpToAPIForwarder = forwardHttpRequest(clientOpts, true); // IMPORTANT: defined before relay (`app.all('/*', ...`) app.get('/health/checks', handleChecksRoute(clientOpts.config)); app.get('/health/checks/:checkId', handleCheckIdsRoutes(clientOpts.config)); diff --git a/lib/client/socketHandlers/chunkHandler.ts b/lib/client/socketHandlers/chunkHandler.ts index f2ab183de..823b2b2dd 100644 --- a/lib/client/socketHandlers/chunkHandler.ts +++ b/lib/client/socketHandlers/chunkHandler.ts @@ -1,4 +1,4 @@ -import { legacyStreamResponseHandler } from '../../common/relay/LegacyStreamResponseHandler'; +import { legacyStreamResponseHandler } from '../../hybrid-sdk/LegacyStreamResponseHandler'; export const chunkHandler = (connectionIdentifier) => { return legacyStreamResponseHandler(connectionIdentifier); diff --git a/lib/common/relay/forwardHttpRequest.ts b/lib/common/relay/forwardHttpRequest.ts index 0b45fb5ec..5159f9af7 100644 --- a/lib/common/relay/forwardHttpRequest.ts +++ b/lib/common/relay/forwardHttpRequest.ts @@ -10,9 +10,10 @@ import { BrokerClientRequestWorkload } from '../../broker-workload/clientRequest // 5. Send response over HTTP conn export const forwardHttpRequest = ( options: LoadedClientOpts | LoadedServerOpts, + makeHttpRequest = false, ) => { return async (req: Request, res: Response) => { const workload = new BrokerClientRequestWorkload(req, res, options); - await workload.handler(); + await workload.handler(makeHttpRequest); }; }; diff --git a/lib/common/relay/forwardHttpRequestOverHttp.ts b/lib/common/relay/forwardHttpRequestOverHttp.ts deleted file mode 100644 index a07eebbe4..000000000 --- a/lib/common/relay/forwardHttpRequestOverHttp.ts +++ /dev/null @@ -1,98 +0,0 @@ -import { Request, Response } from 'express'; -import { v4 as uuid } from 'uuid'; - -import { log as logger } from '../../logs/logger'; -import { incrementHttpRequestsTotal } from '../utils/metrics'; - -import { ExtendedLogContext } from '../types/log'; -import { makeRequestToDownstream } from '../../hybrid-sdk/http/request'; -import { maskToken } from '../utils/token'; -import { LoadedClientOpts, LoadedServerOpts } from '../types/options'; -import { filterClientRequest } from '../../broker-workload/requestFiltering'; - -// 1. Request coming in over HTTP conn (logged) -// 2. Filter for rule match (log and block if no match) -// 3. Relay over websocket conn (logged) -// 4. Get response over websocket conn (logged) -// 5. Send response over HTTP conn -export const forwardHttpRequestOverHttp = ( - options: LoadedClientOpts | LoadedServerOpts, - config, -) => { - // const filters = loadFilters(filterRules); - - return (req: Request, res: Response) => { - // If this is the server, we should receive a Snyk-Request-Id header from upstream - // If this is the client, we will have to generate one - req.headers['snyk-request-id'] ||= uuid(); - const logContext: ExtendedLogContext = { - url: req.url, - requestMethod: req.method, - requestHeaders: req.headers, - requestId: - req.headers['snyk-request-id'] && - Array.isArray(req.headers['snyk-request-id']) - ? req.headers['snyk-request-id'].join(',') - : req.headers['snyk-request-id'] || '', - maskedToken: req['maskedToken'], - hashedToken: req['hashedToken'], - }; - - const simplifiedContext = logContext; - delete simplifiedContext.requestHeaders; - logger.info(simplifiedContext, '[HTTP Flow] Received request'); - const filterResponse = filterClientRequest( - req, - options, - res.locals.websocket, - ); - - if (!filterResponse) { - incrementHttpRequestsTotal(true, 'inbound-request'); - const reason = - 'Request does not match any accept rule, blocking HTTP request'; - logContext.error = 'blocked'; - logger.warn(logContext, reason); - // TODO: respect request headers, block according to content-type - return res.status(401).send({ message: 'blocked', reason, url: req.url }); - } else { - incrementHttpRequestsTotal(false, 'inbound-request'); - - const apiDomain = new URL( - config.API_BASE_URL || - (config.BROKER_SERVER_URL - ? config.BROKER_SERVER_URL.replace('//broker.', '//api.') - : 'https://api.snyk.io'), - ); - - const requestUri = new URL(req.url, apiDomain); - req.headers['host'] = requestUri.host; - req.headers['x-snyk-broker'] = `${maskToken( - res.locals.websocket.identifier, // This should be coupled/replaced by deployment ID - )}`; - - const filteredReq = { - url: requestUri.toString(), - method: req.method, - body: req.body, - headers: req.headers, - }; - - makeRequestToDownstream(filteredReq) - .then((resp) => { - if (resp.statusCode) { - res.status(resp.statusCode).set(resp.headers).send(resp.body); - } else { - res.status(500).send(resp.statusText); - } - }) - .catch((err) => { - logger.error( - logContext, - err, - 'Failed to forward webhook event to Snyk Platform', - ); - }); - } - }; -}; diff --git a/lib/common/relay/forwardWebsocketRequest.ts b/lib/common/relay/forwardWebsocketRequest.ts index aa10b97e5..f21c40b55 100644 --- a/lib/common/relay/forwardWebsocketRequest.ts +++ b/lib/common/relay/forwardWebsocketRequest.ts @@ -1,7 +1,7 @@ import { RequestPayload } from '../types/http'; import { WebSocketConnection } from '../../client/types/client'; import { LoadedClientOpts, LoadedServerOpts } from '../types/options'; -import { brokerWorkload } from '../../broker-workload'; +import { BrokerWorkload } from '../../broker-workload'; export const forwardWebSocketRequest = ( options: LoadedClientOpts | LoadedServerOpts, @@ -14,7 +14,7 @@ export const forwardWebSocketRequest = ( // 5. Send response over websocket conn return (connectionIdentifier) => async (payload: RequestPayload, emit) => { - const workload = new brokerWorkload( + const workload = new BrokerWorkload( connectionIdentifier, options, websocketConnectionHandler, diff --git a/lib/common/relay/LegacyStreamResponseHandler.ts b/lib/hybrid-sdk/LegacyStreamResponseHandler.ts similarity index 87% rename from lib/common/relay/LegacyStreamResponseHandler.ts rename to lib/hybrid-sdk/LegacyStreamResponseHandler.ts index 9e92902ab..988766534 100644 --- a/lib/common/relay/LegacyStreamResponseHandler.ts +++ b/lib/hybrid-sdk/LegacyStreamResponseHandler.ts @@ -1,9 +1,9 @@ import { streamsStore, StreamResponse, -} from '../../hybrid-sdk/http/server-post-stream-handler'; -import { log as logger } from '../../logs/logger'; -import { observeResponseSize } from '../utils/metrics'; +} from './http/server-post-stream-handler'; +import { log as logger } from '../logs/logger'; +import { observeResponseSize } from '../common/utils/metrics'; /** * @deprecated Deprecated in favour of {@link StreamResponseHandler} */ diff --git a/lib/hybrid-sdk/clientRequestHelpers.ts b/lib/hybrid-sdk/clientRequestHelpers.ts index 18596d03a..e608a1f88 100644 --- a/lib/hybrid-sdk/clientRequestHelpers.ts +++ b/lib/hybrid-sdk/clientRequestHelpers.ts @@ -12,6 +12,8 @@ import { ExtendedLogContext } from '../common/types/log'; import { v4 as uuid } from 'uuid'; import stream from 'stream'; import { streamsStore } from './http/server-post-stream-handler'; +import { maskToken } from '../common/utils/token'; +import { makeRequestToDownstream } from './http/request'; export class HybridClientRequestHandler { logContext: ExtendedLogContext; @@ -55,10 +57,7 @@ export class HybridClientRequestHandler { delete this.simplifiedContext.requestHeaders; logger.info(this.simplifiedContext, '[HTTP Flow] Received request'); } - private makeWebsocketRequestWithStreamingResponse(result) { - this.req.url = result.url; - this.logContext.ioUrl = result.url; - + private makeWebsocketRequestWithStreamingResponse() { const streamingID = uuid(); const streamBuffer = new stream.PassThrough({ highWaterMark: 1048576 }); streamBuffer.on('error', (error) => { @@ -101,9 +100,7 @@ export class HybridClientRequestHandler { incrementWebSocketRequestsTotal(false, 'outbound-request'); return; } - private makeWebsocketRequestWithWebsocketResponse(result) { - this.req.url = result.url; - this.logContext.ioUrl = result.url; + private makeWebsocketRequestWithWebsocketResponse() { logger.debug( this.logContext, '[HTTP Flow][Relay] Sending request over websocket connection expecting Websocket response', @@ -188,15 +185,56 @@ export class HybridClientRequestHandler { ); incrementWebSocketRequestsTotal(false, 'outbound-request'); } + private makeHttpRequest() { + const apiDomain = new URL( + this.options.API_BASE_URL || + (this.options.BROKER_SERVER_URL + ? this.options.BROKER_SERVER_URL.replace('//broker.', '//api.') + : 'https://api.snyk.io'), + ); + + const requestUri = new URL(this.req.url, apiDomain); + this.req.headers['host'] = requestUri.host; + this.req.headers['x-snyk-broker'] = `${maskToken( + this.res.locals.websocket.identifier, // This should be coupled/replaced by deployment ID + )}`; + + const filteredReq = { + url: requestUri.toString(), + method: this.req.method, + body: this.req.body, + headers: this.req.headers, + }; + + makeRequestToDownstream(filteredReq) + .then((resp) => { + if (resp.statusCode) { + this.res.status(resp.statusCode).set(resp.headers).send(resp.body); + } else { + this.res.status(500).send(resp.statusText); + } + }) + .catch((err) => { + logger.error( + this.logContext, + err, + 'Failed to forward webhook event to Snyk Platform', + ); + }); + } - makeRequest(filterResponse) { - if ( + makeRequest(filterResponse, makeRequestOverHttp = false) { + this.req.url = filterResponse.url; + this.logContext.ioUrl = filterResponse.url; + if (makeRequestOverHttp) { + this.makeHttpRequest(); + } else if ( this.res?.locals?.capabilities?.includes('post-streams') && !this.responseWantedOverWs ) { - this.makeWebsocketRequestWithStreamingResponse(filterResponse); + this.makeWebsocketRequestWithStreamingResponse(); } else { - this.makeWebsocketRequestWithWebsocketResponse(filterResponse); + this.makeWebsocketRequestWithWebsocketResponse(); } } } diff --git a/lib/hybrid-sdk/websocket/socket.ts b/lib/hybrid-sdk/websocket/socket.ts new file mode 100644 index 000000000..e69de29bb diff --git a/lib/server/socketHandlers/identifyHandler.ts b/lib/server/socketHandlers/identifyHandler.ts index 782dc8d0e..d48332628 100644 --- a/lib/server/socketHandlers/identifyHandler.ts +++ b/lib/server/socketHandlers/identifyHandler.ts @@ -1,4 +1,4 @@ -import { legacyStreamResponseHandler } from '../../common/relay/LegacyStreamResponseHandler'; +import { legacyStreamResponseHandler } from '../../hybrid-sdk/LegacyStreamResponseHandler'; import { incrementSocketConnectionGauge } from '../../common/utils/metrics'; import { log as logger } from '../../logs/logger'; import { clientConnected, clientPinged } from '../infra/dispatcher';