Skip to content

Commit

Permalink
add tornado render server
Browse files Browse the repository at this point in the history
  • Loading branch information
rmorshea committed Jan 19, 2021
1 parent e477492 commit c53d871
Show file tree
Hide file tree
Showing 9 changed files with 278 additions and 42 deletions.
42 changes: 32 additions & 10 deletions idom/server/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
_Self = TypeVar("_Self", bound="AbstractRenderServer[Any, Any]")


class AbstractRenderServer(Generic[_App, _Config]):
class AbstractRenderServer(Generic[_App, _Config], abc.ABC):
"""Base class for all IDOM server application and extension implementations.
It is assumed that IDOM will be used in conjuction with some async-enabled server
Expand Down Expand Up @@ -40,17 +40,19 @@ def application(self) -> _App:
raise RuntimeError("No application registered.")
return self._app

def run(self, *args: Any, **kwargs: Any) -> None:
def run(self, host: str, port: int, *args: Any, **kwargs: Any) -> None:
"""Run as a standalone application."""
if self._app is None:
app = self._default_application(self._config)
self.register(app)
else: # pragma: no cover
app = self._app
if not self._daemonized: # pragma: no cover
return self._run_application(app, self._config, args, kwargs)
return self._run_application(self._config, app, host, port, args, kwargs)
else:
return self._run_application_in_thread(app, self._config, args, kwargs)
return self._run_application_in_thread(
self._config, app, host, port, args, kwargs
)

def daemon(self, *args: Any, **kwargs: Any) -> Thread:
"""Run the standalone application in a seperate thread."""
Expand All @@ -65,8 +67,10 @@ def daemon(self, *args: Any, **kwargs: Any) -> Thread:

def register(self: _Self, app: Optional[_App]) -> _Self:
"""Register this as an extension."""
self._setup_application(app, self._config)
self._setup_application_did_start_event(app, self._server_did_start)
self._setup_application(self._config, app)
self._setup_application_did_start_event(
self._config, app, self._server_did_start
)
self._app = app
return self

Expand All @@ -77,6 +81,10 @@ def wait_until_server_start(self, timeout: float = 3.0):
f"Server did not start within {timeout} seconds"
)

@abc.abstractmethod
def stop(self) -> None:
"""Stop a currently running application"""

@abc.abstractmethod
def _create_config(self, config: Optional[_Config]) -> _Config:
"""Return the default configuration options."""
Expand All @@ -87,25 +95,39 @@ def _default_application(self, config: _Config) -> _App:
raise NotImplementedError()

@abc.abstractmethod
def _setup_application(self, app: _App, config: _Config) -> None:
def _setup_application(self, config: _Config, app: _App) -> None:
"""General application setup - add routes, templates, static resource, etc."""
raise NotImplementedError()

@abc.abstractmethod
def _setup_application_did_start_event(self, app: _App, event: Event) -> None:
def _setup_application_did_start_event(
self, config: _Config, app: _App, event: Event
) -> None:
"""Register a callback to the app indicating whether the server has started"""
raise NotImplementedError()

@abc.abstractmethod
def _run_application(
self, app: _App, config: _Config, args: Tuple[Any, ...], kwargs: Dict[str, Any]
self,
config: _Config,
app: _App,
host: str,
port: int,
args: Tuple[Any, ...],
kwargs: Dict[str, Any],
) -> None:
"""Run the application in the main thread"""
raise NotImplementedError()

@abc.abstractmethod
def _run_application_in_thread(
self, app: _App, config: _Config, args: Tuple[Any, ...], kwargs: Dict[str, Any]
self,
config: _Config,
app: _App,
host: str,
port: int,
args: Tuple[Any, ...],
kwargs: Dict[str, Any],
) -> None:
"""This function has been called inside a daemon thread to run the application"""
raise NotImplementedError()
46 changes: 35 additions & 11 deletions idom/server/flask.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ class FlaskRenderServer(AbstractRenderServer[Flask, Config]):
"""Base class for render servers which use Flask"""

_dispatcher_type: AbstractDispatcher
_wsgi_server: pywsgi.WSGIServer

def stop(self, timeout: Optional[float] = None) -> None:
try:
server = self._wsgi_server
except AttributeError: # pragma: no cover
raise RuntimeError(
f"Application is not running or was not started by {self}"
)
else:
server.stop(timeout)

def _create_config(self, config: Optional[Config]) -> Config:
return Config(
Expand All @@ -51,10 +62,10 @@ def _create_config(self, config: Optional[Config]) -> Config:
def _default_application(self, config: Config) -> Flask:
return Flask(config["import_name"])

def _setup_application(self, app: Flask, config: Config) -> None:
def _setup_application(self, config: Config, app: Flask) -> None:
bp = Blueprint("idom", __name__, url_prefix=config["url_prefix"])

self._setup_blueprint_routes(bp, config)
self._setup_blueprint_routes(config, bp)

cors_config = config["cors"]
if cors_config: # pragma: no cover
Expand Down Expand Up @@ -84,7 +95,7 @@ def recv() -> LayoutEvent:
None,
)

def _setup_blueprint_routes(self, blueprint: Blueprint, config: Config) -> None:
def _setup_blueprint_routes(self, config: Config, blueprint: Blueprint) -> None:
if config["serve_static_files"]:

@blueprint.route("/client/<path:path>")
Expand All @@ -98,21 +109,33 @@ def redirect_to_index():
return redirect(url_for("idom.send_build_dir", path="index.html"))

def _setup_application_did_start_event(
self, app: Flask, event: ThreadEvent
self, config: Config, app: Flask, event: ThreadEvent
) -> None:
@app.before_first_request
def server_did_start():
event.set()

def _run_application(
self, app: Flask, config: Config, args: Tuple[Any, ...], kwargs: Dict[str, Any]
self,
config: Config,
app: Flask,
host: str,
port: int,
args: Tuple[Any, ...],
kwargs: Dict[str, Any],
) -> None:
self._generic_run_application(app, *args, **kwargs)
self._generic_run_application(app, host, port, *args, **kwargs)

def _run_application_in_thread(
self, app: Flask, config: Config, args: Tuple[Any, ...], kwargs: Dict[str, Any]
self,
config: Config,
app: Flask,
host: str,
port: int,
args: Tuple[Any, ...],
kwargs: Dict[str, Any],
) -> None:
self._generic_run_application(app, *args, **kwargs)
self._generic_run_application(app, host, port, *args, **kwargs)

def _generic_run_application(
self,
Expand All @@ -121,19 +144,20 @@ def _generic_run_application(
port: int = 5000,
debug: bool = False,
*args: Any,
**kwargs
**kwargs,
):
if debug:
logging.basicConfig(level=logging.DEBUG) # pragma: no cover
logging.debug("Starting server...")
_StartCallbackWSGIServer(
self._wsgi_server = _StartCallbackWSGIServer(
self._server_did_start.set,
(host, port),
app,
*args,
handler_class=WebSocketHandler,
**kwargs,
).serve_forever()
)
self._wsgi_server.serve_forever()


class PerClientStateServer(FlaskRenderServer):
Expand Down
43 changes: 29 additions & 14 deletions idom/server/sanic.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
SendCoroutine,
RecvCoroutine,
)
from idom.core.layout import LayoutEvent, Layout
from idom.core.layout import LayoutEvent, Layout, LayoutUpdate
from idom.client.manage import BUILD_DIR

from .base import AbstractRenderServer
Expand Down Expand Up @@ -54,10 +54,10 @@ def _create_config(self, config: Optional[Config]) -> Config:
def _default_application(self, config: Config) -> Sanic:
return Sanic()

def _setup_application(self, app: Sanic, config: Config) -> None:
def _setup_application(self, config: Config, app: Sanic) -> None:
bp = Blueprint(f"idom_dispatcher_{id(self)}", url_prefix=config["url_prefix"])

self._setup_blueprint_routes(bp, config)
self._setup_blueprint_routes(config, bp)

cors_config = config["cors"]
if cors_config:
Expand All @@ -66,25 +66,26 @@ def _setup_application(self, app: Sanic, config: Config) -> None:

app.blueprint(bp)

def _setup_application_did_start_event(self, app: Sanic, event: Event) -> None:
def _setup_application_did_start_event(
self, config: Config, app: Sanic, event: Event
) -> None:
async def server_did_start(app: Sanic, loop: asyncio.AbstractEventLoop) -> None:
event.set()

app.register_listener(server_did_start, "after_server_start")

def _setup_blueprint_routes(self, blueprint: Blueprint, config: Config) -> None:
def _setup_blueprint_routes(self, config: Config, blueprint: Blueprint) -> None:
"""Add routes to the application blueprint"""

@blueprint.websocket("/stream") # type: ignore
async def model_stream(
request: request.Request, socket: WebSocketCommonProtocol
) -> None:
async def sock_send(value: Any) -> None:
async def sock_send(value: LayoutUpdate) -> None:
await socket.send(json.dumps(value))

async def sock_recv() -> LayoutEvent:
event = json.loads(await socket.recv())
return LayoutEvent(event["target"], event["data"])
return LayoutEvent(**json.loads(await socket.recv()))

element_params = {k: request.args.get(k) for k in request.args}
await self._run_dispatcher(sock_send, sock_recv, element_params)
Expand All @@ -103,13 +104,25 @@ def redirect_to_index(
)

def _run_application(
self, app: Sanic, config: Config, args: Tuple[Any, ...], kwargs: Dict[str, Any]
self,
config: Config,
app: Sanic,
host: str,
port: int,
args: Tuple[Any, ...],
kwargs: Dict[str, Any],
) -> None:
self._loop = asyncio.get_event_loop()
app.run(*args, **kwargs)
app.run(host, port, *args, **kwargs)

def _run_application_in_thread(
self, app: Sanic, config: Config, args: Tuple[Any, ...], kwargs: Dict[str, Any]
self,
config: Config,
app: Sanic,
host: str,
port: int,
args: Tuple[Any, ...],
kwargs: Dict[str, Any],
) -> None:
try:
loop = asyncio.get_event_loop()
Expand All @@ -122,7 +135,9 @@ def _run_application_in_thread(
# what follows was copied from:
# https://github.com/sanic-org/sanic/blob/7028eae083b0da72d09111b9892ddcc00bce7df4/examples/run_async_advanced.py

serv_coro = app.create_server(*args, **kwargs, return_asyncio_server=True)
serv_coro = app.create_server(
host, port, *args, **kwargs, return_asyncio_server=True
)
serv_task = asyncio.ensure_future(serv_coro, loop=loop)
server = loop.run_until_complete(serv_task)
server.after_start()
Expand Down Expand Up @@ -167,10 +182,10 @@ class SharedClientStateServer(SanicRenderServer):
_dispatcher_type = SharedViewDispatcher
_dispatcher: SharedViewDispatcher

def _setup_application(self, app: Sanic, config: Config) -> None:
def _setup_application(self, config: Config, app: Sanic) -> None:
app.register_listener(self._activate_dispatcher, "before_server_start")
app.register_listener(self._deactivate_dispatcher, "before_server_stop")
super()._setup_application(app, config)
super()._setup_application(config, app)

async def _activate_dispatcher(
self, app: Sanic, loop: asyncio.AbstractEventLoop
Expand Down
Loading

0 comments on commit c53d871

Please sign in to comment.