Skip to content

Commit

Permalink
refactor: rename worker.worker to worker.impl
Browse files Browse the repository at this point in the history
  • Loading branch information
MHajoha committed Aug 22, 2024
1 parent d9cba0b commit b89bc34
Show file tree
Hide file tree
Showing 17 changed files with 192 additions and 195 deletions.
4 changes: 2 additions & 2 deletions questionpy_server/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
)

from questionpy_common.constants import MAX_PACKAGE_SIZE, MiB
from questionpy_server.worker.worker import Worker
from questionpy_server.worker.worker.subprocess import SubprocessWorker
from questionpy_server.worker import Worker
from questionpy_server.worker.impl.subprocess import SubprocessWorker

REPOSITORY_MINIMUM_INTERVAL: Final[timedelta] = timedelta(minutes=5)

Expand Down
3 changes: 1 addition & 2 deletions questionpy_server/web/_routes/_attempts.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
from questionpy_server.worker.runtime.package_location import ZipPackageLocation

if TYPE_CHECKING:
from questionpy_server.worker.worker import Worker

from questionpy_server.worker import Worker

attempt_routes = web.RouteTableDef()

Expand Down
2 changes: 1 addition & 1 deletion questionpy_server/web/_routes/_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from questionpy_server.worker.runtime.package_location import ZipPackageLocation

if TYPE_CHECKING:
from questionpy_server.worker.worker import Worker
from questionpy_server.worker import Worker

file_routes = web.RouteTableDef()

Expand Down
2 changes: 1 addition & 1 deletion questionpy_server/web/_routes/_packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from questionpy_server.worker.runtime.package_location import ZipPackageLocation

if TYPE_CHECKING:
from questionpy_server.worker.worker import Worker
from questionpy_server.worker import Worker

package_routes = web.RouteTableDef()

Expand Down
168 changes: 168 additions & 0 deletions questionpy_server/worker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,181 @@
# This file is part of the QuestionPy Server. (https://questionpy.org)
# The QuestionPy Server is free software released under terms of the MIT license. See LICENSE.md.
# (c) Technische Universität Berlin, innoCampus <[email protected]>
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum

from pydantic import BaseModel

from questionpy_common.api.attempt import AttemptModel, AttemptScoredModel
from questionpy_common.elements import OptionsFormDefinition
from questionpy_common.environment import RequestUser, WorkerResourceLimits
from questionpy_common.manifest import PackageFile
from questionpy_server.models import AttemptStarted, QuestionCreated
from questionpy_server.utils.manifest import ComparableManifest
from questionpy_server.worker.runtime.messages import MessageToWorker
from questionpy_server.worker.runtime.package_location import PackageLocation


class WorkerResources(BaseModel):
"""Current resource usage."""

memory: int
cpu_time_since_last_call: float
total_cpu_time: float


class WorkerState(Enum):
NOT_RUNNING = 1
IDLE = 2
SERVER_AWAITS_RESPONSE = 3 # server send a message to worker and is waiting for a response
WORKER_AWAITS_RESPONSE = 4 # worker send a request/message to server and server is now processing the request


@dataclass
class PackageFileData:
"""Represents a file read from a package."""

size: int
"""The total size of the file in bytes."""
mime_type: str | None
"""Mime type as reported by the package.
Usually this is derived from the file extension at build time and listed in the manifest.
"""
data: bytes


class Worker(ABC):
"""Interface for worker implementations."""

def __init__(self, package: PackageLocation, limits: WorkerResourceLimits | None) -> None:
self.package = package
self.limits = limits
self.state = WorkerState.NOT_RUNNING

@abstractmethod
async def start(self) -> None:
"""Start and initialize the worker process.
Only after this method finishes is the worker ready to accept other messages.
"""

@abstractmethod
async def stop(self, timeout: float) -> None:
"""Ask the worker to exit gracefully and wait for at most timeout seconds before killing it.
If the worker is not running for any reason, this method should do nothing.
"""

@abstractmethod
async def kill(self) -> None:
"""Kill the worker process without waiting for it to exit.
If the worker is not running for any reason, this method should do nothing.
"""

@abstractmethod
def send(self, message: MessageToWorker) -> None:
"""Send a message to the worker."""

@abstractmethod
async def get_resource_usage(self) -> WorkerResources | None:
"""Get the worker's current resource usage. If unknown or unsupported, return None."""

@abstractmethod
async def get_manifest(self) -> ComparableManifest:
"""Get manifest of the main package in the worker."""

@abstractmethod
async def get_options_form(
self, request_user: RequestUser, question_state: str | None
) -> tuple[OptionsFormDefinition, dict[str, object]]:
"""Get the form used to create a new or edit an existing question.
Args:
request_user: Information on the user this request is for.
question_state: The current question state if editing, or ``None`` if creating a new question.
Returns:
Tuple of the form definition and the current data of the inputs.
"""

