diff --git a/projects/aas-core/src/lib/server-message.ts b/projects/aas-core/src/lib/server-message.ts index 5e31ce8..76eaa8e 100644 --- a/projects/aas-core/src/lib/server-message.ts +++ b/projects/aas-core/src/lib/server-message.ts @@ -12,7 +12,7 @@ import { AASDocument, AASEndpoint } from './types.js'; export type AASServerMessageType = | 'Added' | 'Removed' - | 'Changed' + | 'Update' | 'Offline' | 'EndpointAdded' | 'EndpointRemoved' diff --git a/projects/aas-lib/src/lib/index-change.service.ts b/projects/aas-lib/src/lib/index-change.service.ts index e88124f..d0c6722 100644 --- a/projects/aas-lib/src/lib/index-change.service.ts +++ b/projects/aas-lib/src/lib/index-change.service.ts @@ -105,8 +105,8 @@ export class IndexChangeService { case 'Removed': this.documentRemoved(); break; - case 'Changed': - this.documentChanged(); + case 'Update': + this.documentUpdate(); break; case 'EndpointAdded': this.endpointAdded(); @@ -128,7 +128,7 @@ export class IndexChangeService { this.state.update(state => ({ ...state, removedDocuments: state.removedDocuments + 1 })); } - private documentChanged(): void { + private documentUpdate(): void { this.state.update(state => ({ ...state, changedDocuments: state.changedDocuments + 1 })); } diff --git a/projects/aas-server/src/app/aas-provider/aas-provider.ts b/projects/aas-server/src/app/aas-provider/aas-provider.ts index 314c214..77efeba 100644 --- a/projects/aas-server/src/app/aas-provider/aas-provider.ts +++ b/projects/aas-server/src/app/aas-provider/aas-provider.ts @@ -22,6 +22,7 @@ import { ApplicationError, getChildren, isReferenceElement, + noop, } from 'aas-core'; import { ImageProcessing } from '../image-processing.js'; @@ -374,12 +375,13 @@ export class AASProvider { /** Only used for test. */ public async scanAsync(factory: AASResourceScanFactory): Promise { - for (const endpoint of await this.index.getEndpoints()) { - if (endpoint.type === 'FileSystem') { - const result = await factory.create(endpoint).scanAsync(); - result.result.forEach(async document => await this.index.add(document)); - } - } + noop(factory); + // for (const endpoint of await this.index.getEndpoints()) { + // if (endpoint.type === 'FileSystem') { + // const result = await factory.create(endpoint).scanAsync(); + // result.result.forEach(async document => await this.index.add(document)); + // } + // } } /** @@ -460,6 +462,13 @@ export class AASProvider { return resource.createSubscription(client, message, env); } + private notify(data: AASServerMessage): void { + this.wsServer.notify('IndexChange', { + type: 'AASServerMessage', + data: data, + }); + } + private startScan = async (): Promise => { try { for (const endpoint of await this.index.getEndpoints()) { @@ -470,13 +479,6 @@ export class AASProvider { } }; - private notify(data: AASServerMessage): void { - this.wsServer.notify('IndexChange', { - type: 'AASServerMessage', - data: data, - }); - } - private scanEndpoint = async (taskId: number, endpoint: AASEndpoint) => { const data: ScanEndpointData = { type: 'ScanEndpointData', @@ -491,13 +493,13 @@ export class AASProvider { private parallelOnMessage = async (result: ScanEndpointResult) => { try { switch (result.type) { - case ScanResultType.Changed: - await this.onChanged(result); + case ScanResultType.Update: + await this.onUpdate(result); break; - case ScanResultType.Added: + case ScanResultType.Add: await this.onAdded(result); break; - case ScanResultType.Removed: + case ScanResultType.Remove: await this.onRemoved(result); break; } @@ -529,7 +531,7 @@ export class AASProvider { } }; - private async onChanged(result: ScanEndpointResult): Promise { + private async onUpdate(result: ScanEndpointResult): Promise { const document = result.document; if ((await this.index.hasEndpoint(document.endpoint)) === false) { return; @@ -540,7 +542,7 @@ export class AASProvider { this.cache.set(document.endpoint, document.id, document.content); } - this.sendMessage({ type: 'Changed', document: { ...document, content: null } }); + this.sendMessage({ type: 'Update', document: { ...document, content: null } }); } private async onAdded(result: ScanEndpointResult): Promise { diff --git a/projects/aas-server/src/app/aas-provider/aas-resource-scan.ts b/projects/aas-server/src/app/aas-provider/aas-resource-scan.ts index 75b08f8..abdb72e 100644 --- a/projects/aas-server/src/app/aas-provider/aas-resource-scan.ts +++ b/projects/aas-server/src/app/aas-provider/aas-resource-scan.ts @@ -7,12 +7,101 @@ *****************************************************************************/ import EventEmitter from 'events'; +import { AASDocument, AASEndpoint } from 'aas-core'; +import { AASIndex } from '../aas-index/aas-index.js'; +import { PagedResult } from '../types/paged-result.js'; /** Defines an automate to scan an AAS resource for Asset Administration Shells. */ export abstract class AASResourceScan extends EventEmitter { /** * Gets all documents of the current container. - * @param cursor ToDo. + * @param index The AAS index. + * @param endpoint The endpoint. * */ - public abstract scanAsync(): Promise; + public async scanAsync(index: AASIndex, endpoint: AASEndpoint): Promise { + try { + await this.open(); + + const map = new Map(); + let indexCursor: string | undefined; + let endpointCursor: string | undefined; + let a = true; + let b = true; + do { + if (a) { + const result = await index.nextPage(endpoint.name, indexCursor); + for (const reference of result.result) { + let value = map.get(reference.id); + if (value === undefined) { + value = { reference }; + map.set(reference.id, value); + } else if (value.reference === undefined) { + value.reference = reference; + } + } + + indexCursor = result.paging_metadata.cursor; + if (indexCursor === undefined) { + a = false; + } + } + + if (b) { + const result = await this.nextEndpointPage(endpointCursor); + for (const id of result.result) { + let value = map.get(id); + if (value === undefined) { + value = {}; + map.set(id, value); + } + + if (value.document === undefined) { + value.document = await this.createDocument(id); + } + } + + endpointCursor = result.paging_metadata.cursor; + if (endpointCursor === undefined) { + b = false; + } + } + + const keys: string[] = []; + for (const value of map.values()) { + if (value.reference && value.document) { + keys.push(value.reference.id); + this.emit('compare', value.reference, value.document); + } else if (!a && value.document) { + keys.push(value.document.id); + this.emit('add', value.document); + } else if (!b && value.reference) { + keys.push(value.reference.id); + this.emit('remove', value.document); + } + } + + keys.forEach(key => map.delete(key)); + } while (a || b); + + for (const value of map.values()) { + if (value.reference && value.document) { + this.emit('compare', value.reference, value.document); + } else if (!a && value.document) { + this.emit('add', value.document); + } else if (!b && value.reference) { + this.emit('remove', value.document); + } + } + } finally { + await this.close(); + } + } + + protected abstract open(): Promise; + + protected abstract close(): Promise; + + protected abstract createDocument(id: string): Promise; + + protected abstract nextEndpointPage(cursor: string | undefined): Promise>; } diff --git a/projects/aas-server/src/app/aas-provider/aas-server-scan.ts b/projects/aas-server/src/app/aas-provider/aas-server-scan.ts index e73321e..cbdce44 100644 --- a/projects/aas-server/src/app/aas-provider/aas-server-scan.ts +++ b/projects/aas-server/src/app/aas-provider/aas-server-scan.ts @@ -11,13 +11,12 @@ import { Logger } from '../logging/logger.js'; import { AASApiClient } from '../packages/aas-server/aas-api-client.js'; import { AASServerPackage } from '../packages/aas-server/aas-server-package.js'; import { AASResourceScan } from './aas-resource-scan.js'; +import { PagedResult } from '../types/paged-result.js'; export class AASServerScan extends AASResourceScan { private readonly logger: Logger; private readonly server: AASApiClient; - private static set = new Set(); - public constructor(logger: Logger, server: AASApiClient) { super(); @@ -25,31 +24,19 @@ export class AASServerScan extends AASResourceScan { this.server = server; } - public async scanAsync(): Promise { - try { - await this.server.openAsync(); + protected override open(): Promise { + return this.server.openAsync(); + } + protected override close(): Promise { + return this.server.closeAsync(); + } + + protected override createDocument(id: string): Promise { + const aasPackage = new AASServerPackage(this.logger, this.server, id); + return aasPackage.createDocumentAsync(); + } - const documents: AASDocument[] = []; - const result = await this.server.getShellsAsync(); - const ids = new Set(result.result); - for (const id of ids) { - if (AASServerScan.set.has(id)) { - AASServerScan.set.delete(id); - } else { - AASServerScan.set.add(id); - } - - try { - const aasPackage = new AASServerPackage(this.logger, this.server, id); - const document = await aasPackage.createDocumentAsync(); - documents.push(document); - this.emit('scanned', document); - } catch (error) { - this.emit('error', error, this.server, id); - } - } - } finally { - await this.server.closeAsync(); - } + protected override nextEndpointPage(cursor: string | undefined): Promise> { + return this.server.getShellsAsync(cursor); } } diff --git a/projects/aas-server/src/app/aas-provider/directory-scan.ts b/projects/aas-server/src/app/aas-provider/directory-scan.ts index 8f94370..81468a2 100644 --- a/projects/aas-server/src/app/aas-provider/directory-scan.ts +++ b/projects/aas-server/src/app/aas-provider/directory-scan.ts @@ -11,8 +11,11 @@ import { Logger } from '../logging/logger.js'; import { AasxPackage } from '../packages/file-system/aasx-package.js'; import { AasxDirectory } from '../packages/file-system/aasx-directory.js'; import { AASResourceScan } from './aas-resource-scan.js'; +import { PagedResult } from '../types/paged-result.js'; export class DirectoryScan extends AASResourceScan { + private readonly map = new Map(); + public constructor( private readonly logger: Logger, private readonly source: AasxDirectory, @@ -20,23 +23,35 @@ export class DirectoryScan extends AASResourceScan { super(); } - public async scanAsync(): Promise { - try { - await this.source.openAsync(); - const result = await this.source.getFiles(cursor); - const documents: AASDocument[] = []; - for (const file of result.result) { - try { - const aasxPackage = new AasxPackage(this.logger, this.source, file); - const document = await aasxPackage.createDocumentAsync(); - documents.push(document); - this.emit('scanned', document); - } catch (error) { - this.emit('error', error, this.source, file); - } + protected override open(): Promise { + this.map.clear(); + return this.source.openAsync(); + } + + protected override close(): Promise { + this.map.clear(); + return this.source.closeAsync(); + } + + protected override createDocument(id: string): Promise { + const document = this.map.get(id); + return document ? Promise.resolve(document) : Promise.reject(new Error(`${id} not found.`)); + } + + protected override async nextEndpointPage(cursor: string | undefined): Promise> { + const result = await this.source.getFiles(cursor); + const ids: string[] = []; + for (const file of result.result) { + try { + const aasxPackage = new AasxPackage(this.logger, this.source, file); + const document = await aasxPackage.createDocumentAsync(); + ids.push(document.id); + this.map.set(document.id, document); + } catch (error) { + this.emit('error', error, this.source, file); } - } finally { - await this.source.closeAsync(); } + + return { result: ids, paging_metadata: {} }; } } diff --git a/projects/aas-server/src/app/aas-provider/opcua-server-scan.ts b/projects/aas-server/src/app/aas-provider/opcua-server-scan.ts index a7056dc..f1726dc 100644 --- a/projects/aas-server/src/app/aas-provider/opcua-server-scan.ts +++ b/projects/aas-server/src/app/aas-provider/opcua-server-scan.ts @@ -7,16 +7,18 @@ *****************************************************************************/ import { AttributeIds, BrowseDescriptionLike, QualifiedName, ReferenceDescription } from 'node-opcua'; -import { AASDocument } from 'aas-core'; +import { AASDocument, noop } from 'aas-core'; import { Logger } from '../logging/logger.js'; import { OpcuaDataTypeDictionary } from '../packages/opcua/opcua-data-type-dictionary.js'; import { OpcuaClient } from '../packages/opcua/opcua-client.js'; import { OpcuaPackage } from '../packages/opcua/opcua-package.js'; import { AASResourceScan } from './aas-resource-scan.js'; +import { PagedResult } from '../types/paged-result.js'; export class OpcuaServerScan extends AASResourceScan { private readonly logger: Logger; private readonly server: OpcuaClient; + private readonly map = new Map(); public constructor(logger: Logger, server: OpcuaClient) { super(); @@ -25,26 +27,39 @@ export class OpcuaServerScan extends AASResourceScan { this.server = server; } - public async scanAsync(): Promise { - try { - await this.server.openAsync(); - const documents: AASDocument[] = []; - const dataTypes = new OpcuaDataTypeDictionary(); - await dataTypes.initializeAsync(this.server.getSession()); - for (const description of await this.browseAsync('ObjectsFolder')) { - const nodeId = description.nodeId.toString(); - try { - const opcuaPackage = new OpcuaPackage(this.logger, this.server, nodeId, dataTypes); - const document = await opcuaPackage.createDocumentAsync(); - documents.push(document); - this.emit('scanned', document); - } catch (error) { - this.emit('error', error, this.server, nodeId); - } + protected override open(): Promise { + this.map.clear(); + return this.server.openAsync(); + } + + protected override close(): Promise { + this.map.clear(); + return this.server.closeAsync(); + } + + protected override createDocument(id: string): Promise { + const document = this.map.get(id); + return document ? Promise.resolve(document) : Promise.reject(new Error(`${id} not found.`)); + } + + protected override async nextEndpointPage(cursor: string | undefined): Promise> { + noop(cursor); + const ids: string[] = []; + const dataTypes = new OpcuaDataTypeDictionary(); + await dataTypes.initializeAsync(this.server.getSession()); + for (const description of await this.browseAsync('ObjectsFolder')) { + const nodeId = description.nodeId.toString(); + try { + const opcuaPackage = new OpcuaPackage(this.logger, this.server, nodeId, dataTypes); + const document = await opcuaPackage.createDocumentAsync(); + ids.push(document.id); + this.map.set(document.id, document); + } catch (error) { + this.emit('error', error, this.server, nodeId); } - } finally { - await this.server.closeAsync(); } + + return { result: ids, paging_metadata: {} }; } private async browseAsync( diff --git a/projects/aas-server/src/app/aas-provider/parallel.ts b/projects/aas-server/src/app/aas-provider/parallel.ts index f10a966..eefa940 100644 --- a/projects/aas-server/src/app/aas-provider/parallel.ts +++ b/projects/aas-server/src/app/aas-provider/parallel.ts @@ -57,9 +57,6 @@ class WorkerTask extends EventEmitter { case ScanResultType.End: this.emit('end', result, this); break; - case ScanResultType.NextPage: - this.emit('nextPage', result, this); - break; default: this.emit('message', result); break; @@ -101,7 +98,6 @@ export class Parallel extends EventEmitter { public execute(data: WorkerData): void { const task = new WorkerTask(data); task.on('message', this.taskOnMessage); - task.on('nextPage', this.taskOnNextPage); task.on('end', this.taskOnEnd); task.on('error', this.taskOnError); @@ -134,10 +130,6 @@ export class Parallel extends EventEmitter { this.emit('message', result); }; - private taskOnNextPage = (result: ScanResult, task: WorkerTask) => { - this.emit('nextPage', result, task.worker); - }; - private taskOnEnd = (result: ScanResult, task: WorkerTask) => { this.emit('end', result); diff --git a/projects/aas-server/src/app/aas-provider/scan-result.ts b/projects/aas-server/src/app/aas-provider/scan-result.ts index 495fb64..11a8182 100644 --- a/projects/aas-server/src/app/aas-provider/scan-result.ts +++ b/projects/aas-server/src/app/aas-provider/scan-result.ts @@ -9,11 +9,9 @@ import { AASDocument, Message, TemplateDescriptor, AASEndpoint } from 'aas-core'; export enum ScanResultType { - Added, - Removed, - Changed, + Add, + Remove, Update, - NextPage, End, } diff --git a/projects/aas-server/src/app/aas-scan-worker.ts b/projects/aas-server/src/app/aas-scan-worker.ts index 4b70cb5..91807ce 100644 --- a/projects/aas-server/src/app/aas-scan-worker.ts +++ b/projects/aas-server/src/app/aas-scan-worker.ts @@ -10,8 +10,10 @@ import 'reflect-metadata'; import { MemoryLogger, MemoryLoggerLevel } from './logging/memory-logger.js'; import { container } from 'tsyringe'; import { WorkerApp } from './worker-app.js'; +import { AASIndexFactory } from './aas-index/aas-index-factory.js'; container.register('Logger', MemoryLogger); +container.register('AASIndex', { useFactory: c => new AASIndexFactory(c).create() }); container.registerInstance( 'LOG_LEVEL', process.env.NODE_ENV === 'production' ? MemoryLoggerLevel.Error : MemoryLoggerLevel.All, diff --git a/projects/aas-server/src/app/endpoint-scan.ts b/projects/aas-server/src/app/endpoint-scan.ts index 0e763d0..611582c 100644 --- a/projects/aas-server/src/app/endpoint-scan.ts +++ b/projects/aas-server/src/app/endpoint-scan.ts @@ -15,6 +15,7 @@ import { ScanEndpointResult, ScanResultType } from './aas-provider/scan-result.j import { toUint8Array } from './convert.js'; import { AASResourceScanFactory } from './aas-provider/aas-resource-scan-factory.js'; import { Variable } from './variable.js'; +import { AASIndex } from './aas-index/aas-index.js'; @singleton() export class EndpointScan { @@ -22,6 +23,7 @@ export class EndpointScan { public constructor( @inject('Logger') private readonly logger: Logger, + @inject('AASIndex') private readonly index: AASIndex, @inject(AASResourceScanFactory) private readonly resourceScanFactory: AASResourceScanFactory, @inject(Variable) private readonly variable: Variable, ) {} @@ -31,21 +33,21 @@ export class EndpointScan { const scan = this.resourceScanFactory.create(data.endpoint); try { scan.on('compare', this.compare); - scan.on('removed', this.removed); + scan.on('remove', this.postRemove); + scan.on('add', this.postAdd); scan.on('error', this.onError); - const result = await scan.scanAsync(data); - // this.computeDeleted(result.result); - return result.paging_metadata.cursor; + await scan.scanAsync(this.index, data.endpoint); } finally { scan.off('compare', this.compare); - scan.off('removed', this.removed); + scan.off('remove', this.postRemove); + scan.off('add', this.postAdd); scan.off('error', this.onError); } } private compare = (reference: AASDocument, document: AASDocument): void => { if (this.documentChanged(document, reference)) { - this.postChanged(document); + this.postUpdate(document); } }; @@ -53,10 +55,10 @@ export class EndpointScan { this.logger.error(error); }; - private postChanged(document: AASDocument): void { + private postUpdate(document: AASDocument): void { const value: ScanEndpointResult = { taskId: this.data.taskId, - type: ScanResultType.Changed, + type: ScanResultType.Update, endpoint: this.data.endpoint, document: document, }; @@ -65,10 +67,10 @@ export class EndpointScan { parentPort?.postMessage(array, [array.buffer]); } - private removed = (document: AASDocument): void => { + private postRemove = (document: AASDocument): void => { const value: ScanEndpointResult = { taskId: this.data.taskId, - type: ScanResultType.Removed, + type: ScanResultType.Remove, endpoint: this.data.endpoint, document: document, }; @@ -77,17 +79,17 @@ export class EndpointScan { parentPort?.postMessage(array, [array.buffer]); }; - private postAdded(document: AASDocument): void { + private postAdd = (document: AASDocument): void => { const value: ScanEndpointResult = { taskId: this.data.taskId, - type: ScanResultType.Added, + type: ScanResultType.Add, endpoint: this.data.endpoint, document: document, }; const array = toUint8Array(value); parentPort?.postMessage(array, [array.buffer]); - } + }; private documentChanged(document: AASDocument, reference: AASDocument): boolean { if (