Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CB-3508 create events for session async tasks #3036

Merged
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
9cb7433
CB-3508 create events for session async tasks
yagudin10 Oct 31, 2024
f2e3406
CB-3508 remove redundant events
yagudin10 Nov 4, 2024
51ebebb
CB-3508 update async web task event logic
yagudin10 Dec 9, 2024
faaf410
CB-6000 refactor: migrate AsyncTask code to core-root package
SychevAndrey Dec 10, 2024
6ca14e4
Merge branch 'devel' into CB-6000-handle-task-info-events
SychevAndrey Dec 10, 2024
5f1e154
Merge branch 'devel' into CB-3508-te-initiate-events-for-execute-quer…
SychevAndrey Dec 10, 2024
446f817
Merge branch 'CB-3508-te-initiate-events-for-execute-query-and-all-as…
SychevAndrey Dec 10, 2024
867a789
CB-6000: fix
SychevAndrey Dec 10, 2024
f5cf25d
CB-3508 use only update async task event
yagudin10 Dec 11, 2024
e30172a
CB-3508 add result info to async task event
yagudin10 Dec 11, 2024
feb8df1
CB-6000 feat: change the way of updating task state
SychevAndrey Dec 11, 2024
e6440d2
Merge branch 'CB-3508-te-initiate-events-for-execute-query-and-all-as…
SychevAndrey Dec 11, 2024
d14e546
CB-6000 refactor: rename type and property
SychevAndrey Dec 11, 2024
73e848b
CB-6000 fix: use id from server
SychevAndrey Dec 11, 2024
662a317
CB-3508 do not have results in task info event
yagudin10 Dec 11, 2024
a96d91e
Merge branch 'CB-3508-te-initiate-events-for-execute-query-and-all-as…
SychevAndrey Dec 11, 2024
25c9757
CB-6000 changes after BE update
SychevAndrey Dec 11, 2024
4b8faa7
CB-6000 fix: set only status on status update
SychevAndrey Dec 12, 2024
04603ee
CB-3508 fixes after review
yagudin10 Dec 12, 2024
40f6783
Merge branch 'devel' into CB-3508-te-initiate-events-for-execute-quer…
yagudin10 Dec 12, 2024
012b0d8
CB-6000 refactor: task id
SychevAndrey Dec 12, 2024
f9f4c27
CB-6000 refactor: handle race condition
SychevAndrey Dec 12, 2024
2a956ac
Merge branch 'CB-3508-te-initiate-events-for-execute-query-and-all-as…
SychevAndrey Dec 12, 2024
f457166
CB-6000 refactor: AsyncTask creation
SychevAndrey Dec 13, 2024
acd3fe2
Merge pull request #3132 from dbeaver/CB-6000-handle-task-info-events
SychevAndrey Dec 13, 2024
7ce5380
Merge branch 'devel' into CB-3508-te-initiate-events-for-execute-quer…
mr-anton-t Dec 13, 2024
b32962a
CB-3508 fixes build
sergeyteleshev Dec 16, 2024
2bc9e6f
Merge branch 'devel' into CB-3508-te-initiate-events-for-execute-quer…
mr-anton-t Dec 16, 2024
5a285a7
CB-6000 fix: retry update task info later if locked
SychevAndrey Dec 17, 2024
add3d27
Merge branch 'devel' into CB-3508-te-initiate-events-for-execute-quer…
sergeyteleshev Dec 17, 2024
d2676c0
Merge branch 'CB-3508-te-initiate-events-for-execute-query-and-all-as…
SychevAndrey Dec 17, 2024
40f96e9
CB-6000 refactor: add a comment about delayed update
SychevAndrey Dec 17, 2024
da3c181
Merge branch 'devel' into CB-3508-te-initiate-events-for-execute-quer…
mr-anton-t Dec 17, 2024
3deb08b
Merge branch 'devel' into CB-3508-te-initiate-events-for-execute-quer…
mr-anton-t Dec 17, 2024
75c92b6
Merge branch 'devel' into CB-3508-te-initiate-events-for-execute-quer…
dariamarutkina Dec 20, 2024
f5b0a93
Merge branch 'devel' into CB-3508-te-initiate-events-for-execute-quer…
dariamarutkina Dec 23, 2024
adc1a9e
Merge branch 'devel' into CB-3508-te-initiate-events-for-execute-quer…
yagudin10 Dec 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,41 @@
*/
package io.cloudbeaver.model;

import org.jkiss.code.NotNull;
import org.jkiss.dbeaver.model.runtime.AbstractJob;

