Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: transport Exceptions from tasks over our websocket connection #300

Merged
merged 9 commits into from
Oct 25, 2023
16 changes: 6 additions & 10 deletions renumics/spotlight/backend/tasks/reduction.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import numpy as np
import pandas as pd

from renumics.spotlight.dataset.exceptions import ColumnNotExistsError
from renumics.spotlight.data_store import DataStore
from renumics.spotlight import dtypes

Expand Down Expand Up @@ -80,10 +79,8 @@ def compute_umap(
Prepare data from table and compute U-Map on them.
"""

try:
data, indices = align_data(data_store, column_names, indices)
except (ColumnNotExistsError, ColumnNotEmbeddable):
return np.empty(0, np.float64), []
data, indices = align_data(data_store, column_names, indices)

if data.size == 0:
return np.empty(0, np.float64), []

Expand Down Expand Up @@ -116,14 +113,13 @@ def compute_pca(
Prepare data from table and compute PCA on them.
"""

from sklearn import preprocessing, decomposition
data, indices = align_data(data_store, column_names, indices)

try:
data, indices = align_data(data_store, column_names, indices)
except (ColumnNotExistsError, ColumnNotEmbeddable):
return np.empty(0, np.float64), []
if data.size == 0:
return np.empty(0, np.float64), []

from sklearn import preprocessing, decomposition

if data.shape[1] == 1:
return np.hstack((data, np.zeros_like(data))), indices
if normalization == "standardize":
Expand Down
11 changes: 8 additions & 3 deletions renumics/spotlight/backend/tasks/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import multiprocessing
from concurrent.futures import Future, ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool
from typing import Any, Callable, List, Optional, Sequence, TypeVar, Union
from typing import Any, Callable, Dict, List, Optional, Sequence, TypeVar, Union

from .exceptions import TaskCancelled
from .task import Task
Expand All @@ -30,16 +30,20 @@ def create_task(
self,
func: Callable,
args: Sequence[Any],
kwargs: Optional[Dict[str, Any]] = None,
name: Optional[str] = None,
tag: Optional[Union[str, int]] = None,
) -> Task:
"""
create and launch a new task
"""
if kwargs is None:
kwargs = {}

# cancel running task with same name
self.cancel(name=name)

future = self.pool.submit(func, *args)
future = self.pool.submit(func, *args, **kwargs)

task = Task(name, tag, future)
self.tasks.append(task)
Expand All @@ -59,14 +63,15 @@ async def run_async(
self,
func: Callable[..., T],
args: Sequence[Any],
kwargs: Optional[Dict[str, Any]] = None,
name: Optional[str] = None,
tag: Optional[Union[str, int]] = None,
) -> T:
"""
Launch a new task. Await and return result.
"""

task = self.create_task(func, args, name, tag)
task = self.create_task(func, args=args, kwargs=kwargs, name=name, tag=tag)
try:
return await asyncio.wrap_future(task.future)
except BrokenProcessPool as e:
Expand Down
Loading
Loading