Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: socketdock implementation #26

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ POSTGRES_ADMIN_USER=
POSTGRES_ADMIN_PASSWORD=

USE_PUSH_NOTIFICATIONS='true'
NOTIFICATION_WEBHOOK_URL=
NOTIFICATION_WEBHOOK_URL=
USE_SOCKETDOCK='false'
3 changes: 0 additions & 3 deletions .vscode/settings.json

This file was deleted.

1 change: 1 addition & 0 deletions dev.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ void connect({
const url = app.url()

process.env.NODE_ENV = 'development'
process.env.USE_SOCKETDOCK = 'false'
process.env.AGENT_PORT = `${port}`
process.env.AGENT_ENDPOINTS = `${url},${url?.replace('http', 'ws')}`
process.env.SHORTENER_BASE_URL = `${url}/s`
Expand Down
27 changes: 22 additions & 5 deletions src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,18 @@ import type { Socket } from 'net'

import express from 'express'
import { Server } from 'ws'
import { SocketDockInboundTransport } from './transport/SocketDockInboundTransport'

import { AGENT_ENDPOINTS, AGENT_NAME, AGENT_PORT, LOG_LEVEL, POSTGRES_HOST, WALLET_KEY, WALLET_NAME } from './constants'
import {
AGENT_ENDPOINTS,
AGENT_NAME,
AGENT_PORT,
LOG_LEVEL,
POSTGRES_HOST,
USE_SOCKETDOCK,
WALLET_KEY,
WALLET_NAME,
} from './constants'
import { askarPostgresConfig } from './database'
import { Logger } from './logger'
import { StorageMessageQueueModule } from './storage/StorageMessageQueueModule'
Expand Down Expand Up @@ -92,14 +102,19 @@ export async function createAgent() {
// Create all transports
const httpInboundTransport = new HttpInboundTransport({ app, port: AGENT_PORT })
const httpOutboundTransport = new HttpOutboundTransport()
const wsInboundTransport = new WsInboundTransport({ server: socketServer })
const wsOutboundTransport = new WsOutboundTransport()

// Register all Transports
agent.registerInboundTransport(httpInboundTransport)
agent.registerOutboundTransport(httpOutboundTransport)
agent.registerInboundTransport(wsInboundTransport)
agent.registerOutboundTransport(wsOutboundTransport)

if (USE_SOCKETDOCK === 'false') {
pallavighule marked this conversation as resolved.
Show resolved Hide resolved
const wsInboundTransport = new WsInboundTransport({ server: socketServer })
const wsOutboundTransport = new WsOutboundTransport()
agent.registerInboundTransport(wsInboundTransport)
agent.registerOutboundTransport(wsOutboundTransport)
} else {
agent.registerInboundTransport(new SocketDockInboundTransport(app, logger, agent))
}

// Added health check endpoint
httpInboundTransport.app.get('/health', async (_req, res) => {
Expand All @@ -124,6 +139,8 @@ export async function createAgent() {
return res.send(outOfBandRecord.outOfBandInvitation.toJSON())
})

httpInboundTransport.app.use(express.json())

await agent.initialize()

// When an 'upgrade' to WS is made on our http server, we forward the
Expand Down
2 changes: 2 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ export const POSTGRES_ADMIN_PASSWORD = process.env.POSTGRES_ADMIN_PASSWORD

export const INVITATION_URL = process.env.INVITATION_URL

export const USE_SOCKETDOCK = process.env.USE_SOCKETDOCK || 'false'

export const LOG_LEVEL = LogLevel.debug

export const IS_DEV = process.env.NODE_ENV === 'development'
Expand Down
73 changes: 73 additions & 0 deletions src/transport/SocketDockInboundTransport.ts
pallavighule marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { Express } from 'express'
import { Logger } from '../logger'
import { Agent, InboundTransport } from '@credo-ts/core'
import { WebSocketTransportSession } from './SocketDockTransportSession'

export class SocketDockInboundTransport implements InboundTransport {
private app: Express
private logger: Logger
private agent: Agent
private active_connections: Record<string, unknown> = {}
pallavighule marked this conversation as resolved.
Show resolved Hide resolved

constructor(app: Express, logger: Logger, agent: Agent) {
this.app = app
this.logger = logger
this.agent = agent
}
pallavighule marked this conversation as resolved.
Show resolved Hide resolved

async start(agent: Agent<any>): Promise<void> {
pallavighule marked this conversation as resolved.
Show resolved Hide resolved
this.app.post('/connect', async (req, res) => {
this.logger.info('SocketDockInboundTransport.connect')
const sendUrl = req.body.meta.send
const connectionId = req.body.meta.connection_id

const socketId = this.active_connections[connectionId] as string
if (!socketId) {
this.active_connections[socketId] = socketId
this.logger.debug(`Saving new socketId : ${connectionId}`)
}

try {
res.status(200).send(`connection with socketId : ${connectionId} added successfully`)
} catch (error) {
res.status(500).send('Error sending response to send URL')
}
})

this.app.post('/message', async (req, res) => {
this.logger.info('SocketDockInboundTransport.message')

const connectionId = req.body.meta.connection_id

try {
const socketId = this.active_connections[connectionId] as string
const sendUrl = req.body.meta.send
const requestMimeType = req.headers['content-type']
const session = new WebSocketTransportSession(socketId, res, sendUrl, requestMimeType)
const message = req.body.message
const encryptedMessage = JSON.parse(message)
await agent.receiveMessage(encryptedMessage, session)
if (!res.headersSent) {
res.status(200).end()
}
} catch (error) {
if (!res.headersSent) {
res.status(500).send('Error processing message')
}
}
})

this.app.post('/disconnect', async (req, res) => {
this.logger.info('SocketDockInboundTransport.disconnect')
const { connection_id } = req.body

delete this.active_connections[connection_id]
this.logger.debug(`removed connection with socketId : ${connection_id}`)
res.status(200).send(`connection with socketId : ${connection_id} removed successfully`)
})
}

stop(): Promise<void> {
throw new Error('Method not implemented.')
}
}
50 changes: 50 additions & 0 deletions src/transport/SocketDockTransportSession.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { AgentContext, DidCommMimeType, EncryptedMessage, TransportSession } from '@credo-ts/core'
import type { Response } from 'express'
import { CredoError } from '@credo-ts/core'

import { agentDependencies } from '@credo-ts/node'

const supportedContentTypes: string[] = [DidCommMimeType.V0, DidCommMimeType.V1]

export class WebSocketTransportSession implements TransportSession {
public id: string
public readonly type = 'socketdock'
pallavighule marked this conversation as resolved.
Show resolved Hide resolved
public res: Response
public sendUrl: any
public requestMimeType: any

public constructor(id: string, res: Response, sendUrl: any, requestMimeType: any) {
this.id = id
this.res = res
this.sendUrl = sendUrl
this.requestMimeType = requestMimeType
}

public async close(): Promise<void> {
if (!this.res.headersSent) {
this.res.status(200).end()
}
}

public async send(agentContext: AgentContext, encryptedMessage: EncryptedMessage): Promise<void> {
if (this.res.headersSent) {
throw new CredoError(`${this.type} transport session has been closed.`)
}

// By default we take the agent config's default DIDComm content-type
let responseMimeType = agentContext.config.didCommMimeType as string

if (this.requestMimeType && supportedContentTypes.includes(this.requestMimeType)) {
responseMimeType = this.requestMimeType
}

const requestOptions = {
method: 'POST',
headers: {
'Content-Type': responseMimeType,
},
body: JSON.stringify(encryptedMessage),
}
await agentDependencies.fetch(this.sendUrl, requestOptions)
}
}