@abstractmethod
async def create_question_from_options(
self, request_user: RequestUser, old_state: str | None, form_data: dict[str, object]
) -> QuestionCreated:
"""Create or update the question (state) with the form data from a submitted question edit form.
Args:
request_user: Information on the user this request is for.
old_state: The current question state if editing, or ``None`` if creating a new question.
form_data: Form data from a submitted question edit form.
Returns:
New question.
"""

@abstractmethod
async def start_attempt(self, request_user: RequestUser, question_state: str, variant: int) -> AttemptStarted:
"""Start an attempt at this question with the given variant.
Args:
request_user: Information on the user this request is for.
question_state: The question that is to be attempted.
variant: Not implemented.
Returns:
The started attempt consisting of opaque attempt state and metadata.
"""

@abstractmethod
async def get_attempt(
self,
*,
request_user: RequestUser,
question_state: str,
attempt_state: str,
scoring_state: str | None = None,
response: dict | None = None,
) -> AttemptModel:
"""Create an attempt object for a previously started attempt.
Args:
request_user: Information on the user this request is for.
question_state: The question the attempt belongs to.
attempt_state: The `attempt_state` attribute of an attempt which was previously returned by
:meth:`start_attempt`.
scoring_state: Not implemented.
response: The response currently entered by the student.
Returns:
Metadata of the attempt.
"""

@abstractmethod
async def score_attempt(
self,
*,
request_user: RequestUser,
question_state: str,
attempt_state: str,
scoring_state: str | None = None,
response: dict,
) -> AttemptScoredModel:
"""TODO: write docstring."""

@abstractmethod
async def get_static_file(self, path: str) -> PackageFileData:
"""Reads the static file at the given path in the package.
Args:
path: Path relative to the `dist` directory of the package.
Raises:
FileNotFoundError: If no static file exists at the given path.
"""

@abstractmethod
async def get_static_file_index(self) -> dict[str, PackageFile]:
"""Returns the index of static files as declared in the package's manifest."""
4 changes: 4 additions & 0 deletions questionpy_server/worker/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@ class WorkerStartError(Exception):

class WorkerCPUTimeLimitExceededError(Exception):
pass


class StaticFileSizeMismatchError(Exception):
pass
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
from questionpy_common.manifest import Manifest, PackageFile
from questionpy_server.models import AttemptStarted, QuestionCreated
from questionpy_server.utils.manifest import ComparableManifest
from questionpy_server.worker.exception import WorkerNotRunningError, WorkerStartError
from questionpy_server.worker import PackageFileData, Worker, WorkerState
from questionpy_server.worker.exception import StaticFileSizeMismatchError, WorkerNotRunningError, WorkerStartError
from questionpy_server.worker.runtime.messages import (
CreateQuestionFromOptions,
Exit,
Expand All @@ -39,7 +40,6 @@
PackageLocation,
ZipPackageLocation,
)
from questionpy_server.worker.worker import PackageFileData, Worker, WorkerState

if TYPE_CHECKING:
from pathlib import Path
Expand Down Expand Up @@ -290,7 +290,3 @@ async def get_static_file(self, path: str) -> PackageFileData:

async def get_static_file_index(self) -> dict[str, PackageFile]:
return (await self.get_manifest()).static_files


class StaticFileSizeMismatchError(Exception):
pass
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
from questionpy_server.worker import WorkerResources
from questionpy_server.worker.connection import ServerToWorkerConnection
from questionpy_server.worker.exception import WorkerNotRunningError, WorkerStartError
from questionpy_server.worker.impl._base import BaseWorker
from questionpy_server.worker.runtime.messages import MessageToServer, MessageToWorker
from questionpy_server.worker.runtime.package_location import PackageLocation
from questionpy_server.worker.worker.base import BaseWorker

if TYPE_CHECKING:
from asyncio.subprocess import Process
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
from questionpy_common.environment import WorkerResourceLimits
from questionpy_server.worker.connection import ServerToWorkerConnection
from questionpy_server.worker.exception import WorkerNotRunningError
from questionpy_server.worker.impl._base import BaseWorker
from questionpy_server.worker.runtime.connection import WorkerToServerConnection
from questionpy_server.worker.runtime.manager import WorkerManager
from questionpy_server.worker.runtime.package_location import PackageLocation
from questionpy_server.worker.runtime.streams import AsyncReadAdapter, DuplexPipe
from questionpy_server.worker.worker.base import BaseWorker

log = logging.getLogger(__name__)

Expand Down
4 changes: 2 additions & 2 deletions questionpy_server/worker/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@

from questionpy_common.constants import MiB
from questionpy_common.environment import WorkerResourceLimits
from questionpy_server.worker.impl.subprocess import SubprocessWorker
from questionpy_server.worker.runtime.package_location import PackageLocation

from . import Worker
from .exception import WorkerStartError
from .worker import Worker
from .worker.subprocess import SubprocessWorker


class WorkerPool:
Expand Down
Loading

0 comments on commit b89bc34

Please sign in to comment.