Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: socketdock implementation #26

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ POSTGRES_ADMIN_USER=
POSTGRES_ADMIN_PASSWORD=

USE_PUSH_NOTIFICATIONS='true'
NOTIFICATION_WEBHOOK_URL=
NOTIFICATION_WEBHOOK_URL=
USE_SOCKETDOCK='true'
3 changes: 0 additions & 3 deletions .vscode/settings.json

This file was deleted.

31 changes: 26 additions & 5 deletions src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,25 @@ import type { Socket } from 'net'

import express from 'express'
import { Server } from 'ws'
import { registerSocketDockRoutes } from './transport/SocketDockInboundTransport'

import { AGENT_ENDPOINTS, AGENT_NAME, AGENT_PORT, LOG_LEVEL, POSTGRES_HOST, WALLET_KEY, WALLET_NAME } from './constants'
import {
AGENT_ENDPOINTS,
AGENT_NAME,
AGENT_PORT,
LOG_LEVEL,
POSTGRES_HOST,
USE_SOCKETDOCK,
WALLET_KEY,
WALLET_NAME,
} from './constants'
import { askarPostgresConfig } from './database'
import { Logger } from './logger'
import { StorageMessageQueueModule } from './storage/StorageMessageQueueModule'
import { PushNotificationsFcmModule } from './push-notifications/fcm'

import { SocketIdsManager } from './transport/SocketIdManager'

