Skip to content

Commit

Permalink
Merge pull request #3132 from dbeaver/CB-6000-handle-task-info-events
Browse files Browse the repository at this point in the history
CB-6000 handle task info events from WS
  • Loading branch information
SychevAndrey authored Dec 13, 2024
2 parents 40f6783 + f457166 commit acd3fe2
Show file tree
Hide file tree
Showing 24 changed files with 254 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import { computed, makeObservable, observable } from 'mobx';

import type { ITask, TaskScheduler } from '@cloudbeaver/core-executor';
import type { AsyncTaskInfo, AsyncTaskInfoService, GraphQLService } from '@cloudbeaver/core-sdk';
import { AsyncTaskInfoService } from '@cloudbeaver/core-root';
import type { AsyncTaskInfo, GraphQLService } from '@cloudbeaver/core-sdk';

import type { ConnectionExecutionContextResource, IConnectionExecutionContextInfo } from './ConnectionExecutionContextResource.js';
import type { IConnectionExecutionContext } from './IConnectionExecutionContext.js';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import { injectable } from '@cloudbeaver/core-di';
import { TaskScheduler } from '@cloudbeaver/core-executor';
import { CachedMapAllKey, ResourceKeyUtils } from '@cloudbeaver/core-resource';
import { AsyncTaskInfoService, GraphQLService } from '@cloudbeaver/core-sdk';
import { AsyncTaskInfoService } from '@cloudbeaver/core-root';
import { GraphQLService } from '@cloudbeaver/core-sdk';
import { MetadataMap } from '@cloudbeaver/core-utils';

import type { IConnectionInfoParams } from '../CONNECTION_INFO_PARAM_SCHEMA.js';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,10 @@
import { computed, makeObservable, observable } from 'mobx';

import { type ISyncExecutor, SyncExecutor } from '@cloudbeaver/core-executor';
import { type AsyncTaskInfo, ServerInternalError, type WsAsyncTaskInfo } from '@cloudbeaver/core-sdk';
import { uuid } from '@cloudbeaver/core-utils';

import type { AsyncTaskInfo } from '../sdk.js';
import { ServerInternalError } from '../ServerInternalError.js';

