Skip to content

Commit

Permalink
in progress...
Browse files Browse the repository at this point in the history
  • Loading branch information
raronpxcsw committed Oct 23, 2024
1 parent 98c11f3 commit 6a5ccd5
Show file tree
Hide file tree
Showing 11 changed files with 214 additions and 112 deletions.
2 changes: 1 addition & 1 deletion projects/aas-core/src/lib/server-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { AASDocument, AASEndpoint } from './types.js';
export type AASServerMessageType =
| 'Added'
| 'Removed'
| 'Changed'
| 'Update'
| 'Offline'
| 'EndpointAdded'
| 'EndpointRemoved'
Expand Down
6 changes: 3 additions & 3 deletions projects/aas-lib/src/lib/index-change.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ export class IndexChangeService {
case 'Removed':
this.documentRemoved();
break;
case 'Changed':
this.documentChanged();
case 'Update':
this.documentUpdate();
break;
case 'EndpointAdded':
this.endpointAdded();
Expand All @@ -128,7 +128,7 @@ export class IndexChangeService {
this.state.update(state => ({ ...state, removedDocuments: state.removedDocuments + 1 }));
}

private documentChanged(): void {
private documentUpdate(): void {
this.state.update(state => ({ ...state, changedDocuments: state.changedDocuments + 1 }));
}

Expand Down
40 changes: 21 additions & 19 deletions projects/aas-server/src/app/aas-provider/aas-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
ApplicationError,
getChildren,
isReferenceElement,
noop,
} from 'aas-core';

import { ImageProcessing } from '../image-processing.js';
Expand Down Expand Up @@ -374,12 +375,13 @@ export class AASProvider {

/** Only used for test. */
public async scanAsync(factory: AASResourceScanFactory): Promise<void> {
for (const endpoint of await this.index.getEndpoints()) {
if (endpoint.type === 'FileSystem') {
const result = await factory.create(endpoint).scanAsync();
result.result.forEach(async document => await this.index.add(document));
}
}
noop(factory);
// for (const endpoint of await this.index.getEndpoints()) {
// if (endpoint.type === 'FileSystem') {
// const result = await factory.create(endpoint).scanAsync();
// result.result.forEach(async document => await this.index.add(document));
// }
// }
}

/**
Expand Down Expand Up @@ -460,6 +462,13 @@ export class AASProvider {
return resource.createSubscription(client, message, env);
}

private notify(data: AASServerMessage): void {
this.wsServer.notify('IndexChange', {
type: 'AASServerMessage',
data: data,
});
}

private startScan = async (): Promise<void> => {
try {
for (const endpoint of await this.index.getEndpoints()) {
Expand All @@ -470,13 +479,6 @@ export class AASProvider {
}
};

private notify(data: AASServerMessage): void {
this.wsServer.notify('IndexChange', {
type: 'AASServerMessage',
data: data,
});
}

private scanEndpoint = async (taskId: number, endpoint: AASEndpoint) => {
const data: ScanEndpointData = {
type: 'ScanEndpointData',
Expand All @@ -491,13 +493,13 @@ export class AASProvider {
private parallelOnMessage = async (result: ScanEndpointResult) => {
try {
switch (result.type) {
case ScanResultType.Changed:
await this.onChanged(result);
case ScanResultType.Update:
await this.onUpdate(result);
break;
case ScanResultType.Added:
case ScanResultType.Add:
await this.onAdded(result);
break;
case ScanResultType.Removed:
case ScanResultType.Remove:
await this.onRemoved(result);
break;
}
Expand Down Expand Up @@ -529,7 +531,7 @@ export class AASProvider {
}
};

private async onChanged(result: ScanEndpointResult): Promise<void> {
private async onUpdate(result: ScanEndpointResult): Promise<void> {
const document = result.document;
if ((await this.index.hasEndpoint(document.endpoint)) === false) {
return;
Expand All @@ -540,7 +542,7 @@ export class AASProvider {
this.cache.set(document.endpoint, document.id, document.content);
}

this.sendMessage({ type: 'Changed', document: { ...document, content: null } });
this.sendMessage({ type: 'Update', document: { ...document, content: null } });
}

private async onAdded(result: ScanEndpointResult): Promise<void> {
Expand Down
93 changes: 91 additions & 2 deletions projects/aas-server/src/app/aas-provider/aas-resource-scan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,101 @@
*****************************************************************************/

import EventEmitter from 'events';
import { AASDocument, AASEndpoint } from 'aas-core';
import { AASIndex } from '../aas-index/aas-index.js';
import { PagedResult } from '../types/paged-result.js';

/** Defines an automate to scan an AAS resource for Asset Administration Shells. */
export abstract class AASResourceScan extends EventEmitter {
/**
* Gets all documents of the current container.
* @param cursor ToDo.
* @param index The AAS index.
* @param endpoint The endpoint.
* */
public abstract scanAsync(): Promise<void>;
public async scanAsync(index: AASIndex, endpoint: AASEndpoint): Promise<void> {
try {
await this.open();

const map = new Map<string, { reference?: AASDocument; document?: AASDocument }>();
let indexCursor: string | undefined;
let endpointCursor: string | undefined;
let a = true;
let b = true;
do {
if (a) {
const result = await index.nextPage(endpoint.name, indexCursor);
for (const reference of result.result) {
let value = map.get(reference.id);
if (value === undefined) {
value = { reference };
map.set(reference.id, value);
} else if (value.reference === undefined) {
value.reference = reference;
}
}

indexCursor = result.paging_metadata.cursor;
if (indexCursor === undefined) {
a = false;
}
}

if (b) {
const result = await this.nextEndpointPage(endpointCursor);
for (const id of result.result) {
let value = map.get(id);
if (value === undefined) {
value = {};
map.set(id, value);
}

if (value.document === undefined) {
value.document = await this.createDocument(id);
}
}

endpointCursor = result.paging_metadata.cursor;
if (endpointCursor === undefined) {
b = false;
}
}

const keys: string[] = [];
for (const value of map.values()) {
if (value.reference && value.document) {
keys.push(value.reference.id);
this.emit('compare', value.reference, value.document);
} else if (!a && value.document) {
keys.push(value.document.id);
this.emit('add', value.document);
} else if (!b && value.reference) {
keys.push(value.reference.id);
this.emit('remove', value.document);
}
}

keys.forEach(key => map.delete(key));
} while (a || b);

for (const value of map.values()) {
if (value.reference && value.document) {
this.emit('compare', value.reference, value.document);
} else if (!a && value.document) {
this.emit('add', value.document);
} else if (!b && value.reference) {
this.emit('remove', value.document);
}
}
} finally {
await this.close();
}
}

protected abstract open(): Promise<void>;

protected abstract close(): Promise<void>;

protected abstract createDocument(id: string): Promise<AASDocument>;

protected abstract nextEndpointPage(cursor: string | undefined): Promise<PagedResult<string>>;
}
41 changes: 14 additions & 27 deletions projects/aas-server/src/app/aas-provider/aas-server-scan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,45 +11,32 @@ import { Logger } from '../logging/logger.js';
import { AASApiClient } from '../packages/aas-server/aas-api-client.js';
import { AASServerPackage } from '../packages/aas-server/aas-server-package.js';
import { AASResourceScan } from './aas-resource-scan.js';
import { PagedResult } from '../types/paged-result.js';

export class AASServerScan extends AASResourceScan {
private readonly logger: Logger;
private readonly server: AASApiClient;

private static set = new Set<string>();

public constructor(logger: Logger, server: AASApiClient) {
super();

this.logger = logger;
this.server = server;
}

public async scanAsync(): Promise<void> {
try {
await this.server.openAsync();
protected override open(): Promise<void> {
return this.server.openAsync();
}
protected override close(): Promise<void> {
return this.server.closeAsync();
}

protected override createDocument(id: string): Promise<AASDocument> {
const aasPackage = new AASServerPackage(this.logger, this.server, id);
return aasPackage.createDocumentAsync();
}

const documents: AASDocument[] = [];
const result = await this.server.getShellsAsync();
const ids = new Set(result.result);
for (const id of ids) {
if (AASServerScan.set.has(id)) {
AASServerScan.set.delete(id);
} else {
AASServerScan.set.add(id);
}

try {
const aasPackage = new AASServerPackage(this.logger, this.server, id);
const document = await aasPackage.createDocumentAsync();
documents.push(document);
this.emit('scanned', document);
} catch (error) {
this.emit('error', error, this.server, id);
}
}
} finally {
await this.server.closeAsync();
}
protected override nextEndpointPage(cursor: string | undefined): Promise<PagedResult<string>> {
return this.server.getShellsAsync(cursor);
}
}
47 changes: 31 additions & 16 deletions projects/aas-server/src/app/aas-provider/directory-scan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,47 @@ import { Logger } from '../logging/logger.js';
import { AasxPackage } from '../packages/file-system/aasx-package.js';
import { AasxDirectory } from '../packages/file-system/aasx-directory.js';
import { AASResourceScan } from './aas-resource-scan.js';
import { PagedResult } from '../types/paged-result.js';

export class DirectoryScan extends AASResourceScan {
private readonly map = new Map<string, AASDocument>();

public constructor(
private readonly logger: Logger,
private readonly source: AasxDirectory,
) {
super();
}

public async scanAsync(): Promise<void> {
try {
await this.source.openAsync();
const result = await this.source.getFiles(cursor);
const documents: AASDocument[] = [];
for (const file of result.result) {
try {
const aasxPackage = new AasxPackage(this.logger, this.source, file);
const document = await aasxPackage.createDocumentAsync();
documents.push(document);
this.emit('scanned', document);
} catch (error) {
this.emit('error', error, this.source, file);
}
protected override open(): Promise<void> {
this.map.clear();
return this.source.openAsync();
}

protected override close(): Promise<void> {
this.map.clear();
return this.source.closeAsync();
}

protected override createDocument(id: string): Promise<AASDocument> {
const document = this.map.get(id);
return document ? Promise.resolve(document) : Promise.reject(new Error(`${id} not found.`));
}

protected override async nextEndpointPage(cursor: string | undefined): Promise<PagedResult<string>> {
const result = await this.source.getFiles(cursor);
const ids: string[] = [];
for (const file of result.result) {
try {
const aasxPackage = new AasxPackage(this.logger, this.source, file);
const document = await aasxPackage.createDocumentAsync();
ids.push(document.id);
this.map.set(document.id, document);
} catch (error) {
this.emit('error', error, this.source, file);
}
} finally {
await this.source.closeAsync();
}

return { result: ids, paging_metadata: {} };
}
}
Loading

0 comments on commit 6a5ccd5

Please sign in to comment.