From e550dc9c2fb34538eb72bfedf5a0b8bac1bdc8e5 Mon Sep 17 00:00:00 2001 From: Bohdan Siryk Date: Tue, 18 Jul 2023 11:07:44 +0300 Subject: [PATCH] websockets support --- README.md | 29 ++++++++++++++- index.d.ts | 6 ++++ lib/connection.js | 81 ++++++++++++++++++++++++++++++----------- lib/websocket.js | 91 +++++++++++++++++++++++++++++++++++++++++++++++ package-lock.json | 6 ++++ package.json | 3 +- 6 files changed, 194 insertions(+), 22 deletions(-) create mode 100644 lib/websocket.js diff --git a/README.md b/README.md index 5db9a4713..57c57d981 100644 --- a/README.md +++ b/README.md @@ -242,6 +242,32 @@ client.on('log', (level, loggerName, message, furtherInfo) => { The `level` being passed to the listener can be `verbose`, `info`, `warning` or `error`. Visit the [logging documentation][doc-logging] for more information. +## WebSockets + +You can use websocket as transport. But Cassandra doesn't support this protocol +so some proxy should be deployed in front of Cassandra, which can handle this transport protocol. + +```javascript + const client = new cassandra.Client({ + transport: 'WebSocket', + contactPoints: [ + // some proxies that support websocket transport + '127.0.0.1:9043', + 'localhost:9044' + ], + webSocketOptions: { + // some client websocket options + protocolVersion: 13, + ... + } +}); +``` + +You can configure your websocket client with `webSocketOptions`. +To properly configure it follow [websocket/ws doc][ws-doc]. + +You also can use websockets over SSL by passing `transport: 'SecureWebSocket'`. + ## Compatibility - Apache Cassandra versions 2.1 and above. @@ -291,4 +317,5 @@ Unless required by applicable law or agreed to in writing, software distributed [streams2]: https://nodejs.org/api/stream.html#stream_class_stream_readable [cql-udt]: https://cassandra.apache.org/doc/latest/cql/types.html#udts [dse]: https://www.datastax.com/products/datastax-enterprise -[astra]: https://www.datastax.com/products/datastax-astra \ No newline at end of file +[astra]: https://www.datastax.com/products/datastax-astra +[ws-doc]: https://github.com/websockets/ws/blob/master/doc/ws.md#new-websocketaddress-protocols-options \ No newline at end of file diff --git a/index.d.ts b/index.d.ts index cf44d76e9..994f246bc 100644 --- a/index.d.ts +++ b/index.d.ts @@ -24,6 +24,7 @@ import { metrics } from './lib/metrics'; import { tracker } from './lib/tracker'; import { metadata } from './lib/metadata'; import { datastax } from './lib/datastax/'; +import { ClientRequestArgs } from 'http'; import Long = types.Long; import Uuid = types.Uuid; import graph = datastax.graph; @@ -191,7 +192,11 @@ export interface ExecutionOptions { setHints(hints: string[]): void; } +export type WebSocketClientOptions = (ClientOptions | ClientRequestArgs) + & {protocols?: string | string[] | undefined}; + export interface ClientOptions { + transport?: 'SecureWebSocket' | 'WebSocket' | undefined contactPoints?: string[]; localDataCenter?: string; keyspace?: string; @@ -253,6 +258,7 @@ export interface ClientOptions { tcpNoDelay?: boolean; }; sslOptions?: tls.ConnectionOptions; + webSocketOptions?: WebSocketClientOptions; } export interface QueryOptions { diff --git a/lib/connection.js b/lib/connection.js index 843828729..de52feb2b 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -32,6 +32,7 @@ const StreamIdStack = require('./stream-id-stack'); const OperationState = require('./operation-state'); const promiseUtils = require('./promise-utils'); const { ExecutionOptions } = require('./execution-options'); +const { WebSocketWrapper } = require('./websocket'); /** * Represents a connection to a Cassandra node @@ -171,30 +172,70 @@ class Connection extends events.EventEmitter { const self = this; this.log('info', `Connecting to ${this.endpointFriendlyName}`); - if (!this.options.sslOptions) { - this.netClient = new net.Socket({ highWaterMark: this.options.socketOptions.coalescingThreshold }); - this.netClient.connect(this.port, this.address, function connectCallback() { - self.log('verbose', `Socket connected to ${self.endpointFriendlyName}`); - self.bindSocketListeners(); - self.startup(callback); - }); - } - else { - // Use TLS - const sslOptions = utils.extend({ rejectUnauthorized: false }, this.options.sslOptions); + if (this.options.transport) { + if (this.options.transport.toLowerCase() === 'securewebsocket') { + // Use secure WebSocket + const options = utils.extend({ rejectUnauthorized: false, transport: this.options.transport }, + this.options.webSocketOptions); + + if (!options.protocols) { + options.protocols = ['cql']; + } + + this.netClient = new WebSocketWrapper(options); + + this.netClient.connect(this.port, this.address, function connectCallback() { + self.log('verbose', `Secure WebSocket to ${self.endpointFriendlyName}`); + self.bindSocketListeners(); + self.startup(callback); + }); + } else { + // Use WebSocket + const options = utils.extend({ + transport: this.options.transport, + highWaterMark: this.options.socketOptions.coalescingThreshold, + handshakeTimeout: this.options.socketOptions.connectTimeout, + }, this.options.webSocketOptions); + + if (!options.protocols) { + options.protocols = ['cql']; + } - if (this.options.sni) { - sslOptions.servername = this._serverName; + this.netClient = new WebSocketWrapper(options); + + this.netClient.connect(this.port, this.address, function connectCallback() { + self.log('verbose', `WebSocket connected to ${self.endpointFriendlyName}`); + self.bindSocketListeners(); + self.startup(callback); + }); } + } else { + // Use Socket + if (!this.options.sslOptions) { + this.netClient = new net.Socket({ highWaterMark: this.options.socketOptions.coalescingThreshold }); + + this.netClient.connect(this.port, this.address, function connectCallback() { + self.log('verbose', `Socket connected to ${self.endpointFriendlyName}`); + self.bindSocketListeners(); + self.startup(callback); + }); + } else { + // Use Socket with TLS + const sslOptions = utils.extend({ rejectUnauthorized: false }, this.options.sslOptions); - this.netClient = tls.connect(this.port, this.address, sslOptions, function tlsConnectCallback() { - self.log('verbose', `Secure socket connected to ${self.endpointFriendlyName} with protocol ${self.netClient.getProtocol()}`); - self.bindSocketListeners(); - self.startup(callback); - }); + if (this.options.sni) { + sslOptions.servername = this._serverName; + } - // TLSSocket will validate for values from 512 to 16K (depending on the SSL protocol version) - this.netClient.setMaxSendFragment(this.options.socketOptions.coalescingThreshold); + this.netClient = tls.connect(this.port, this.address, sslOptions, function tlsConnectCallback() { + self.log('verbose', `Secure socket connected to ${self.endpointFriendlyName} with protocol ${self.netClient.getProtocol()}`); + self.bindSocketListeners(); + self.startup(callback); + }); + + // TLSSocket will validate for values from 512 to 16K (depending on the SSL protocol version) + this.netClient.setMaxSendFragment(this.options.socketOptions.coalescingThreshold); + } } this.netClient.once('error', function socketError(err) { diff --git a/lib/websocket.js b/lib/websocket.js new file mode 100644 index 000000000..8102121ba --- /dev/null +++ b/lib/websocket.js @@ -0,0 +1,91 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +const { EventEmitter } = require('events'); +const { WebSocket } = require('ws'); + +/** + * WebSocketWrapper is a wrapper on the `ws.Websocket` which implements + * `net.Socket` interface to be used by the `cassandra.Connection` + */ +class WebSocketWrapper extends EventEmitter { + /** + * Creates a websocket wrapper instance. To connect use `connect` method + * @param {object} options client options for a websocket + */ + constructor(options) { + super(); + this.options = options; + } + + /** + * Creates an instance of a websocket and connects + * @param {String} port + * @param {String} address + * @param {() => void} connectionCallback is called when connection is successfully established + * @returns {WebSocketWrapper} wrapper itself + */ + connect(port, address, connectionCallback) { + const schema = this.options.transport.toLowerCase() === 'securewebsocket' ? 'wss' : 'ws'; + + this.ws = new WebSocket(schema+'://'+address+':'+port, this.options.protocols, this.options); + + if (connectionCallback) { + this.ws.on('open', connectionCallback); + } + + const stream = WebSocket.createWebSocketStream(this.ws, this.options); + + stream.on('error', err => { + this.emit('error', err); + }); + stream.on('drain', () => { + this.emit('drain'); + }); + stream.on('close', () => { + this.emit('close'); + }); + stream.on('end', () => { + this.emit('end'); + }); + + this.write = stream.write.bind(stream); + this.pipe = stream.pipe.bind(stream); + this.end = stream.end.bind(stream); + this.destroy = stream.destroy.bind(stream); + + return this; + } + + /** + * It is not implemented because `ws` lib doesn't provide API to work with + */ + setTimeout() {} + + /** + * It is not implemented because `ws` lib doesn't provide API to work with + */ + setKeepAlive() {} + + /** + * It is not implemented because `ws` lib doesn't provide API to work with + */ + setNoDelay() {} +} + +module.exports.WebSocketWrapper = WebSocketWrapper; diff --git a/package-lock.json b/package-lock.json index 250f608b1..c2010cd4c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1105,6 +1105,12 @@ "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=", "dev": true }, + "ws": { + "version": "8.13.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.13.0.tgz", + "integrity": "sha512-x9vcZYTrFPC7aSIbj7sRCYo7L/Xb8Iy+pW0ng0wt2vCJv7M9HOMy0UoN3rr+IFC7hb7vXoqS+P9ktyLLLhO+LA==", + "requires": {} + }, "y18n": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/y18n/-/y18n-4.0.0.tgz", diff --git a/package.json b/package.json index 2bf06d258..ecf6475ea 100644 --- a/package.json +++ b/package.json @@ -23,7 +23,8 @@ "@types/long": "^4.0.0", "@types/node": ">=8", "adm-zip": "^0.5.3", - "long": "^2.2.0" + "long": "^2.2.0", + "ws": "^8.13.0" }, "devDependencies": { "chai": "4.2.0",