diff --git a/package.json b/package.json index 79c225d..05ecb14 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@rocket.chat/sdk", - "version": "1.0.0-alpha.30", + "version": "1.0.0-dj.0", "description": "Node.js SDK for Rocket.Chat. Application interface for server methods and message streams.", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/src/interfaces/index.ts b/src/interfaces/index.ts index 88950db..677d145 100644 --- a/src/interfaces/index.ts +++ b/src/interfaces/index.ts @@ -187,7 +187,7 @@ export interface ISubscription { id?: string name?: any unsubscribe: () => Promise - onEvent: (callback: ISocketMessageCallback) => void + onEvent?: (callback: ISocketMessageCallback) => void [key: string]: any } diff --git a/src/lib/api/api.ts b/src/lib/api/api.ts index b195d2b..17c2c2b 100644 --- a/src/lib/api/api.ts +++ b/src/lib/api/api.ts @@ -11,6 +11,7 @@ import { import { Message } from '../message' import { EventEmitter } from 'tiny-events' +import * as settings from '../settings'; /** Check for existing login */ // export function loggedIn () { @@ -99,6 +100,7 @@ class Client implements IClient { get headers (): any { return { 'Content-Type': 'application/json', + ...settings.customHeaders, ...this._headers } } diff --git a/src/lib/drivers/ddp.ts b/src/lib/drivers/ddp.ts index b5bcab9..247ce8b 100644 --- a/src/lib/drivers/ddp.ts +++ b/src/lib/drivers/ddp.ts @@ -8,6 +8,7 @@ import { EventEmitter } from 'tiny-events' import { logger as Logger } from '../log' import { ISocket, IDriver } from './index' +import * as settings from '../settings'; EventEmitter.prototype.removeAllListeners = function (event?: string | any): any { if (event) { @@ -48,7 +49,7 @@ export class Socket extends EventEmitter { handlers: ISocketMessageHandler[] = [] config: ISocketOptions | any openTimeout?: NodeJS.Timer | number - reopenInterval?: NodeJS.Timer + reopenInterval?: NodeJS.Timer | number pingTimeout?: NodeJS.Timer | number connection?: WebSocket session?: string @@ -86,14 +87,14 @@ export class Socket extends EventEmitter { open = (ms: number = this.config.reopen) => { return new Promise(async (resolve, reject) => { let connection: WebSocket - this.lastPing = Date.now() - await this.close() - if (this.reopenInterval) clearInterval(this.reopenInterval) + + this.reopenInterval && clearInterval(this.reopenInterval as any) this.reopenInterval = setInterval(() => { return !this.alive() && this.reopen() }, ms) + try { - connection = new WebSocket(this.host) + connection = new WebSocket(this.host, null, { headers: settings.customHeaders }) connection.onerror = reject } catch (err) { this.logger.error(err) @@ -108,6 +109,8 @@ export class Socket extends EventEmitter { /** Send handshake message to confirm connection, start pinging. */ onOpen = async (callback: Function) => { + this.lastPing = Date.now() + const connected = await this.send({ msg: 'connect', version: '1', @@ -123,20 +126,14 @@ export class Socket extends EventEmitter { /** Emit close event so it can be used for promise resolve in close() */ onClose = (e: any) => { try { - this.emit('close', e) - if (e.code !== 1000) { - return this.reopen() - } else { - if (this.reopenInterval) clearInterval(this.reopenInterval) - this.openTimeout && clearTimeout(this.openTimeout as any) - this.pingTimeout && clearTimeout(this.pingTimeout as any) - delete this.connection + if (e?.reason !== 'disconnect') { + this.reopen() } - this.logger.info(`[ddp] Close (${e.code}) ${e.reason}`) - + this.logger.info(`[ddp] Close (${e?.code}) ${e?.reason}`) } catch (error) { this.logger.error(error) } + this.emit('close', e) } /** @@ -149,38 +146,46 @@ export class Socket extends EventEmitter { this.lastPing = Date.now() void this.ping() const data = (e.data) ? JSON.parse(e.data) : undefined + this.logger.debug(data) // 👈 very useful for debugging missing responses if (!data) return this.logger.error(`[ddp] JSON parse error: ${e.message}`) this.logger.debug(`[ddp] messages received: ${e.data}`) if (data.collection) this.emit(data.collection, data) if (data.msg) this.emit(data.msg, data) + if (data.id) this.emit(data.id, data) } /** Disconnect the DDP from server and clear all subscriptions. */ close = async () => { + this.unsubscribeAll().catch(e => this.logger.debug(e)) + + this.reopenInterval && clearInterval(this.reopenInterval as any) + this.openTimeout && clearTimeout(this.openTimeout as any) + this.pingTimeout && clearTimeout(this.pingTimeout as any) + if (this.connected) { - this.unsubscribeAll().catch(e => this.logger.debug(e)) await new Promise((resolve) => { if (this.connection) { this.once('close', resolve) this.connection.close(1000, 'disconnect') - return } }) .catch(this.logger.error) } + return Promise.resolve() } /** Clear connection and try to connect again. */ reopen = async () => { if (this.openTimeout) return - await this.close() - this.openTimeout = setTimeout(async () => { - delete this.openTimeout - await this.open() - .catch((err) => this.logger.error(`[ddp] Reopen error: ${err.message}`)) - }, this.config.reopen) + this.openTimeout = setTimeout(() => { delete this.openTimeout }, this.config.reopen); + + await this.open() + .catch((err) => { + this.logger.error(`[ddp] Reopen error: ${err.message}`); + this.reopen(); + }) } /** Check if websocket connected and ready. */ @@ -209,14 +214,26 @@ export class Socket extends EventEmitter { * @param errorMsg An alternate `data.msg` value indicating an error response */ send = async (obj: any): Promise => { - return new Promise((resolve, reject) => { + return new Promise(async(resolve, reject) => { if (!this.connection) throw new Error('[ddp] sending without open connection') + if (!this.connected) await new Promise(resolve => this.on('open', resolve)) + const id = obj.id || `ddp-${ this.sent }` this.sent += 1 const data = { ...obj, ...(/connect|ping|pong/.test(obj.msg) ? {} : { id }) } const stringdata = JSON.stringify(data) this.logger.debug(`[ddp] sending message: ${stringdata}`) - this.connection.send(stringdata) + + if (/^sub$/.test(obj.msg)) { + const { name, params } = obj; + this.subscriptions[id] = { id, name, params, unsubscribe: this.unsubscribe.bind(this, id) }; + } + + try { + this.connection.send(stringdata) + } catch { + this.logger.error('[ddp] send without open connection'); + } this.once('disconnected', reject) const listener = (data.msg === 'ping' && 'pong') || (data.msg === 'connect' && 'connected') || data.id @@ -235,12 +252,11 @@ export class Socket extends EventEmitter { this.pingTimeout && clearTimeout(this.pingTimeout as any) this.pingTimeout = setTimeout(() => { this.send({ msg: 'ping' }) - .then(() => { - return this.ping() - }) + .then(() => this.ping()) .catch(() => this.reopen()) }, this.config.ping) } + /** Check if ping-pong to server is within tolerance of 1 missed ping */ alive = () => { if (!this.lastPing) return false @@ -324,29 +340,31 @@ export class Socket extends EventEmitter { * @param name Stream name to subscribe to * @param params Params sent to the subscription request */ - subscribe = (name: string, params: any[], callback ?: ISocketMessageCallback) => { + subscribe = (name: string, params: any[], callback ?: ISocketMessageCallback, id?: string) => { this.logger.info(`[ddp] Subscribe to ${name}, param: ${JSON.stringify(params)}`) - return this.send({ msg: 'sub', name, params }) + return this.send({ msg: 'sub', id, name, params }) .then((result) => { const id = (result.subs) ? result.subs[0] : undefined - const unsubscribe = this.unsubscribe.bind(this, id) - const onEvent = this.onEvent.bind(this, name) - const subscription = { id, name, params, unsubscribe, onEvent } - if (callback) subscription.onEvent(callback) - this.subscriptions[id] = subscription - return subscription + if (id) { + const unsubscribe = this.unsubscribe.bind(this, id) + const onEvent = this.onEvent.bind(this, name) + const subscription = { id, name, params, unsubscribe, onEvent } + if (callback) subscription.onEvent(callback) + this.subscriptions[id] = subscription + return subscription + } }) .catch((err) => { this.logger.error(`[ddp] Subscribe error: ${err.message}`) - throw err + // throw err }) } /** Subscribe to all pre-configured streams (e.g. on login resume) */ subscribeAll = () => { const subscriptions = Object.keys(this.subscriptions || {}).map((key) => { - const { name, params } = this.subscriptions[key] - return this.subscribe(name, params) + const { name, params, id } = this.subscriptions[key] + return this.subscribe(name, params, undefined, id) }) return Promise.all(subscriptions) } @@ -458,7 +476,6 @@ export class DDPDriver extends EventEmitter implements ISocket, IDriver { this.logger.info(`[driver] Timeout (${config.timeout})`) const err = new Error('Socket connection timeout') cancelled = true - this.ddp.removeAllListeners('connected') reject(err) }, config.timeout) diff --git a/src/lib/settings.ts b/src/lib/settings.ts index a0461e8..d23b011 100644 --- a/src/lib/settings.ts +++ b/src/lib/settings.ts @@ -33,3 +33,6 @@ export let dmCacheMaxAge = 1000 * parseInt(process.env.DM_ROOM_CACHE_MAX_AGE || export let token = process.env.LIVECHAT_TOKEN || '' export let rid = process.env.LIVECHAT_ROOM || '' export let department = process.env.LIVECHAT_DEPARTMENT || '' + +// Headers settings +export let customHeaders = {};