Skip to content

Commit

Permalink
feat: handle errors during message serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
neindochoh committed Oct 25, 2023
1 parent 6fd6f8c commit a1d7cb9
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 23 deletions.
5 changes: 1 addition & 4 deletions renumics/spotlight/backend/tasks/reduction.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,7 @@ def compute_pca(

from sklearn import preprocessing, decomposition

try:
data, indices = align_data(data_store, column_names, indices)
except (ColumnNotExistsError, ValueError):
return np.empty(0, np.float64), []
data, indices = align_data(data_store, column_names, indices)
if data.size == 0:
return np.empty(0, np.float64), []
if data.shape[1] == 1:
Expand Down
27 changes: 21 additions & 6 deletions renumics/spotlight/backend/websockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,25 @@ async def send_async(self, message: Message) -> None:
"""
Send a message async.
"""

try:
message_data = message.dict()
await self.websocket.send_text(
orjson.dumps(message_data, option=orjson.OPT_SERIALIZE_NUMPY).decode()
json_text = orjson.dumps(
message.dict(), option=orjson.OPT_SERIALIZE_NUMPY
).decode()
except TypeError as e:
error_message = Message(
type="error",
data={
"type": type(e).__name__,
"title": "Failed to serialize message",
"detail": str(e),
},
)
json_text = orjson.dumps(
error_message.dict(), option=orjson.OPT_SERIALIZE_NUMPY
).decode()
try:
await self.websocket.send_text(json_text)
except WebSocketDisconnect:
self._on_disconnect()
except RuntimeError:
Expand Down Expand Up @@ -256,7 +270,6 @@ async def _(data: TaskData, connection: WebsocketConnection) -> None:

try:
task_func = TASK_FUNCS[data.task]
print(data.args)
result = await connection.task_manager.run_async(
task_func, # type: ignore
args=(data_store,),
Expand All @@ -267,6 +280,7 @@ async def _(data: TaskData, connection: WebsocketConnection) -> None:
points = cast(np.ndarray, result[0])
valid_indices = cast(np.ndarray, result[1])
except TaskCancelled:
print("task cancelled")
pass
except Problem as e:
msg = Message(
Expand All @@ -275,13 +289,14 @@ async def _(data: TaskData, connection: WebsocketConnection) -> None:
"task_id": data.task_id,
"error": {
"type": type(e).__name__,
"title": type(e).__name__,
"detail": type(e).__doc__,
"title": e.title,
"detail": e.detail,
},
},
)
await connection.send_async(msg)
except Exception as e:
print("task failed")
msg = Message(
type="task.error",
data={
Expand Down
23 changes: 17 additions & 6 deletions src/services/task.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { useDataset } from '../lib';
import websocketService, { WebsocketService } from './websocket';
import { Message } from './websocket/types';

interface ResponseHandler {
resolve: (result: unknown) => void;
Expand All @@ -16,12 +17,22 @@ class TaskService {
constructor(websocketService: WebsocketService) {
this.dispatchTable = new Map();
this.websocketService = websocketService;
this.websocketService.registerMessageHandler('task.result', (message: any) => {
this.dispatchTable.get(message.data.task_id)?.resolve(message.data.result);
});
this.websocketService.registerMessageHandler('task.error', (message: any) => {
this.dispatchTable.get(message.data.task_id)?.reject(message.data.error);
});
this.websocketService.registerMessageHandler(
'task.result',
(message: Message) => {
this.dispatchTable
.get(message.data.task_id)
?.resolve(message.data.result);
}
);
this.websocketService.registerMessageHandler(
'task.error',
(message: Message) => {
this.dispatchTable
.get(message.data.task_id)
?.reject(message.data.error);
}
);
}

async run(task: string, name: string, args: unknown) {
Expand Down
13 changes: 6 additions & 7 deletions src/services/websocket/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { notifyProblem } from '../../notify';
import Connection from './connection';
import { Message, MessageHandler } from './types';

Expand Down Expand Up @@ -35,11 +36,9 @@ const websocketService = new WebsocketService(
globalThis.location.hostname,
globalThis.location.port
);
export default websocketService;

// TODO: these functions should probably be somewhere else
/*
} else if (message.type === 'resetLayout') {
useLayout.getState().reset();
}
*/
websocketService.registerMessageHandler('error', (message: Message) => {
notifyProblem(message.data);
});

export default websocketService;

0 comments on commit a1d7cb9

Please sign in to comment.