/**
* Web connection info
* Web async task info
*/
public class WebAsyncTaskInfo {

private String id;
private String name;
private boolean running;
@NotNull
private final String id;
@NotNull
private final String name;
private boolean running = false;
private Object result;
private Object extendedResult;
private String status;
private Throwable jobError;

private AbstractJob job;

public WebAsyncTaskInfo(String id, String name) {
public WebAsyncTaskInfo(@NotNull String id, @NotNull String name) {
this.id = id;
this.name = name;
}

@NotNull
public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

@NotNull
public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public boolean isRunning() {
return running;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import io.cloudbeaver.model.WebServerMessage;
import io.cloudbeaver.model.app.ServletApplication;
import io.cloudbeaver.model.app.ServletAuthApplication;
import io.cloudbeaver.model.session.monitor.TaskProgressMonitor;
import io.cloudbeaver.model.user.WebUser;
import io.cloudbeaver.service.DBWSessionHandler;
import io.cloudbeaver.service.sql.WebSQLConstants;
import io.cloudbeaver.utils.CBModelConstants;
import io.cloudbeaver.utils.WebDataSourceUtils;
import io.cloudbeaver.utils.WebEventUtils;
import org.eclipse.core.runtime.IAdaptable;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
Expand All @@ -57,7 +59,6 @@
import org.jkiss.dbeaver.model.runtime.AbstractJob;
import org.jkiss.dbeaver.model.runtime.BaseProgressMonitor;
import org.jkiss.dbeaver.model.runtime.DBRProgressMonitor;
import org.jkiss.dbeaver.model.runtime.ProxyProgressMonitor;
import org.jkiss.dbeaver.model.security.SMAdminController;
import org.jkiss.dbeaver.model.security.SMConstants;
import org.jkiss.dbeaver.model.security.SMController;
Expand Down Expand Up @@ -511,7 +512,7 @@
///////////////////////////////////////////////////////
// Async model

public WebAsyncTaskInfo getAsyncTask(String taskId, String taskName, boolean create) {
public WebAsyncTaskInfo getAsyncTask(@NotNull String taskId, @NotNull String taskName, boolean create) {

Check warning on line 515 in server/bundles/io.cloudbeaver.model/src/io/cloudbeaver/model/session/WebSession.java

View workflow job for this annotation

GitHub Actions / Server / Lint

[checkstyle] reported by reviewdog 🐶 Missing a Javadoc comment. Raw Output: /github/workspace/./server/bundles/io.cloudbeaver.model/src/io/cloudbeaver/model/session/WebSession.java:515:5: warning: Missing a Javadoc comment. (com.puppycrawl.tools.checkstyle.checks.javadoc.MissingJavadocMethodCheck)
synchronized (asyncTasks) {
WebAsyncTaskInfo taskInfo = asyncTasks.get(taskId);
if (taskInfo == null && create) {
Expand All @@ -528,7 +529,6 @@
if (taskInfo == null) {
throw new DBWebException("Task '" + taskId + "' not found");
}
taskInfo.setRunning(taskInfo.getJob() != null && !taskInfo.getJob().isFinished());
if (removeOnFinish && !taskInfo.isRunning()) {
asyncTasks.remove(taskId);
}
Expand All @@ -551,7 +551,7 @@
return true;
}

public WebAsyncTaskInfo createAndRunAsyncTask(String taskName, WebAsyncTaskProcessor<?> runnable) {
public WebAsyncTaskInfo createAndRunAsyncTask(@NotNull String taskName, @NotNull WebAsyncTaskProcessor<?> runnable) {

Check warning on line 554 in server/bundles/io.cloudbeaver.model/src/io/cloudbeaver/model/session/WebSession.java

View workflow job for this annotation

GitHub Actions / Server / Lint

[checkstyle] reported by reviewdog 🐶 Missing a Javadoc comment. Raw Output: /github/workspace/./server/bundles/io.cloudbeaver.model/src/io/cloudbeaver/model/session/WebSession.java:554:5: warning: Missing a Javadoc comment. (com.puppycrawl.tools.checkstyle.checks.javadoc.MissingJavadocMethodCheck)
int taskId = TASK_ID.incrementAndGet();
WebAsyncTaskInfo asyncTask = getAsyncTask(String.valueOf(taskId), taskName, true);

Expand All @@ -560,7 +560,8 @@
protected IStatus run(DBRProgressMonitor monitor) {
int curTaskCount = taskCount.incrementAndGet();

TaskProgressMonitor taskMonitor = new TaskProgressMonitor(monitor, asyncTask);
DBRProgressMonitor taskMonitor = new TaskProgressMonitor(monitor, WebSession.this, asyncTask);

try {
Number queryLimit = application.getAppConfiguration().getResourceQuota(WebSQLConstants.QUOTA_PROP_QUERY_LIMIT);
if (queryLimit != null && curTaskCount > queryLimit.intValue()) {
Expand All @@ -572,14 +573,15 @@
asyncTask.setResult(runnable.getResult());
asyncTask.setExtendedResult(runnable.getExtendedResults());
asyncTask.setStatus("Finished");
asyncTask.setRunning(false);
} catch (InvocationTargetException e) {
addSessionError(e.getTargetException());
asyncTask.setJobError(e.getTargetException());
} catch (Exception e) {
asyncTask.setJobError(e);
} finally {
taskCount.decrementAndGet();
asyncTask.setRunning(false);
WebEventUtils.sendAsyncTaskEvent(WebSession.this, asyncTask);
}
return Status.OK_STATUS;
}
Expand Down Expand Up @@ -989,27 +991,6 @@
}
}

private static class TaskProgressMonitor extends ProxyProgressMonitor {

private final WebAsyncTaskInfo asyncTask;

public TaskProgressMonitor(DBRProgressMonitor original, WebAsyncTaskInfo asyncTask) {
super(original);
this.asyncTask = asyncTask;
}

@Override
public void beginTask(String name, int totalWork) {
super.beginTask(name, totalWork);
asyncTask.setStatus(name);
}

@Override
public void subTask(String name) {
super.subTask(name);
asyncTask.setStatus(name);
}
}

private record PersistentAttribute(Object value) {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* DBeaver - Universal Database Manager
* Copyright (C) 2010-2024 DBeaver Corp and others
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudbeaver.model.session.monitor;

import io.cloudbeaver.model.WebAsyncTaskInfo;
import io.cloudbeaver.model.session.WebSession;
import io.cloudbeaver.utils.WebEventUtils;
import org.jkiss.code.NotNull;
import org.jkiss.dbeaver.model.runtime.DBRProgressMonitor;
import org.jkiss.dbeaver.model.runtime.ProxyProgressMonitor;

/**
* Task progress monitor.
* Used by async GQL requests.
*/
public class TaskProgressMonitor extends ProxyProgressMonitor {

@NotNull
private final WebAsyncTaskInfo asyncTask;
private final WebSession webSession;

public TaskProgressMonitor(DBRProgressMonitor original, @NotNull WebSession webSession, @NotNull WebAsyncTaskInfo asyncTask) {
super(original);
this.webSession = webSession;
this.asyncTask = asyncTask;
}

@Override
public void beginTask(String name, int totalWork) {
super.beginTask(name, totalWork);
asyncTask.setStatus(name);
WebEventUtils.sendAsyncTaskEvent(webSession, asyncTask);
}

@Override
public void subTask(String name) {
super.subTask(name);
asyncTask.setStatus(name);
WebEventUtils.sendAsyncTaskEvent(webSession, asyncTask);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package io.cloudbeaver.utils;

import io.cloudbeaver.model.WebAsyncTaskInfo;
import io.cloudbeaver.model.session.WebSession;
import org.jkiss.code.NotNull;
import org.jkiss.dbeaver.model.app.DBPProject;
import org.jkiss.dbeaver.model.websocket.WSConstants;
import org.jkiss.dbeaver.model.websocket.event.WSEvent;
Expand All @@ -25,6 +27,7 @@
import org.jkiss.dbeaver.model.websocket.event.datasource.WSDatasourceFolderEvent;
import org.jkiss.dbeaver.model.websocket.event.resource.WSResourceProperty;
import org.jkiss.dbeaver.model.websocket.event.resource.WSResourceUpdatedEvent;
import org.jkiss.dbeaver.model.websocket.event.session.WSSessionTaskInfoEvent;

import java.util.List;

Expand Down Expand Up @@ -182,4 +185,14 @@
ServletAppUtils.getServletApplication().getEventController().addEvent(event);
}

public static void sendAsyncTaskEvent(@NotNull WebSession webSession, @NotNull WebAsyncTaskInfo taskInfo) {

Check warning on line 188 in server/bundles/io.cloudbeaver.model/src/io/cloudbeaver/utils/WebEventUtils.java

View workflow job for this annotation

GitHub Actions / Server / Lint

[checkstyle] reported by reviewdog 🐶 Missing a Javadoc comment. Raw Output: /github/workspace/./server/bundles/io.cloudbeaver.model/src/io/cloudbeaver/utils/WebEventUtils.java:188:5: warning: Missing a Javadoc comment. (com.puppycrawl.tools.checkstyle.checks.javadoc.MissingJavadocMethodCheck)
webSession.addSessionEvent(
new WSSessionTaskInfoEvent(
taskInfo.getId(),
taskInfo.getStatus(),
taskInfo.isRunning()
)
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ enum CBServerEventId {
cb_object_permissions_updated,
cb_subject_permissions_updated,

cb_database_output_log_updated
cb_database_output_log_updated,

cb_session_task_info_updated @since(version: "24.3.1")
}

# Events sent by client
Expand All @@ -56,6 +58,9 @@ enum CBEventTopic {
cb_object_permissions,
cb_subject_permissions,
cb_database_output_log,

cb_session_task, @since(version: "24.3.1")

cb_datasource_connection,
cb_delete_temp_folder
}
Expand Down Expand Up @@ -182,6 +187,13 @@ type WSOutputLogInfo {
# Add more fields as needed
}

# Async task info status event
type WSAsyncTaskInfo @since(version: "24.3.1") {
id: CBServerEventId!
taskId: ID!
statusName: String
running: Boolean!

# Datasource disconnect event
type WSDataSourceDisconnectEvent implements CBServerEvent {
id: CBServerEventId!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@
import org.jkiss.dbeaver.model.data.DBDDocument;
import org.jkiss.dbeaver.model.exec.DBCException;
import org.jkiss.dbeaver.model.meta.Property;
import org.jkiss.utils.Pair;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Web SQL query results.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import { computed, makeObservable, observable } from 'mobx';

import type { ITask, TaskScheduler } from '@cloudbeaver/core-executor';
import type { AsyncTaskInfo, AsyncTaskInfoService, GraphQLService } from '@cloudbeaver/core-sdk';
import { AsyncTaskInfoService } from '@cloudbeaver/core-root';
import type { AsyncTaskInfo, GraphQLService } from '@cloudbeaver/core-sdk';

import type { ConnectionExecutionContextResource, IConnectionExecutionContextInfo } from './ConnectionExecutionContextResource.js';
import type { IConnectionExecutionContext } from './IConnectionExecutionContext.js';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import { injectable } from '@cloudbeaver/core-di';
import { TaskScheduler } from '@cloudbeaver/core-executor';
import { CachedMapAllKey, ResourceKeyUtils } from '@cloudbeaver/core-resource';
import { AsyncTaskInfoService, GraphQLService } from '@cloudbeaver/core-sdk';
import { AsyncTaskInfoService } from '@cloudbeaver/core-root';
import { GraphQLService } from '@cloudbeaver/core-sdk';
import { MetadataMap } from '@cloudbeaver/core-utils';

import type { IConnectionInfoParams } from '../CONNECTION_INFO_PARAM_SCHEMA.js';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,10 @@
import { computed, makeObservable, observable } from 'mobx';

import { type ISyncExecutor, SyncExecutor } from '@cloudbeaver/core-executor';
import { type AsyncTaskInfo, ServerInternalError, type WsAsyncTaskInfo } from '@cloudbeaver/core-sdk';
import { uuid } from '@cloudbeaver/core-utils';

import type { AsyncTaskInfo } from '../sdk.js';
import { ServerInternalError } from '../ServerInternalError.js';

export class AsyncTask {
readonly id: string;

get cancelled(): boolean {
return this._cancelled;
}
Expand All @@ -32,20 +28,25 @@ export class AsyncTask {
return this.innerPromise;
}

get id(): string {
return this._id;
}

readonly onStatusChange: ISyncExecutor<AsyncTaskInfo>;

private _id: string;
private _cancelled: boolean;
private taskInfo: AsyncTaskInfo | null;
private resolve!: (value: AsyncTaskInfo) => void;
private reject!: (reason?: any) => void;
private readonly innerPromise: Promise<AsyncTaskInfo>;
private updatingAsync: boolean;
private readonly init: () => Promise<AsyncTaskInfo>;
private readonly cancel: (info: AsyncTaskInfo) => Promise<void>;
private readonly cancel: (id: string) => Promise<void>;
private initPromise: Promise<void> | null;

constructor(init: () => Promise<AsyncTaskInfo>, cancel: (info: AsyncTaskInfo) => Promise<void>) {
this.id = uuid();
constructor(init: () => Promise<AsyncTaskInfo>, cancel: (id: string) => Promise<void>) {
this._id = uuid();
this.init = init;
this.cancel = cancel;
this._cancelled = false;
Expand Down Expand Up @@ -119,6 +120,13 @@ export class AsyncTask {
}
}

public updateStatus(info: WsAsyncTaskInfo): void {
if (this.taskInfo) {
this.taskInfo.status = info.statusName;
this.onStatusChange.execute(this.taskInfo);
}
}

private updateInfo(info: AsyncTaskInfo): void {
this.taskInfo = info;

Expand All @@ -134,7 +142,7 @@ export class AsyncTask {

private async cancelTask(): Promise<void> {
if (this.info) {
await this.cancel(this.info);
await this.cancel(this.info.id);
}
}
}
Loading
Loading