export class AsyncTask {
readonly id: string;

get cancelled(): boolean {
return this._cancelled;
}
Expand All @@ -32,20 +28,25 @@ export class AsyncTask {
return this.innerPromise;
}

get id(): string {
return this._id;
}

readonly onStatusChange: ISyncExecutor<AsyncTaskInfo>;

private _id: string;
private _cancelled: boolean;
private taskInfo: AsyncTaskInfo | null;
private resolve!: (value: AsyncTaskInfo) => void;
private reject!: (reason?: any) => void;
private readonly innerPromise: Promise<AsyncTaskInfo>;
private updatingAsync: boolean;
private readonly init: () => Promise<AsyncTaskInfo>;
private readonly cancel: (info: AsyncTaskInfo) => Promise<void>;
private readonly cancel: (id: string) => Promise<void>;
private initPromise: Promise<void> | null;

constructor(init: () => Promise<AsyncTaskInfo>, cancel: (info: AsyncTaskInfo) => Promise<void>) {
this.id = uuid();
constructor(init: () => Promise<AsyncTaskInfo>, cancel: (id: string) => Promise<void>) {
this._id = uuid();
this.init = init;
this.cancel = cancel;
this._cancelled = false;
Expand Down Expand Up @@ -119,6 +120,13 @@ export class AsyncTask {
}
}

public updateStatus(info: WsAsyncTaskInfo): void {
if (this.taskInfo) {
this.taskInfo.status = info.statusName;
this.onStatusChange.execute(this.taskInfo);
}
}

private updateInfo(info: AsyncTaskInfo): void {
this.taskInfo = info;

Expand All @@ -134,7 +142,7 @@ export class AsyncTask {

private async cancelTask(): Promise<void> {
if (this.info) {
await this.cancel(this.info);
await this.cancel(this.info.id);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* CloudBeaver - Cloud Database Manager
* Copyright (C) 2020-2024 DBeaver Corp and others
*
* Licensed under the Apache License, Version 2.0.
* you may not use this file except in compliance with the License.
*/
import { injectable } from '@cloudbeaver/core-di';
import type { WsAsyncTaskInfo } from '@cloudbeaver/core-sdk';

import { TopicEventHandler } from '../ServerEventEmitter/TopicEventHandler.js';
import { type ISessionEvent, type SessionEventId, SessionEventSource, SessionEventTopic } from '../SessionEventSource.js';

@injectable()
export class AsyncTaskInfoEventHandler extends TopicEventHandler<WsAsyncTaskInfo, ISessionEvent, SessionEventId, SessionEventTopic> {
constructor(sessionEventSource: SessionEventSource) {
super(SessionEventTopic.CbSessionTask, sessionEventSource);
}

map(event: any): WsAsyncTaskInfo {
return event;
}
}
157 changes: 157 additions & 0 deletions webapp/packages/core-root/src/AsyncTask/AsyncTaskInfoService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* CloudBeaver - Cloud Database Manager
* Copyright (C) 2020-2024 DBeaver Corp and others
*
* Licensed under the Apache License, Version 2.0.
* you may not use this file except in compliance with the License.
*/
import type { Subscription } from 'rxjs';

import { Disposable, injectable } from '@cloudbeaver/core-di';
import { type AsyncTaskInfo, GraphQLService, type WsAsyncTaskInfo } from '@cloudbeaver/core-sdk';

import type { Unsubscribe } from '../ServerEventEmitter/IServerEventEmitter.js';
import { ServerEventId } from '../SessionEventSource.js';
import { AsyncTask } from './AsyncTask.js';
import { AsyncTaskInfoEventHandler } from './AsyncTaskInfoEventHandler.js';

@injectable()
export class AsyncTaskInfoService extends Disposable {
private readonly tasks: Map<string, AsyncTask>;
private readonly taskIdAliases: Map<string, string>;
private readonly pendingEvents: Map<string, WsAsyncTaskInfo>;
private connection: Subscription | null;
private onEventUnsubscribe: Unsubscribe | null;

constructor(
private readonly graphQLService: GraphQLService,
private readonly asyncTaskInfoEventHandler: AsyncTaskInfoEventHandler,
) {
super();
this.tasks = new Map();
this.taskIdAliases = new Map();
this.pendingEvents = new Map();
this.connection = null;
this.handleEvent = this.handleEvent.bind(this);

this.onEventUnsubscribe = asyncTaskInfoEventHandler.onEvent<WsAsyncTaskInfo>(ServerEventId.CbSessionTaskInfoUpdated, this.handleEvent);
}

private async updateTask(task: AsyncTask, data: WsAsyncTaskInfo) {
if (data.running === false) {
await task.updateInfoAsync(async () => {
const { taskInfo } = await this.graphQLService.sdk.getAsyncTaskInfo({
taskId: data.taskId,
removeOnFinish: false,
});

return taskInfo;
});
} else {
task.updateStatus(data);
}
}

private async handleEvent(data: WsAsyncTaskInfo) {
const task = this.getTask(data.taskId);

if (!task) {
this.pendingEvents.set(data.taskId, data);
return;
}

await this.updateTask(task, data);
}

override dispose(): void {
this.connection?.unsubscribe();
this.onEventUnsubscribe?.();
}

create(getter: () => Promise<AsyncTaskInfo>): AsyncTask {
const task = new AsyncTask(getter, this.cancelTask.bind(this));

this.tasks.set(task.id, task);
task.onStatusChange.addHandler(info => {
if (this.taskIdAliases.get(info.id)) {
return;
}

this.taskIdAliases.set(info.id, task.id);

const pendingEvent = this.pendingEvents.get(info.id);
if (pendingEvent) {
this.pendingEvents.delete(info.id);
this.updateTask(task, pendingEvent);
}
});

if (this.tasks.size === 1) {
this.connection = this.asyncTaskInfoEventHandler.eventsSubject.connect();
}

return task;
}

private getTask(taskId: string): AsyncTask | undefined {
let task = this.tasks.get(taskId);

if (!task) {
const internalId = this.taskIdAliases.get(taskId);

if (internalId) {
task = this.tasks.get(internalId);
}
}

return task;
}

async run(task: AsyncTask): Promise<AsyncTaskInfo> {
if (task.info === null) {
await task.run();
}

return task.promise;
}

async remove(taskId: string): Promise<void> {
const task = this.getTask(taskId);

if (!task) {
return;
}

if (task.pending) {
throw new Error('Cant remove unfinished task');
}
this.tasks.delete(task.id);
if (task.info) {
this.taskIdAliases.delete(task.info.id);
this.pendingEvents.delete(task.info.id);
}
if (this.tasks.size === 0) {
this.connection?.unsubscribe();
this.connection = null;
}

if (task.info !== null) {
await this.graphQLService.sdk.getAsyncTaskInfo({
taskId: task.info.id,
removeOnFinish: true,
});
}
}

async cancel(taskId: string): Promise<void> {
const task = this.getTask(taskId);

await task?.cancelAsync();
}

private async cancelTask(id: string): Promise<void> {
await this.graphQLService.sdk.asyncTaskCancel({
taskId: id,
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import type { Observable } from 'rxjs';
import type { ISyncExecutor } from '@cloudbeaver/core-executor';

export type IServerEventCallback<T> = (data: T) => any;
export type Subscription = () => void;
export type Unsubscribe = () => void;

export interface IBaseServerEvent<TID extends string = string, TTopic extends string = string> {
id: TID;
Expand All @@ -25,9 +25,9 @@ export interface IServerEventEmitter<
> {
readonly onInit: ISyncExecutor;

onEvent<T = TEvent>(id: TEventID, callback: IServerEventCallback<T>, mapTo?: (event: TEvent) => T): Subscription;
onEvent<T = TEvent>(id: TEventID, callback: IServerEventCallback<T>, mapTo?: (event: TEvent) => T): Unsubscribe;

on<T = TEvent>(callback: IServerEventCallback<T>, mapTo?: (event: TEvent) => T, filter?: (event: TEvent) => boolean): Subscription;
on<T = TEvent>(callback: IServerEventCallback<T>, mapTo?: (event: TEvent) => T, filter?: (event: TEvent) => boolean): Unsubscribe;

multiplex<T = TEvent>(topicId: TTopic, mapTo?: (event: TEvent) => T): Observable<T>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import { type ISyncExecutor, SyncExecutor } from '@cloudbeaver/core-executor';
import type { IResource } from '@cloudbeaver/core-resource';
import { compose } from '@cloudbeaver/core-utils';

import type { IBaseServerEvent, IServerEventCallback, IServerEventEmitter, Subscription } from './IServerEventEmitter.js';
import type { IBaseServerEvent, IServerEventCallback, IServerEventEmitter, Unsubscribe } from './IServerEventEmitter.js';

interface ISubscribedResourceInfo {
listeners: number;
subscription: Subscription;
disposeSubscription: Unsubscribe;
}

export abstract class TopicEventHandler<
Expand All @@ -28,7 +28,7 @@ export abstract class TopicEventHandler<
readonly onInit: ISyncExecutor;
readonly eventsSubject: Connectable<TEvent>;

private subscription: Subscription | null;
private disposeSubscription: Unsubscribe | null;
private readonly activeResources: Array<IResource<any, any, any, any, any>>;
private readonly subscribedResources: Map<IResource<any, any, any, any, any>, ISubscribedResourceInfo>;
private readonly serverSubject?: Observable<TEvent>;
Expand All @@ -41,7 +41,7 @@ export abstract class TopicEventHandler<
this.subject = new Subject();
this.activeResources = [];
this.subscribedResources = new Map();
this.subscription = null;
this.disposeSubscription = null;
this.serverSubject = this.emitter.multiplex(topic, this.map);
this.eventsSubject = connectable(merge(this.subject, this.serverSubject), {
connector: () => new Subject(),
Expand All @@ -60,7 +60,7 @@ export abstract class TopicEventHandler<
callback: IServerEventCallback<T>,
mapTo: (event: TEvent) => T = event => event as unknown as T,
resource?: IResource<any, any, any, any, any>,
): Subscription {
): Unsubscribe {
if (resource) {
this.registerResource(resource);
}
Expand All @@ -86,7 +86,7 @@ export abstract class TopicEventHandler<
mapTo: (param: TEvent) => T = event => event as unknown as T,
filterFn: (param: TEvent) => boolean = () => true,
resource?: IResource<any, any, any, any, any>,
): Subscription {
): Unsubscribe {
if (resource) {
this.registerResource(resource);
}
Expand Down Expand Up @@ -118,10 +118,10 @@ export abstract class TopicEventHandler<
if (resource.useTracker.isResourceInUse) {
this.activeResources.push(resource);

if (!this.subscription) {
if (!this.disposeSubscription) {
// console.log('Subscribe: ', resource.getName());
const sub = this.eventsSubject.connect();
this.subscription = () => sub.unsubscribe();
this.disposeSubscription = () => sub.unsubscribe();
}
}
}
Expand All @@ -132,8 +132,8 @@ export abstract class TopicEventHandler<

if (this.activeResources.length === 0) {
// console.log('Unsubscribe: ', resource.getName());
this.subscription?.();
this.subscription = null;
this.disposeSubscription?.();
this.disposeSubscription = null;
}
}

Expand All @@ -143,10 +143,10 @@ export abstract class TopicEventHandler<
if (!info) {
info = {
listeners: 0,
subscription: this.resourceUseHandler.bind(this, resource),
disposeSubscription: this.resourceUseHandler.bind(this, resource),
};
this.subscribedResources.set(resource, info);
resource.useTracker.onUse.addHandler(info.subscription);
resource.useTracker.onUse.addHandler(info.disposeSubscription);
// console.log('Register: ', resource.getName());
}

Expand All @@ -161,7 +161,7 @@ export abstract class TopicEventHandler<

if (info.listeners === 0) {
this.removeActiveResource(resource);
resource.useTracker.onUse.removeHandler(info.subscription);
resource.useTracker.onUse.removeHandler(info.disposeSubscription);
this.subscribedResources.delete(resource);
// console.log('Unregister: ', resource.getName());
}
Expand Down
Loading

0 comments on commit acd3fe2

Please sign in to comment.