From f94e709a0a8d8e7a74bd7471dccf61f5007981d0 Mon Sep 17 00:00:00 2001 From: Ralf Aron Date: Sun, 17 Nov 2024 18:39:24 +0100 Subject: [PATCH] fix: update reset --- projects/aas-core/src/lib/convert.ts | 50 +++- projects/aas-core/src/test/convert.spec.ts | 15 ++ .../aas-lib/src/lib/index-change.service.ts | 1 + .../aas-server/src/app/aas-index/aas-index.ts | 6 +- .../src/app/aas-index/lowdb/lowdb-index.ts | 229 ++++++++---------- .../src/app/aas-index/mysql/mysql-index.ts | 189 ++++++--------- .../src/app/aas-provider/aas-provider.ts | 63 +++-- .../src/app/aas-provider/aas-server-scan.ts | 12 +- .../src/app/aas-provider/task-handler.ts | 38 ++- .../aas-server/src/app/aas-scan-worker.ts | 7 +- projects/aas-server/src/app/worker-app.ts | 2 +- projects/aas-server/src/app/ws-server.ts | 8 + 12 files changed, 343 insertions(+), 277 deletions(-) diff --git a/projects/aas-core/src/lib/convert.ts b/projects/aas-core/src/lib/convert.ts index 15161480..105f3336 100644 --- a/projects/aas-core/src/lib/convert.ts +++ b/projects/aas-core/src/lib/convert.ts @@ -384,7 +384,7 @@ export function parseDate(s: string, localeId?: string): Date | undefined { getSeconds(timeTuple?.items), ); } else { - date = new Date(NaN); + date = new Date(0); } } else { date = new Date(s); @@ -496,6 +496,54 @@ export function parseDate(s: string, localeId?: string): Date | undefined { } } +/** + * Indicates whether the specified date is valid. + * @param value The date value. + * @returns `true` if the date value is valid; otherwise, `false`. + */ +export function isValidDate(value: Date | undefined): boolean { + if (value === undefined) { + return false; + } + + const year = value.getFullYear(); + if (year < 1970 || year > 3000) { + return false; + } + + const month = value.getMonth(); + if (month < 0 || month > 11) { + return false; + } + + const day = value.getDay(); + if (day < 1 || day > 31) { + return false; + } + + const hours = value.getHours(); + if (hours < 0 || hours > 23) { + return false; + } + + const minutes = value.getMinutes(); + if (minutes < 0 || minutes > 59) { + return false; + } + + const seconds = value.getSeconds(); + if (seconds < 0 || seconds > 59) { + return false; + } + + const ms = value.getMilliseconds(); + if (ms < 0 || ms > 999) { + return false; + } + + return true; +} + /** * Determines the data type from the specified string expression. * @param value The value or a string expression. diff --git a/projects/aas-core/src/test/convert.spec.ts b/projects/aas-core/src/test/convert.spec.ts index 785b81b4..9e8775a5 100644 --- a/projects/aas-core/src/test/convert.spec.ts +++ b/projects/aas-core/src/test/convert.spec.ts @@ -23,6 +23,7 @@ import { toBoolean, mimeTypeToExtension, extensionToMimeType, + isValidDate, } from '../lib/convert.js'; describe('Convert', () => { @@ -334,6 +335,20 @@ describe('Convert', () => { const date = new Date(2023, 1, 27, 13, 14, 15).toString(); expect(parseDate(date)).toEqual(new Date(2023, 1, 27, 13, 14, 15)); }); + + it('converts "2023-03-22"', () => { + expect(parseDate('2023-03-22')).toEqual(new Date('2023-03-22')); + }); + }); + + describe('isValidDate', () => { + it('indicates that IECĀ 61984 is invalid', () => { + expect(isValidDate(new Date('IECĀ 61984'))).toEqual(false); + }); + + it('indicates that 2023-03-22 is valid', () => { + expect(isValidDate(new Date('2023-03-22'))).toEqual(true); + }); }); describe('toLocale', () => { diff --git a/projects/aas-lib/src/lib/index-change.service.ts b/projects/aas-lib/src/lib/index-change.service.ts index 8504d565..8bbee09d 100644 --- a/projects/aas-lib/src/lib/index-change.service.ts +++ b/projects/aas-lib/src/lib/index-change.service.ts @@ -114,6 +114,7 @@ export class IndexChangeService { break; case 'Reset': this.reset.emit(); + this.state.set({ changedDocuments: 0, documentCount: 0, endpointCount: 0 }); break; } } diff --git a/projects/aas-server/src/app/aas-index/aas-index.ts b/projects/aas-server/src/app/aas-index/aas-index.ts index c52f966e..eb11e60e 100644 --- a/projects/aas-server/src/app/aas-index/aas-index.ts +++ b/projects/aas-server/src/app/aas-index/aas-index.ts @@ -15,6 +15,7 @@ import { baseType, getAbbreviation, isProperty, + isValidDate, parseDate, parseNumber, toBoolean, @@ -76,7 +77,7 @@ export abstract class AASIndex { public abstract clear(): Promise; - public abstract reset(): Promise; + public abstract destroy(): Promise; protected toAbbreviation(referable: aas.Referable): string { return getAbbreviation(referable.modelType)!.toLowerCase(); @@ -123,7 +124,8 @@ export abstract class AASIndex { return undefined; } - return parseDate(referable.value); + const value = parseDate(referable.value); + return isValidDate(value) ? value : undefined; } protected toBooleanValue(referable: aas.Referable): boolean | undefined { diff --git a/projects/aas-server/src/app/aas-index/lowdb/lowdb-index.ts b/projects/aas-server/src/app/aas-index/lowdb/lowdb-index.ts index 308ea7b5..eaf57b36 100644 --- a/projects/aas-server/src/app/aas-index/lowdb/lowdb-index.ts +++ b/projects/aas-server/src/app/aas-index/lowdb/lowdb-index.ts @@ -24,7 +24,6 @@ import { import { AASIndex } from '../aas-index.js'; import { LowDbQuery } from './lowdb-query.js'; import { Variable } from '../../variable.js'; -import { urlToEndpoint } from '../../configuration.js'; import { ERRORS } from '../../errors.js'; import { LowDbData, LowDbDocument, LowDbElement } from './lowdb-types.js'; import { decodeBase64Url, encodeBase64Url } from '../../convert.js'; @@ -33,8 +32,6 @@ import { KeywordDirectory } from '../keyword-directory.js'; import { Logger } from '../../logging/logger.js'; export class LowDbIndex extends AASIndex { - private readonly promise: Promise; - public constructor( private readonly logger: Logger, private readonly variable: Variable, @@ -42,54 +39,60 @@ export class LowDbIndex extends AASIndex { keywordDirectory: KeywordDirectory, ) { super(keywordDirectory); - - this.promise = this.initialize(); - this.promise.then(() => logger.info('Using internal AAS index.')).catch(error => logger.error(error)); } - public override async getCount(endpoint?: string): Promise { - await this.promise; - if (endpoint === undefined) { - return this.db.data.documents.length; - } + public override destroy(): Promise { + return Promise.resolve(); + } - let count = 0; - this.db.data.documents.forEach(item => { - if (item.endpoint === endpoint) { - ++count; + public override getCount(endpoint?: string): Promise { + return new Promise(resolve => { + if (endpoint === undefined) { + resolve(this.db.data.documents.length); + return; } - }); - return count; + let count = 0; + this.db.data.documents.forEach(item => { + if (item.endpoint === endpoint) { + ++count; + } + }); + + resolve(count); + }); } - public override async getEndpoints(): Promise { - await this.promise; - return this.db.data.endpoints; + public override getEndpoints(): Promise { + return new Promise(resolve => { + resolve(this.db.data.endpoints); + }); } - public override async getEndpointCount(): Promise { - await this.promise; - return this.db.data.endpoints.length; + public override getEndpointCount(): Promise { + return new Promise(resolve => { + resolve(this.db.data.endpoints.length); + }); } - public override async getEndpoint(name: string): Promise { - await this.promise; - const endpoint = this.db.data.endpoints.find(endpoint => endpoint.name === name); - if (!endpoint) { - throw new Error(`An endpoint with the name ${name} does not exist.`); - } + public override getEndpoint(name: string): Promise { + return new Promise(resolve => { + const endpoint = this.db.data.endpoints.find(endpoint => endpoint.name === name); + if (!endpoint) { + throw new Error(`An endpoint with the name ${name} does not exist.`); + } - return endpoint; + resolve(endpoint); + }); } - public override async hasEndpoint(name: string): Promise { - await this.promise; - return this.db.data.endpoints.find(endpoint => endpoint.name === name) !== undefined; + public override hasEndpoint(name: string): Promise { + return new Promise(resolve => { + resolve(this.db.data.endpoints.find(endpoint => endpoint.name === name) !== undefined); + }); } public override async addEndpoint(endpoint: AASEndpoint): Promise { - await this.promise; if (this.db.data.endpoints.some(item => item.name === endpoint.name)) { throw new ApplicationError( `An endpoint with the name "${name}" already exists.`, @@ -103,7 +106,6 @@ export class LowDbIndex extends AASIndex { } public override async updateEndpoint(endpoint: AASEndpoint): Promise { - await this.promise; const index = this.db.data.endpoints.findIndex(item => item.name === endpoint.name); if (index < 0) { throw new Error(`An endpoint with the name ${name} does not exist.`); @@ -116,9 +118,10 @@ export class LowDbIndex extends AASIndex { } public override async removeEndpoint(endpointName: string): Promise { - await this.promise; const index = this.db.data.endpoints.findIndex(endpoint => endpoint.name === endpointName); - if (index < 0) return false; + if (index < 0) { + return false; + } this.db.data.endpoints.splice(index, 1); this.removeDocuments(endpointName); @@ -126,82 +129,76 @@ export class LowDbIndex extends AASIndex { return true; } - public override async nextPage( + public override nextPage( endpointName: string, cursor: string | undefined, limit: number = 100, ): Promise> { - await this.promise; - - if (cursor) { - cursor = decodeBase64Url(cursor); - } - - const documents: AASDocument[] = []; - if (this.db.data.documents.length === 0) { - return { result: documents, paging_metadata: {} }; - } - - const items = this.db.data.documents; - const index = items.findIndex(item => { - if (item.endpoint !== endpointName) { - return false; + return new Promise>(resolve => { + if (cursor) { + cursor = decodeBase64Url(cursor); } - return cursor === undefined || cursor.localeCompare(item.id) <= 0; - }); - - if (index < 0) { - return { result: documents, paging_metadata: {} }; - } - - const result: AASDocument[] = []; - for (let i = 0, j = index, n = items.length; i < limit && j < n; i++, j++) { - const item = items[j]; - if (item.endpoint !== endpointName) { - break; + const documents: AASDocument[] = []; + if (this.db.data.documents.length === 0) { + resolve({ result: documents, paging_metadata: {} }); + return; } - result.push(item); - } + const items = this.db.data.documents; + const index = items.findIndex(item => { + if (item.endpoint !== endpointName) { + return false; + } - const k = index + limit + 1; - if (k >= items.length || items[k].endpoint !== endpointName) { - return { result, paging_metadata: {} }; - } + return cursor === undefined || cursor.localeCompare(item.id) <= 0; + }); - return { result, paging_metadata: { cursor: encodeBase64Url(items[k].id) } }; - } + if (index < 0) { + resolve({ result: documents, paging_metadata: {} }); + return; + } - public override async getDocuments( - cursor: AASCursor, - expression?: string, - language?: string, - ): Promise { - await this.promise; + const result: AASDocument[] = []; + for (let i = 0, j = index, n = items.length; i < limit && j < n; i++, j++) { + const item = items[j]; + if (item.endpoint !== endpointName) { + break; + } - let query: LowDbQuery | undefined; - if (expression) { - query = new LowDbQuery(expression, language ?? 'en'); - } + result.push(item); + } - if (cursor.next) { - return this.getNextPage(cursor.next, cursor.limit, query); - } + const k = index + limit + 1; + if (k >= items.length || items[k].endpoint !== endpointName) { + resolve({ result, paging_metadata: {} }); + return; + } - if (cursor.previous) { - return this.getPreviousPage(cursor.previous, cursor.limit, query); - } + resolve({ result, paging_metadata: { cursor: encodeBase64Url(items[k].id) } }); + }); + } - if (cursor.previous === null) { - return this.getFirstPage(cursor.limit, query); - } + public override getDocuments(cursor: AASCursor, expression?: string, language?: string): Promise { + return new Promise(resolve => { + let query: LowDbQuery | undefined; + if (expression) { + query = new LowDbQuery(expression, language ?? 'en'); + } - return this.getLastPage(cursor.limit, query); + if (cursor.next) { + resolve(this.getNextPage(cursor.next, cursor.limit, query)); + } else if (cursor.previous) { + resolve(this.getPreviousPage(cursor.previous, cursor.limit, query)); + } else if (cursor.previous === null) { + resolve(this.getFirstPage(cursor.limit, query)); + } else { + resolve(this.getLastPage(cursor.limit, query)); + } + }); } public override async update(document: AASDocument): Promise { - await this.promise; const name = document.endpoint; const documents = this.db.data.documents; const index = documents.findIndex(item => item.endpoint === name && item.id === document.id); @@ -218,23 +215,23 @@ export class LowDbIndex extends AASIndex { } } - public async find(endpointName: string | undefined, id: string): Promise { - await this.promise; - const document = endpointName - ? this.db.data.documents.find( - item => item.endpoint === endpointName && (item.id === id || item.assetId === id), - ) - : this.db.data.documents.find(item => item.id === id || item.assetId === id); - - if (document) { - return this.toDocument(document); - } - - return undefined; + public override find(endpointName: string | undefined, id: string): Promise { + return new Promise(resolve => { + const document = endpointName + ? this.db.data.documents.find( + item => item.endpoint === endpointName && (item.id === id || item.assetId === id), + ) + : this.db.data.documents.find(item => item.id === id || item.assetId === id); + + if (document === undefined) { + resolve(undefined); + return; + } + resolve(this.toDocument(document)); + }); } public override async add(document: AASDocument): Promise { - await this.promise; const endpoint = document.endpoint; const id = document.id; const documents = this.db.data.documents; @@ -254,7 +251,6 @@ export class LowDbIndex extends AASIndex { } public override async remove(endpointName: string, id: string): Promise { - await this.promise; const documents = this.db.data.documents; const index = documents.findIndex(item => item.endpoint === endpointName && item.id === id); if (index < 0) return false; @@ -274,11 +270,6 @@ export class LowDbIndex extends AASIndex { await this.db.write(); } - public override async reset(): Promise { - this.db.data.endpoints = this.variable.ENDPOINTS.map(endpoint => urlToEndpoint(endpoint)); - await this.db.write(); - } - private removeDocuments(endpoint: string) { const documents = this.db.data.documents.filter(document => document.endpoint === endpoint); this.db.data.documents = this.db.data.documents.filter(document => document.endpoint !== endpoint); @@ -293,16 +284,6 @@ export class LowDbIndex extends AASIndex { return document; } - private async initialize(): Promise { - await this.db.read(); - - if (this.db.data.endpoints.length === 0) { - const endpoints = this.variable.ENDPOINTS.map(endpoint => urlToEndpoint(endpoint)); - this.db.data.endpoints.push(...endpoints); - await this.db.write(); - } - } - private getInsertPosition(document: AASDocumentId): number { let index = 0; for (const item of this.db.data.documents) { diff --git a/projects/aas-server/src/app/aas-index/mysql/mysql-index.ts b/projects/aas-server/src/app/aas-index/mysql/mysql-index.ts index b8d8cd8b..6f892b41 100644 --- a/projects/aas-server/src/app/aas-index/mysql/mysql-index.ts +++ b/projects/aas-server/src/app/aas-index/mysql/mysql-index.ts @@ -22,7 +22,6 @@ import { import { AASIndex } from '../aas-index.js'; import { Variable } from '../../variable.js'; -import { urlToEndpoint } from '../../configuration.js'; import { MySqlQuery } from './mysql-query.js'; import { DocumentCount, MySqlDocument, MySqlEndpoint } from './mysql-types.js'; import { PagedResult } from '../../types/paged-result.js'; @@ -31,35 +30,25 @@ import { Logger } from '../../logging/logger.js'; import { urlToString } from '../../convert.js'; export class MySqlIndex extends AASIndex { - private readonly connection: Promise; + private _connection!: Connection; public constructor( private readonly logger: Logger, private readonly variable: Variable, keywordDirectory: KeywordDirectory, - connection?: Connection, ) { super(keywordDirectory); + } - if (connection === undefined) { - this.connection = this.initialize(); - this.connection - .then(() => { - logger.info(`AAS index connected to ${urlToString(this.variable.AAS_INDEX)}.`); - }) - .catch(error => { - this.logger.error(error); - }); - } else { - this.connection = new Promise(resolve => resolve(connection)); - } + public override async destroy(): Promise { + const connection = await this.getConnection(); + await connection.end(); } public override async getCount(endpoint?: string): Promise { - const connection = await this.connection; + const connection = await this.getConnection(); if (endpoint === undefined) { const result = await connection.query('SELECT COUNT(*) FROM `documents`;'); - return result[0][0]['COUNT(*)']; } @@ -71,39 +60,19 @@ export class MySqlIndex extends AASIndex { } public override async getEndpointCount(): Promise { - const connection = await this.connection; + const connection = await this.getConnection(); const result = await connection.query('SELECT COUNT(*) FROM `endpoints` AS count;'); return result[0][0]['COUNT(*)']; } public override async getEndpoints(): Promise { - const connection = await this.connection; + const connection = await this.getConnection(); const result = await connection.query('SELECT * FROM `endpoints`;'); - return result[0].map(row => { - const endpoint: AASEndpoint = { - name: row.name, - url: row.url, - type: row.type, - }; - - if (row.version) { - endpoint.version = row.version; - } - - if (row.headers) { - endpoint.headers = JSON.parse(row.headers); - } - - if (row.schedule) { - endpoint.schedule = JSON.parse(row.schedule); - } - - return endpoint; - }); + return result[0].map(row => this.toEndpoint(row)); } public override async getEndpoint(name: string): Promise { - const connection = await this.connection; + const connection = await this.getConnection(); const [results] = await connection.query('SELECT * FROM `endpoints` WHERE name = ?;', [name]); if (results.length === 0) { throw new Error(`An endpoint with the name "${name}" does not exist.`); @@ -113,13 +82,13 @@ export class MySqlIndex extends AASIndex { } public override async hasEndpoint(name: string): Promise { - const connection = await this.connection; + const connection = await this.getConnection(); const [results] = await connection.query('SELECT * FROM `endpoints` WHERE name = ?;', [name]); return results.length > 0; } public override async addEndpoint(endpoint: AASEndpoint): Promise { - const connection = await this.connection; + const connection = await this.getConnection(); await connection.query( 'INSERT INTO `endpoints` (name, url, type, version, headers, schedule) VALUES (?, ?, ?, ?, ?, ?);', [ @@ -134,7 +103,7 @@ export class MySqlIndex extends AASIndex { } public override async updateEndpoint(endpoint: AASEndpoint): Promise { - const connection = await this.connection; + const connection = await this.getConnection(); try { await connection.beginTransaction(); const [results] = await connection.query('SELECT * FROM `endpoints` WHERE name = ?;', [ @@ -167,16 +136,14 @@ export class MySqlIndex extends AASIndex { } public override async removeEndpoint(endpointName: string): Promise { - const connection = await this.connection; + const connection = await this.getConnection(); try { await connection.beginTransaction(); - const result = await connection.query('DELETE FROM `endpoints` WHERE name = ?;', [ endpointName, ]); - this.removeDocuments(endpointName); - + this.removeDocuments(connection, endpointName); await connection.commit(); return result[0].affectedRows > 0; } catch (error) { @@ -185,25 +152,30 @@ export class MySqlIndex extends AASIndex { } } - public override getDocuments(cursor: AASCursor, expression?: string, language?: string): Promise { + public override async getDocuments( + cursor: AASCursor, + expression?: string, + language?: string, + ): Promise { let query: MySqlQuery | undefined; if (expression) { query = new MySqlQuery(expression, language ?? 'en'); } + const connection = await this.getConnection(); if (cursor.next) { - return this.getNextPage(cursor.next, cursor.limit, query); + return this.getNextPage(connection, cursor.next, cursor.limit, query); } if (cursor.previous) { - return this.getPreviousPage(cursor.previous, cursor.limit, query); + return this.getPreviousPage(connection, cursor.previous, cursor.limit, query); } if (cursor.previous === null) { - return this.getFirstPage(cursor.limit, query); + return this.getFirstPage(connection, cursor.limit, query); } - return this.getLastPage(cursor.limit, query); + return this.getLastPage(connection, cursor.limit, query); } public override async nextPage( @@ -211,7 +183,6 @@ export class MySqlIndex extends AASIndex { cursor: string | undefined, limit: number = 100, ): Promise> { - const connection = await this.connection; let sql: string; const values: unknown[] = [endpointName]; if (cursor) { @@ -222,6 +193,7 @@ export class MySqlIndex extends AASIndex { } values.push(limit + 1); + const connection = await this.getConnection(); const [results] = await connection.query(sql, values); const documents = results.map(result => this.toDocument(result)); return { @@ -233,7 +205,7 @@ export class MySqlIndex extends AASIndex { } public override async update(document: AASDocument): Promise { - const connection = await this.connection; + const connection = await this.getConnection(); try { await connection.beginTransaction(); const result = await connection.query( @@ -264,7 +236,7 @@ export class MySqlIndex extends AASIndex { } public override async add(document: AASDocument): Promise { - const connection = await this.connection; + const connection = await this.getConnection(); try { await connection.beginTransaction(); const uuid = v4(); @@ -295,7 +267,10 @@ export class MySqlIndex extends AASIndex { } public override async find(endpoint: string | undefined, id: string): Promise { - const document = endpoint ? await this.selectEndpointDocument(endpoint, id) : await this.selectDocument(id); + const connection = await this.getConnection(); + const document = endpoint + ? await this.selectEndpointDocument(connection, endpoint, id) + : await this.selectDocument(connection, id); if (!document) { return undefined; } @@ -304,7 +279,7 @@ export class MySqlIndex extends AASIndex { } public override async remove(endpointName: string, id: string): Promise { - const connection = await this.connection; + const connection = await this.getConnection(); try { await connection.beginTransaction(); const [results] = await connection.query( @@ -328,7 +303,7 @@ export class MySqlIndex extends AASIndex { } public override async clear(): Promise { - const connection = await this.connection; + const connection = await this.getConnection(); try { await connection.beginTransaction(); await connection.query('DELETE FROM `elements`;'); @@ -341,20 +316,26 @@ export class MySqlIndex extends AASIndex { } } - public override async reset(): Promise { - const connection = await this.connection; - try { - await connection.beginTransaction(); - await this.addDefaultEndpoints(connection); - await connection.commit(); - } catch (error) { - await connection.rollback(); - throw error; + private async getConnection(): Promise { + if (this._connection === undefined) { + const url = new URL(this.variable.AAS_INDEX!); + const username = isEmpty(url.username) ? this.variable.AAS_SERVER_USERNAME : url.username; + const password = isEmpty(url.password) ? this.variable.AAS_SERVER_PASSWORD : url.password; + this._connection = await mysql.createConnection({ + host: url.hostname, + port: Number(url.port), + database: 'aas-index', + user: username, + password: password, + }); + + this.logger.info(`AAS index connected to ${urlToString(this.variable.AAS_INDEX)}.`); } + + return this._connection; } - private async removeDocuments(endpointName: string): Promise { - const connection = await this.connection; + private async removeDocuments(connection: Connection, endpointName: string): Promise { const documents = ( await connection.query('SELECT * FROM `documents` WHERE endpoint = ?;', [endpointName]) )[0]; @@ -366,8 +347,7 @@ export class MySqlIndex extends AASIndex { } } - private async getFirstPage(limit: number, query?: MySqlQuery): Promise { - const connection = await this.connection; + private async getFirstPage(connection: Connection, limit: number, query?: MySqlQuery): Promise { let sql: string; const values: unknown[] = []; if (query) { @@ -397,8 +377,12 @@ export class MySqlIndex extends AASIndex { }; } - private async getNextPage(current: AASDocumentId, limit: number, query?: MySqlQuery): Promise { - const connection = await this.connection; + private async getNextPage( + connection: Connection, + current: AASDocumentId, + limit: number, + query?: MySqlQuery, + ): Promise { let sql: string; const values: unknown[] = [current.endpoint + current.id]; @@ -429,8 +413,12 @@ export class MySqlIndex extends AASIndex { }; } - private async getPreviousPage(current: AASDocumentId, limit: number, query?: MySqlQuery): Promise { - const connection = await this.connection; + private async getPreviousPage( + connection: Connection, + current: AASDocumentId, + limit: number, + query?: MySqlQuery, + ): Promise { let sql: string; const values: unknown[] = [current.endpoint + current.id]; @@ -461,8 +449,7 @@ export class MySqlIndex extends AASIndex { }; } - private async getLastPage(limit: number, query?: MySqlQuery): Promise { - const connection = await this.connection; + private async getLastPage(connection: Connection, limit: number, query?: MySqlQuery): Promise { let sql: string; const values: unknown[] = []; if (query) { @@ -492,8 +479,11 @@ export class MySqlIndex extends AASIndex { }; } - private async selectEndpointDocument(endpoint: string, id: string): Promise { - const connection = await this.connection; + private async selectEndpointDocument( + connection: Connection, + endpoint: string, + id: string, + ): Promise { const [results] = await connection.query( 'SELECT * FROM `documents` WHERE endpoint = ? AND (id = ? OR assetId = ?)', [endpoint, id, id], @@ -506,8 +496,7 @@ export class MySqlIndex extends AASIndex { return results[0]; } - private async selectDocument(id: string): Promise { - const connection = await this.connection; + private async selectDocument(connection: Connection, id: string): Promise { const [results] = await connection.query( 'SELECT * FROM `documents` WHERE (id = ? OR assetId = ?)', [id, id], @@ -592,40 +581,4 @@ export class MySqlIndex extends AASIndex { return document; } - - private async initialize(): Promise { - const url = new URL(this.variable.AAS_INDEX!); - const username = isEmpty(url.username) ? this.variable.AAS_SERVER_USERNAME : url.username; - const password = isEmpty(url.password) ? this.variable.AAS_SERVER_PASSWORD : url.password; - const connection = await mysql.createConnection({ - host: url.hostname, - port: Number(url.port), - database: 'aas-index', - user: username, - password: password, - }); - - const result = await connection.query('SELECT * FROM `endpoints`'); - if (result[0].length === 0) { - try { - connection.beginTransaction(); - await this.addDefaultEndpoints(connection); - connection.commit(); - } catch (error) { - connection.rollback(); - throw error; - } - } - - return connection; - } - - private async addDefaultEndpoints(connection: Connection): Promise { - for (const endpoint of this.variable.ENDPOINTS.map(endpoint => urlToEndpoint(endpoint))) { - await connection.query( - 'INSERT INTO `endpoints` (name, url, type, version) VALUES (?, ?, ?, ?)', - [endpoint.name, endpoint.url, endpoint.type, endpoint.version], - ); - } - } } diff --git a/projects/aas-server/src/app/aas-provider/aas-provider.ts b/projects/aas-server/src/app/aas-provider/aas-provider.ts index f8da73e5..62b09917 100644 --- a/projects/aas-server/src/app/aas-provider/aas-provider.ts +++ b/projects/aas-server/src/app/aas-provider/aas-provider.ts @@ -41,10 +41,10 @@ import { ERRORS } from '../errors.js'; import { Task, TaskHandler } from './task-handler.js'; import { HierarchicalStructure } from './hierarchical-structure.js'; import { AASCache } from './aas-cache.js'; +import { urlToEndpoint } from '../configuration.js'; @singleton() export class AASProvider { - private readonly file: string | undefined; private readonly cache = new AASCache(); private wsServer!: WSServer; private resetRequested = false; @@ -59,6 +59,7 @@ export class AASProvider { ) { this.parallel.on('message', this.parallelOnMessage); this.parallel.on('end', this.parallelOnEnd); + this.taskHandler.on('empty', this.taskHandlerOnEmpty); } /** @@ -68,7 +69,7 @@ export class AASProvider { public start(wsServer: WSServer): void { this.wsServer = wsServer; this.wsServer.on('message', this.onClientMessage); - setTimeout(this.startScan, 100); + this.initializeIndex().then(() => setTimeout(this.startScan, 100)); } /** @@ -294,16 +295,18 @@ export class AASProvider { this.resetRequested = true; await this.index.clear(); this.cache.clear(); + for (const task of [...this.taskHandler.tasks]) { + if (task.state === 'idle' && task.owner === this) { + this.taskHandler.delete(task.id); + } + } + this.wsServer.notify('IndexChange', { type: 'AASServerMessage', data: { type: 'Reset', } as AASServerMessage, }); - - if (this.taskHandler.empty(this)) { - await this.restart(); - } } /** @@ -445,15 +448,15 @@ export class AASProvider { */ public async startEndpointScan(name: string): Promise { const endpoint = await this.index.getEndpoint(name); - const task = this.taskHandler.find(name, 'ScanEndpoint'); - if (task === undefined) { - throw new Error(``); - } - if (endpoint.schedule?.type !== 'manual') { throw new Error(`Endpoint ${name} is not configured for the manual start of a scan.`); } + let task = this.taskHandler.find(name, 'ScanEndpoint'); + if (task === undefined) { + task = this.taskHandler.createTask(endpoint.name, this, 'ScanEndpoint'); + } + if (task.state === 'inProgress') { throw new Error(`Scanning endpoint ${name} is already in progress.`); } @@ -461,14 +464,35 @@ export class AASProvider { setTimeout(this.scanEndpoint, 0, task, endpoint); } + public destroy(): void { + this.parallel.off('message', this.parallelOnMessage); + this.parallel.off('end', this.parallelOnEnd); + this.taskHandler.off('empty', this.taskHandlerOnEmpty); + } + private async restart(): Promise { this.resetRequested = false; - await this.index.reset(); + await this.initializeIndex(); this.cache.clear(); await this.startScan(); this.logger.info('AAS Server configuration restored.'); } + private async initializeIndex(): Promise { + if ((await this.index.getEndpointCount()) === 0) { + for (const endpoint of this.variable.ENDPOINTS.map(endpoint => urlToEndpoint(endpoint))) { + await this.index.addEndpoint(endpoint); + this.wsServer.notify('IndexChange', { + type: 'AASServerMessage', + data: { + type: 'EndpointAdded', + endpoint: endpoint, + } as AASServerMessage, + }); + } + } + } + private onClientMessage = async (data: WebSocketData, client: SocketClient): Promise => { try { switch (data.type) { @@ -584,12 +608,13 @@ export class AASProvider { if ((await this.index.hasEndpoint(task.endpointName)) === true) { const endpoint = await this.index.getEndpoint(task.endpointName); + task.state = 'idle'; + task.end = Date.now(); + if (endpoint.schedule?.type === 'once' || endpoint.schedule?.type === 'manual') { return; } - task.state === 'idle'; - task.end = Date.now(); setTimeout( this.scanEndpoint, this.computeTimeout(endpoint.schedule, task.start, task.end), @@ -604,8 +629,8 @@ export class AASProvider { this.logger.stop(); } - if (this.resetRequested && this.taskHandler.empty(this)) { - await this.restart(); + if (this.resetRequested) { + this.taskHandler.delete(task.id); } }; @@ -724,4 +749,10 @@ export class AASProvider { await resource.closeAsync(); } } + + private readonly taskHandlerOnEmpty = async (owner: object): Promise => { + if (owner === this && this.resetRequested) { + await this.restart(); + } + }; } diff --git a/projects/aas-server/src/app/aas-provider/aas-server-scan.ts b/projects/aas-server/src/app/aas-provider/aas-server-scan.ts index cbdce44d..113299b0 100644 --- a/projects/aas-server/src/app/aas-provider/aas-server-scan.ts +++ b/projects/aas-server/src/app/aas-provider/aas-server-scan.ts @@ -15,28 +15,28 @@ import { PagedResult } from '../types/paged-result.js'; export class AASServerScan extends AASResourceScan { private readonly logger: Logger; - private readonly server: AASApiClient; + private readonly client: AASApiClient; public constructor(logger: Logger, server: AASApiClient) { super(); this.logger = logger; - this.server = server; + this.client = server; } protected override open(): Promise { - return this.server.openAsync(); + return this.client.openAsync(); } protected override close(): Promise { - return this.server.closeAsync(); + return this.client.closeAsync(); } protected override createDocument(id: string): Promise { - const aasPackage = new AASServerPackage(this.logger, this.server, id); + const aasPackage = new AASServerPackage(this.logger, this.client, id); return aasPackage.createDocumentAsync(); } protected override nextEndpointPage(cursor: string | undefined): Promise> { - return this.server.getShellsAsync(cursor); + return this.client.getShellsAsync(cursor); } } diff --git a/projects/aas-server/src/app/aas-provider/task-handler.ts b/projects/aas-server/src/app/aas-provider/task-handler.ts index 0c3b05bf..6e23fbc6 100644 --- a/projects/aas-server/src/app/aas-provider/task-handler.ts +++ b/projects/aas-server/src/app/aas-provider/task-handler.ts @@ -7,6 +7,7 @@ *****************************************************************************/ import { singleton } from 'tsyringe'; +import EventEmitter from 'events'; export interface Task { id: number; @@ -20,24 +21,45 @@ export interface Task { @singleton() export class TaskHandler { - private readonly tasks = new Map(); + private readonly eventEmitter = new EventEmitter(); + private readonly _tasks = new Map(); private nextTaskId = 1; + public get tasks(): Iterable { + return this._tasks.values(); + } + + public on(event: 'empty', handler: EventListener): EventEmitter { + return this.eventEmitter.on(event, handler); + } + + public off(event: 'empty', handler: EventListener): EventEmitter { + return this.eventEmitter.off(event, handler); + } + public delete(taskId: number): void { - this.tasks.delete(taskId); + const task = this._tasks.get(taskId); + if (task === undefined) { + return; + } + + this._tasks.delete(taskId); + if (this.empty(task.owner)) { + this.eventEmitter.emit('empty', task.owner); + } } public get(taskId: number): Task | undefined { - return this.tasks.get(taskId); + return this._tasks.get(taskId); } public set(task: Task) { - this.tasks.set(task.id, task); + this._tasks.set(task.id, task); } - public empty(owner: object, name?: string): boolean { - for (const task of this.tasks.values()) { - if (task.owner === owner && (!name || task.endpointName === name)) { + public empty(owner: object): boolean { + for (const task of this._tasks.values()) { + if (task.owner === owner) { return false; } } @@ -60,7 +82,7 @@ export class TaskHandler { } public find(endpointName: string, type: 'ScanEndpoint' | 'ScanTemplates'): Task | undefined { - for (const task of this.tasks.values()) { + for (const task of this._tasks.values()) { if (task.endpointName === endpointName && type === task.type) { return task; } diff --git a/projects/aas-server/src/app/aas-scan-worker.ts b/projects/aas-server/src/app/aas-scan-worker.ts index 91807ce5..15e4d328 100644 --- a/projects/aas-server/src/app/aas-scan-worker.ts +++ b/projects/aas-server/src/app/aas-scan-worker.ts @@ -7,11 +7,16 @@ *****************************************************************************/ import 'reflect-metadata'; -import { MemoryLogger, MemoryLoggerLevel } from './logging/memory-logger.js'; import { container } from 'tsyringe'; +import { parentPort } from 'worker_threads'; +import { MemoryLogger, MemoryLoggerLevel } from './logging/memory-logger.js'; import { WorkerApp } from './worker-app.js'; import { AASIndexFactory } from './aas-index/aas-index-factory.js'; +parentPort?.on('close', () => { + container.dispose(); +}); + container.register('Logger', MemoryLogger); container.register('AASIndex', { useFactory: c => new AASIndexFactory(c).create() }); container.registerInstance( diff --git a/projects/aas-server/src/app/worker-app.ts b/projects/aas-server/src/app/worker-app.ts index 04bb22b7..73cba87e 100644 --- a/projects/aas-server/src/app/worker-app.ts +++ b/projects/aas-server/src/app/worker-app.ts @@ -27,7 +27,7 @@ export class WorkerApp { parentPort?.on('message', this.parentPortOnMessage); } - private parentPortOnMessage = async (data: WorkerData) => { + private readonly parentPortOnMessage = async (data: WorkerData) => { if (parentPort === null) { return; } diff --git a/projects/aas-server/src/app/ws-server.ts b/projects/aas-server/src/app/ws-server.ts index 0bafbdda..7e2e990a 100644 --- a/projects/aas-server/src/app/ws-server.ts +++ b/projects/aas-server/src/app/ws-server.ts @@ -56,6 +56,14 @@ export class WSServer extends EventEmitter { } public run(): void { + process.on('SIGTERM', () => { + this.logger.info('Shutting down AASNode'); + this.server.close(() => { + this.logger.info('HTTP server closed.'); + process.exit(0); + }); + }); + this.server.listen(this.variable.NODE_SERVER_PORT, () => { this.logger.info(`AAS-Server listening on ${this.variable.NODE_SERVER_PORT}`); });