function createModules() {
const modules = {
storageModule: new StorageMessageQueueModule(),
Expand Down Expand Up @@ -92,14 +104,17 @@ export async function createAgent() {
// Create all transports
const httpInboundTransport = new HttpInboundTransport({ app, port: AGENT_PORT })
const httpOutboundTransport = new HttpOutboundTransport()
const wsInboundTransport = new WsInboundTransport({ server: socketServer })
const wsOutboundTransport = new WsOutboundTransport()

// Register all Transports
agent.registerInboundTransport(httpInboundTransport)
agent.registerOutboundTransport(httpOutboundTransport)
agent.registerInboundTransport(wsInboundTransport)
agent.registerOutboundTransport(wsOutboundTransport)

if (USE_SOCKETDOCK === 'false') {
pallavighule marked this conversation as resolved.
Show resolved Hide resolved
const wsInboundTransport = new WsInboundTransport({ server: socketServer })
const wsOutboundTransport = new WsOutboundTransport()
agent.registerInboundTransport(wsInboundTransport)
agent.registerOutboundTransport(wsOutboundTransport)
}

// Added health check endpoint
httpInboundTransport.app.get('/health', async (_req, res) => {
Expand All @@ -124,7 +139,13 @@ export async function createAgent() {
return res.send(outOfBandRecord.outOfBandInvitation.toJSON())
})

httpInboundTransport.app.use(express.json())

await agent.initialize()
if (USE_SOCKETDOCK === 'true') {
const socketIdManager = SocketIdsManager.getInstance()
await registerSocketDockRoutes(app, logger, socketIdManager, agent)
}

// When an 'upgrade' to WS is made on our http server, we forward the
// request to the WS server
Expand Down
4 changes: 3 additions & 1 deletion src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export const WALLET_NAME = process.env.WALLET_NAME || 'credebl-mediator-dev'
export const WALLET_KEY = process.env.WALLET_KEY || 'credebl-mediator-dev'
export const AGENT_ENDPOINTS = process.env.AGENT_ENDPOINTS?.split(',') ?? [
`http://localhost:${AGENT_PORT}`,
`ws://localhost:${AGENT_PORT}`,
`ws://localhost:8765/ws`,
]

export const POSTGRES_HOST = process.env.POSTGRES_HOST
Expand All @@ -19,6 +19,8 @@ export const POSTGRES_ADMIN_PASSWORD = process.env.POSTGRES_ADMIN_PASSWORD

export const INVITATION_URL = process.env.INVITATION_URL

export const USE_SOCKETDOCK = process.env.USE_SOCKETDOCK || 'true'
pallavighule marked this conversation as resolved.
Show resolved Hide resolved

export const LOG_LEVEL = LogLevel.debug

export const IS_DEV = process.env.NODE_ENV === 'development'
Expand Down
23 changes: 23 additions & 0 deletions src/transport/ProcessInboundMessage.ts
pallavighule marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { Agent } from '@credo-ts/core'

import type { Request, Response } from 'express'
import { WebSocketTransportSession } from './SocketDockTransportSession'

export async function processInboundMessage(req: Request, res: Response, agent: Agent, socketId: string) {
const sendUrl = req.body.meta.send
const requestMimeType = req.headers['content-type']
const session = new WebSocketTransportSession(socketId, res, sendUrl, requestMimeType)

try {
const message = req.body.message
const encryptedMessage = JSON.parse(message)
await agent.receiveMessage(encryptedMessage, session)
if (!res.headersSent) {
res.status(200).end()
}
} catch (error) {
if (!res.headersSent) {
res.status(500).send('Error processing message')
}
}
}
56 changes: 56 additions & 0 deletions src/transport/SocketDockInboundTransport.ts
pallavighule marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { Express } from 'express'
import { SocketIdsManager } from './SocketIdManager'
import { processInboundMessage } from './ProcessInboundMessage'
import { Logger } from '../logger'
import { Agent } from '@credo-ts/core'

export function registerSocketDockRoutes(
app: Express,
logger: Logger,
socketIdManager: SocketIdsManager,
agent: Agent
) {
app.post('/connect', async (req, res) => {
logger.info('httpInboundTransport.connect')
const sendUrl = req.body.meta.send
const connectionId = req.body.meta.connection_id

const socketId = socketIdManager.getConnectionBySocketId(connectionId)
if (!socketId) {
socketIdManager.addSocketId(connectionId)
logger.debug(`Saving new socketId : ${connectionId}`)
}

if (!sendUrl) {
logger.error('Missing "send" URL in request body')
return res.status(400).send('Missing "send" URL')
}

try {
res.status(200).send(`connection with socketId : ${connectionId} added successfully`)
} catch (error) {
res.status(500).send('Error sending response to send URL')
}
})

app.post('/message', async (req, res) => {
logger.info('httpInboundTransport.message')

const connectionId = req.body.meta.connection_id

try {
const socketId = socketIdManager.getConnectionBySocketId(connectionId)
await processInboundMessage(req, res, agent, socketId)
} catch (error) {
res.status(500).send('Error sending response to send URL')
}
})

app.post('/disconnect', async (req, res) => {
logger.info('httpInboundTransport.disconnect')
const { connection_id } = req.body
socketIdManager.removeSocketId(connection_id)
logger.debug(`removed connection with socketId : ${connection_id}`)
res.status(200).send(`connection with socketId : ${connection_id} removed successfully`)
})
}
50 changes: 50 additions & 0 deletions src/transport/SocketDockTransportSession.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { AgentContext, DidCommMimeType, EncryptedMessage, TransportSession } from '@credo-ts/core'
import type { Response } from 'express'
import { CredoError } from '@credo-ts/core'

import { agentDependencies } from '@credo-ts/node'

const supportedContentTypes: string[] = [DidCommMimeType.V0, DidCommMimeType.V1]

export class WebSocketTransportSession implements TransportSession {
public id: string
public readonly type = 'socketdock'
pallavighule marked this conversation as resolved.
Show resolved Hide resolved
public res: Response
public sendUrl: any
public requestMimeType: any

public constructor(id: string, res: Response, sendUrl: any, requestMimeType: any) {
this.id = id
this.res = res
this.sendUrl = sendUrl
this.requestMimeType = requestMimeType
}

public async close(): Promise<void> {
if (!this.res.headersSent) {
this.res.status(200).end()
}
}

public async send(agentContext: AgentContext, encryptedMessage: EncryptedMessage): Promise<void> {
if (this.res.headersSent) {
throw new CredoError(`${this.type} transport session has been closed.`)
}

// By default we take the agent config's default DIDComm content-type
let responseMimeType = agentContext.config.didCommMimeType as string

if (this.requestMimeType && supportedContentTypes.includes(this.requestMimeType)) {
responseMimeType = this.requestMimeType
}

const requestOptions = {
method: 'POST',
headers: {
'Content-Type': responseMimeType,
},
body: JSON.stringify(encryptedMessage),
}
await agentDependencies.fetch(this.sendUrl, requestOptions)
}
}
30 changes: 30 additions & 0 deletions src/transport/SocketIdManager.ts
pallavighule marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
class SocketIdsManager {
private active_connections: Record<string, unknown> = {}

private static instance: SocketIdsManager

private constructor() {}

public static getInstance(): SocketIdsManager {
if (!SocketIdsManager.instance) {
SocketIdsManager.instance = new SocketIdsManager()
}
return SocketIdsManager.instance
}

public addSocketId(socketId: string): string {
this.active_connections[socketId] = socketId
return socketId
}

public removeSocketId(socketId: string): string {
delete this.active_connections[socketId]
return socketId
}

public getConnectionBySocketId(socketId: string): any {
return this.active_connections[socketId]
}
}

export { SocketIdsManager }