diff --git a/lib/client/socket.ts b/lib/client/socket.ts index 6fce936ff..cf2e63052 100644 --- a/lib/client/socket.ts +++ b/lib/client/socket.ts @@ -21,6 +21,7 @@ import { maskToken } from '../common/utils/token'; import { fetchJwt } from './auth/oauth'; import { getServerId } from './dispatcher'; import { determineFilterType } from './utils/filterSelection'; +import { notificationHandler } from './socketHandlers/notificationHandler'; export const createWebSocketConnectionPairs = async ( websocketConnections: WebSocketConnection[], @@ -225,7 +226,7 @@ export const createWebSocket = ( : localClientOps.config.brokerToken, ), ); - + websocket.on('notification', notificationHandler); websocket.on('error', errorHandler); websocket.on('open', () => diff --git a/lib/client/socketHandlers/notificationHandler.ts b/lib/client/socketHandlers/notificationHandler.ts new file mode 100644 index 000000000..179898d41 --- /dev/null +++ b/lib/client/socketHandlers/notificationHandler.ts @@ -0,0 +1,15 @@ +import { log as logger } from '../../logs/logger'; + +export const notificationHandler = ({ level, message }) => { + switch (level) { + case 'error': + logger.error({ message }); + break; + case 'warning': + logger.warn({ message }); + break; + case 'info': + default: + logger.info({ message }); + } +}; diff --git a/lib/common/utils/version.ts b/lib/common/utils/version.ts index 810743c42..234ec34ac 100644 --- a/lib/common/utils/version.ts +++ b/lib/common/utils/version.ts @@ -1,2 +1,2 @@ import packageJson from '../../../package.json'; -export default packageJson['version'] || 'local'; +export default packageJson['version'] || process.env.BROKER_VERSION || 'local'; diff --git a/lib/server/socketHandlers/identifyHandler.ts b/lib/server/socketHandlers/identifyHandler.ts index 782dc8d0e..8f4f72cef 100644 --- a/lib/server/socketHandlers/identifyHandler.ts +++ b/lib/server/socketHandlers/identifyHandler.ts @@ -6,8 +6,13 @@ import { getSocketConnections } from '../socket'; import { metadataWithoutFilters } from '../utils/socket'; import { getDesensitizedToken } from '../utils/token'; import { getForwardWebSocketRequestHandler } from './initHandlers'; +import semver from 'semver'; let response; +const minimalSupportedBrokerVersion = + process.env.MINIMAL_SUPPORTED_BROKER_VERSION ?? '4.100.0'; +const minimalRecommendedBrokerVersion = + process.env.MINIMAL_RECOMMENDED_BROKER_VERSION ?? '4.182.0'; const streamingResponse = legacyStreamResponseHandler; export const initIdentifyHandler = () => { @@ -38,13 +43,27 @@ export const handleIdentifyOnSocket = (clientData, socket, token): boolean => { const { maskedToken, hashedToken } = getDesensitizedToken(token); const clientId = clientData.metadata.clientId; const clientVersion = clientData.metadata.version; - // TODO: If version < cutoff version, then alert first, then deny - // if(clientVersion < minimalVersion) { - // socket.send('error', { message: `Broker Client Version is outdated. Minimal version: ${minimalVersion}. Please upgrade to latest version` }); - // } - // if(clientVersion < minimalSupportedVersion) { - // socket.send('warning', { message: `Broker Client Version is outdated. Minimal version: ${minimalVersion}. Please upgrade to latest version` }); - // } + + if ( + clientVersion != 'local' && + semver.lt(clientVersion, minimalSupportedBrokerVersion) + ) { + socket.send('notification', { + level: 'error', + message: `Broker Client Version is outdated. Minimal version: ${minimalSupportedBrokerVersion}. Please upgrade to latest version`, + }); + socket.end(); + return false; + } + if ( + clientVersion != 'local' && + semver.lt(clientVersion, minimalRecommendedBrokerVersion) + ) { + socket.send('notification', { + level: 'warning', + message: `Broker Client Version is deprecated. Minimal version: ${minimalRecommendedBrokerVersion}. Please upgrade to latest version`, + }); + } logger.info( { diff --git a/package-lock.json b/package-lock.json index c08fd36db..f3f1475f7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -35,6 +35,7 @@ "prom-client": "^11.5.3", "proxy-from-env": "^1.1.0", "qs": "^6.13.0", + "semver": "^7.6.3", "snyk-config": "^4.0.0", "then-fs": "^2.0.0", "tunnel": "0.0.6", diff --git a/package.json b/package.json index 93a8c3ae5..a4fd304ff 100644 --- a/package.json +++ b/package.json @@ -18,7 +18,7 @@ "dev:client": "tsc-watch --project tsconfig.json --onSuccess 'node dist/cli/index.js client' | ./node_modules/.bin/bunyan", "test": "npm run test:unit && npm run test:functional", "test:unit": "jest unit --detectOpenHandles", - "test:functional": "jest functional --detectOpenHandles", + "test:functional": "jest functional --detectOpenHandles --runInBand", "test:bin": "(cd test/bin; ./container-registry-agent/docker-entrypoint-test.sh)", "test:bin:docker": "docker run --rm -it -v $PWD:/home/broker -w /home/broker/test/bin/ snyk/ubuntu ./container-registry-agent/docker-entrypoint-test.sh", "lint": "npm run lint:check && npm run lint:code", @@ -88,6 +88,7 @@ "prom-client": "^11.5.3", "proxy-from-env": "^1.1.0", "qs": "^6.13.0", + "semver": "^7.6.3", "snyk-config": "^4.0.0", "then-fs": "^2.0.0", "tunnel": "0.0.6", diff --git a/test/functional/client-version-control-newer-client.test.ts b/test/functional/client-version-control-newer-client.test.ts new file mode 100644 index 000000000..56ad56d35 --- /dev/null +++ b/test/functional/client-version-control-newer-client.test.ts @@ -0,0 +1,91 @@ +process.env.BROKER_VERSION = '4.100.1'; +import path from 'path'; +import { + BrokerClient, + closeBrokerClient, + createBrokerClient, + waitForBrokerServerConnection, +} from '../setup/broker-client'; +import { + BrokerServer, + closeBrokerServer, + createBrokerServer, + // waitForBrokerClientConnection, + waitForBrokerClientConnections, +} from '../setup/broker-server'; + +import { TestWebServer, createTestWebServer } from '../setup/test-web-server'; + +import { axiosClient } from '../setup/axios-client'; +import { delay } from '../helpers/utils'; + +const fixtures = path.resolve(__dirname, '..', 'fixtures'); +const serverAccept = path.join(fixtures, 'server', 'filters.json'); +const clientAccept = path.join(fixtures, 'client', 'filters.json'); + +describe('newer broker version control', () => { + let tws: TestWebServer; + let bs: BrokerServer; + let bc: BrokerClient; + let brokerToken: string; + let serverMetadata: unknown; + + beforeAll(async () => { + const PORT = 9999; + process.env.BROKER_SERVER_URL = `http://localhost:${PORT}`; + + tws = await createTestWebServer(); + + bs = await createBrokerServer({ port: PORT, filters: serverAccept }); + + bc = await createBrokerClient({ + brokerServerUrl: `http://localhost:${bs.port}`, + brokerToken: 'broker-token-12345', + filters: clientAccept, + type: 'client', + }); + const connData = await waitForBrokerClientConnections(bs, 2); + const primaryIndex = connData.metadataArray[0]['role'] == 'primary' ? 0 : 1; + brokerToken = connData.brokerTokens[primaryIndex]; + serverMetadata = connData.metadataArray[primaryIndex]; + }); + + afterAll(async () => { + await tws.server.close(); + await closeBrokerClient(bc); + await closeBrokerServer(bs); + delete process.env.BROKER_SERVER_URL; + }); + + it('server accepts connection if version is newer than cutoff', async () => { + serverMetadata = await waitForBrokerServerConnection(bc); + + expect(brokerToken).toEqual('broker-token-12345'); + expect(serverMetadata).toMatchObject({ + capabilities: ['receive-post-streams'], + }); + await delay(100); + // expect(isWebsocketConnOpen(bs.server[0])).toBeFalsy() + const response = await axiosClient.get( + `http://localhost:${bc.port}/healthcheck`, + // { some: { example: 'json' } }, + ); + expect(response.status).toEqual(200); + expect(response.data).toStrictEqual([ + { + ok: true, + websocketConnectionOpen: true, + brokerServerUrl: 'http://localhost:9999/?connection_role=primary', + version: '4.100.1', + transport: expect.any(String), + }, + { + ok: true, + websocketConnectionOpen: true, + brokerServerUrl: 'http://localhost:9999/?connection_role=secondary', + version: '4.100.1', + transport: expect.any(String), + }, + ]); + }); +}); diff --git a/test/functional/client-version-control-older-client.test.ts b/test/functional/client-version-control-older-client.test.ts new file mode 100644 index 000000000..d3511dc60 --- /dev/null +++ b/test/functional/client-version-control-older-client.test.ts @@ -0,0 +1,91 @@ +process.env.BROKER_VERSION = '4.90.0'; +import path from 'path'; +import { + BrokerClient, + closeBrokerClient, + createBrokerClient, + waitForBrokerServerConnection, +} from '../setup/broker-client'; +import { + BrokerServer, + closeBrokerServer, + createBrokerServer, + // waitForBrokerClientConnection, + waitForBrokerClientConnections, +} from '../setup/broker-server'; + +import { TestWebServer, createTestWebServer } from '../setup/test-web-server'; + +import { axiosClient } from '../setup/axios-client'; +import { delay } from '../helpers/utils'; + +const fixtures = path.resolve(__dirname, '..', 'fixtures'); +const serverAccept = path.join(fixtures, 'server', 'filters.json'); +const clientAccept = path.join(fixtures, 'client', 'filters.json'); + +describe('older broker version control', () => { + let tws: TestWebServer; + let bs: BrokerServer; + let bc: BrokerClient; + let brokerToken: string; + let serverMetadata: unknown; + + beforeAll(async () => { + const PORT = 9999; + process.env.BROKER_SERVER_URL = `http://localhost:${PORT}`; + + tws = await createTestWebServer(); + + bs = await createBrokerServer({ port: PORT, filters: serverAccept }); + + bc = await createBrokerClient({ + brokerServerUrl: `http://localhost:${bs.port}`, + brokerToken: 'broker-token-12345', + filters: clientAccept, + type: 'client', + }); + const connData = await waitForBrokerClientConnections(bs, 2); + const primaryIndex = connData.metadataArray[0]['role'] == 'primary' ? 0 : 1; + brokerToken = connData.brokerTokens[primaryIndex]; + serverMetadata = connData.metadataArray[primaryIndex]; + }); + + afterAll(async () => { + await tws.server.close(); + await closeBrokerClient(bc); + await closeBrokerServer(bs); + delete process.env.BROKER_SERVER_URL; + }); + + it('server closes connection if version is older than cutoff', async () => { + serverMetadata = await waitForBrokerServerConnection(bc); + + expect(brokerToken).toEqual('broker-token-12345'); + expect(serverMetadata).toMatchObject({ + capabilities: ['receive-post-streams'], + }); + await delay(100); + // expect(isWebsocketConnOpen(bs.server[0])).toBeFalsy() + const response = await axiosClient.get( + `http://localhost:${bc.port}/healthcheck`, + // { some: { example: 'json' } }, + ); + expect(response.status).toEqual(500); + expect(response.data).toStrictEqual([ + { + ok: false, + websocketConnectionOpen: false, + brokerServerUrl: 'http://localhost:9999/?connection_role=primary', + version: '4.90.0', + transport: expect.any(String), + }, + { + ok: false, + websocketConnectionOpen: false, + brokerServerUrl: 'http://localhost:9999/?connection_role=secondary', + version: '4.90.0', + transport: expect.any(String), + }, + ]); + }); +});