From b7ed3843d3d5c6c8b0f35fc32bf1b87a34d29a75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Kucmus?= Date: Fri, 28 Jun 2024 12:22:17 +0200 Subject: [PATCH] Code refactor, cleanup and improvements Features: - add documentation - switch to Hatch - move to Typer - add the ability to run Smyth outside of it's `python -m smyth` entrypoint (TOML config is not required in that case) - remove redundant code - fix configuration names (breaking changes!) - get rid of the process definition idea - use handler configs instead - add the ability to simulate `boto3.Lambda.invoke` - change the way subprocesses are handled - now those start with the application, immedietly showing if something is misconfigured - subprocess runner, target function is simplified - dispatch strategies are now functions (breaking changes!) - improve typing Internal improvements: - add code quality CI/CD - use Hatch in CI/CD - use Hatch for static-analysis with Ruff and MyPy Not done: - tests - resource leak warning --- .github/workflows/code_quality.yaml | 90 +++++++----- .github/workflows/deploy_docs.yaml | 32 +++++ .github/workflows/publish.yml | 19 +-- .gitignore | 2 + README.md | 166 +++------------------- docs/assets/mirumee.png | Bin 0 -> 6386 bytes docs/index.md | 112 +++++++++++++++ docs/javascripts/tablesort.js | 6 + docs/overrides/partials/copyright.html | 22 +++ docs/stylesheets/extra.css | 75 ++++++++++ docs/user_guide/all_settings.md | 59 ++++++++ docs/user_guide/concurrency.md | 44 ++++++ docs/user_guide/custom_entrypoint.md | 73 ++++++++++ docs/user_guide/event_generators.md | 78 ++++++++++ docs/user_guide/index.md | 90 ++++++++++++ docs/user_guide/invoke.md | 52 +++++++ mkdocs.yml | 79 +++++++++++ pyproject.toml | 165 +++++++++++++++------ src/smyth/__main__.py | 79 ++++++----- src/smyth/app.py | 40 ------ src/smyth/config.py | 19 +-- src/smyth/context.py | 16 +-- src/smyth/dispatcher/__init__.py | 1 - src/smyth/dispatcher/dispatcher.py | 106 -------------- src/smyth/dispatcher/exceptions.py | 18 --- src/smyth/dispatcher/process.py | 123 ---------------- src/smyth/dispatcher/runner.py | 189 ------------------------- src/smyth/dispatcher/strategy.py | 58 -------- src/smyth/dispatcher/type.py | 22 --- src/smyth/endpoints/__init__.py | 4 - src/smyth/endpoints/invoker.py | 39 ----- src/smyth/endpoints/status.py | 32 ----- src/smyth/event.py | 6 +- src/smyth/exceptions.py | 24 ++++ src/smyth/process.py | 87 ------------ src/smyth/runner.py | 166 ---------------------- src/smyth/runner/__init__.py | 0 src/smyth/runner/fake_context.py | 60 ++++++++ src/smyth/runner/process.py | 76 ++++++++++ src/smyth/runner/runner.py | 116 +++++++++++++++ src/smyth/runner/strategy.py | 36 +++++ src/smyth/schema.py | 30 ---- src/smyth/server.py | 99 ------------- src/smyth/server/__init__.py | 0 src/smyth/server/app.py | 74 ++++++++++ src/smyth/server/endpoints.py | 88 ++++++++++++ src/smyth/smyth.py | 117 +++++++++++++++ src/smyth/types.py | 50 +++++++ src/smyth/utils.py | 3 +- tests/conftest.py | 55 +++++++ tests/test_context.py | 40 ++++++ 51 files changed, 1722 insertions(+), 1315 deletions(-) create mode 100644 .github/workflows/deploy_docs.yaml create mode 100644 docs/assets/mirumee.png create mode 100644 docs/index.md create mode 100644 docs/javascripts/tablesort.js create mode 100644 docs/overrides/partials/copyright.html create mode 100644 docs/stylesheets/extra.css create mode 100644 docs/user_guide/all_settings.md create mode 100644 docs/user_guide/concurrency.md create mode 100644 docs/user_guide/custom_entrypoint.md create mode 100644 docs/user_guide/event_generators.md create mode 100644 docs/user_guide/index.md create mode 100644 docs/user_guide/invoke.md create mode 100644 mkdocs.yml delete mode 100644 src/smyth/app.py delete mode 100644 src/smyth/dispatcher/__init__.py delete mode 100644 src/smyth/dispatcher/dispatcher.py delete mode 100644 src/smyth/dispatcher/exceptions.py delete mode 100644 src/smyth/dispatcher/process.py delete mode 100644 src/smyth/dispatcher/runner.py delete mode 100644 src/smyth/dispatcher/strategy.py delete mode 100644 src/smyth/dispatcher/type.py delete mode 100644 src/smyth/endpoints/__init__.py delete mode 100644 src/smyth/endpoints/invoker.py delete mode 100644 src/smyth/endpoints/status.py delete mode 100644 src/smyth/process.py delete mode 100644 src/smyth/runner.py create mode 100644 src/smyth/runner/__init__.py create mode 100644 src/smyth/runner/fake_context.py create mode 100644 src/smyth/runner/process.py create mode 100644 src/smyth/runner/runner.py create mode 100644 src/smyth/runner/strategy.py delete mode 100644 src/smyth/schema.py delete mode 100644 src/smyth/server.py create mode 100644 src/smyth/server/__init__.py create mode 100644 src/smyth/server/app.py create mode 100644 src/smyth/server/endpoints.py create mode 100644 src/smyth/smyth.py create mode 100644 src/smyth/types.py create mode 100644 tests/conftest.py create mode 100644 tests/test_context.py diff --git a/.github/workflows/code_quality.yaml b/.github/workflows/code_quality.yaml index d841d80..7ecf82f 100644 --- a/.github/workflows/code_quality.yaml +++ b/.github/workflows/code_quality.yaml @@ -1,53 +1,73 @@ name: Code Quality -on: [push] +on: + push: jobs: - coverage: - name: Coverage + format-check: + name: Format Check runs-on: ubuntu-latest - env: - COVERAGE: 90 steps: - - name: Checkout source code - uses: actions/checkout@v4 + - uses: actions/checkout@v4 - name: Set up Python 3.12 - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version-file: pyproject.toml - - name: Install Poetry - uses: snok/install-poetry@v1.3.4 - with: - version: 1.8.2 - virtualenvs-create: true - virtualenvs-in-project: true - - - name: Load cached venv - id: cached-poetry-dependencies - uses: actions/cache@v3 - with: - path: .venv - key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-1.7.1-${{ hashFiles('**/poetry.lock') }} + - name: Install Hatch + uses: pypa/hatch@257e27e51a6a5616ed08a39a408a21c35c9931bc - - name: Install dependencies - if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' - run: poetry install --no-interaction --no-root + - name: Run checks + run: | + hatch fmt --check + type-check: + name: Type Check + needs: [format-check] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 - - name: Install library - run: poetry install --no-interaction + - name: Set up Python 3.12 + uses: actions/setup-python@v5 + with: + python-version-file: pyproject.toml - # - name: Check coverage - # run: poetry run pytest --cov=smyth --cov-fail-under=${{ env.COVERAGE }} src + - name: Install Hatch + uses: pypa/hatch@257e27e51a6a5616ed08a39a408a21c35c9931bc - - name: mypy - run: poetry run mypy src - Ruff: - name: Ruff + - name: Run type checks + run: | + hatch run types:check + unit-test: + name: Unit Test + needs: [format-check, type-check] + strategy: + fail-fast: false + matrix: + python-version: ["3.10", "3.11", "3.12"] runs-on: ubuntu-latest steps: - - name: Ruff Check - uses: jpetrucciani/ruff-check@main + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 with: - path: "." + python-version: ${{ matrix.python-version }} + + - name: Install Hatch + uses: pypa/hatch@257e27e51a6a5616ed08a39a408a21c35c9931bc + + - name: Run tests + if: ${{ matrix.python-version != '3.12' }} + run: | + hatch test -i python=${{ matrix.python-version }} + + - name: Run tests with coverage + if: ${{ matrix.python-version == '3.12' }} + run: | + hatch test --cover -i python=${{ matrix.python-version }} + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: ${{ !contains(github.ref, 'release/')}} diff --git a/.github/workflows/deploy_docs.yaml b/.github/workflows/deploy_docs.yaml new file mode 100644 index 0000000..14dd6da --- /dev/null +++ b/.github/workflows/deploy_docs.yaml @@ -0,0 +1,32 @@ +name: Deploy documentation + +on: + push: + branches: + - main + +jobs: + docs-publish: + name: publish documentation + runs-on: ubuntu-latest + permissions: + contents: write + steps: + - name: Checkout source code + uses: actions/checkout@v4 + + - name: Configure Git Credentials + run: | + git config user.name github-actions[bot] + git config user.email 41898282+github-actions[bot]@users.noreply.github.com + + - name: Set up Python 3.12 + uses: actions/setup-python@v5 + with: + python-version-file: pyproject.toml + + - name: Install Hatch + uses: pypa/hatch@257e27e51a6a5616ed08a39a408a21c35c9931bc + + - name: Build documentation + run: hatch run docs:deploy diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 227bfc7..403862b 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -1,15 +1,10 @@ -name: release +name: Publish on: release: types: - published -env: - STABLE_PYTHON_VERSION: '3.12' - PYTHONUNBUFFERED: "1" - FORCE_COLOR: "1" - jobs: pypi-publish: name: upload release to PyPI @@ -23,19 +18,15 @@ jobs: uses: actions/checkout@v4 - name: Set up Python 3.12 - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version-file: pyproject.toml - - name: Install Poetry - uses: snok/install-poetry@v1.3.4 - with: - version: 1.8.2 - virtualenvs-create: true - virtualenvs-in-project: true + - name: Install Hatch + uses: pypa/hatch@257e27e51a6a5616ed08a39a408a21c35c9931bc - name: Build package - run: poetry build + run: hatch build - name: Publish package distributions to PyPI uses: pypa/gh-action-pypi-publish@release/v1 diff --git a/.gitignore b/.gitignore index c20967a..d50861f 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,5 @@ __pycache__ package *.zip +.mypy_cache +.coverage diff --git a/README.md b/README.md index 195345a..78cd555 100644 --- a/README.md +++ b/README.md @@ -1,86 +1,30 @@ # Smyth -A versatile tool that enhances your AWS Lambda development experience. - -## Rationale - -Despite extensive testing, nothing matched the efficiency we experienced when developing ASGI (e.g., FastAPI) applications. Below is an overview of the alternatives we evaluated: - -- **Serverless (with the serverless-offline plugin)**: This seemed promising, deploying a Node.js server to invoke the lambda in a subprocess, effectively simulating the AWS Lambda runtime through an AWS API Gateway V2 proxy. However, its maintenance is lacking; for instance, a pending pull request to add Python 3.12 as a supported runtime has been unresolved for nearly three months. Additionally, the development experience is hindered by the absence of Python Debugger support. - -- **Localstack**: While Localstack offers useful features, its lambda functionality is less satisfactory. Testing code requires building and "uploading" the lambda to Localstack for invocation, a process that takes about a minute—far too slow for efficient local development. - -- **AWS SAM**: Although newer and of higher quality than Serverless, AWS SAM offers fewer plugins and only supports pip for dependency management. While faster than Localstack, it still necessitates building the lambda for invocation. - -- **Running the code locally**: We also explored using pytest to directly invoke lambda handlers. This approach is viable, but our specific needs require exposing lambdas over HTTP to interact with a remote (or local) instance of Saleor. - -- **CDK**: a tool or framework that allows one to define the Lambda stack in Python, it can spin up local Lambda invocations quickly and with many different events. The lacking feature is the HTTP exposure of the Lambda, which is critical when working with Saleor Apps. - -- **Flask**: Flask could be used to invoke a handler from an endpoint, we've tried that. After a while of trying to make it work from a single entrypoint (i.e. one Docker container) we ended up with... well this, just that we are using a modern ASGI framework with Uvicorn as the HTTP server. +Smyth is a versatile tool designed to enhance your AWS Lambda development experience. It is a pure Python tool that allows for easy customization and state persistence, making your Lambda development more efficient and developer-friendly. ## Features -- **Pure Python**: The tool is entirely written in Python, offering the flexibility to tailor it to your specific requirements. -- **Customizability**: You have the ability to modify both the `event` and `context` data structures to suit your needs. -- **State Persistence**: Simulating both cold and warm starts, Lambda Processes retain their state across invocations, mimicking the behavior of actual AWS Lambdas. The state is reset only when code changes trigger Uvicorn to reload. -- **Efficiency**: The tool is streamlined and efficient, relying solely on Python to execute the code. -- **Inspired by Serverless Framework**: Its design takes cues from the Serverless framework, known for its effectiveness in managing serverless applications. -- **Developer-Friendly**: Tailored for Python web developers, it integrates seamlessly with common development tools and practices, such as PDB, iPDB, VSCode debugging, and .env file support, ensuring a familiar and productive environment. - -## How Smyth Works - -Understanding the components involved in the Smyth environment is crucial for effective development. Here's a breakdown of the key terms: - -- **Uvicorn**: This is an ASGI server responsible for translating incoming HTTP requests into Python callable formats that ASGI applications can understand. It serves as the interface between the web and your application, enabling asynchronous web server capabilities for Python. - -- **Starlette**: A lightweight ASGI framework designed to catch and handle incoming requests. In the context of the Smyth, Starlette facilitates communication with Lambda Processes, effectively acting as a bridge that routes requests to the appropriate handlers. - -- **Lambda Process**: This refers to a dedicated Python process that runs a specific Lambda Handler. The primary purpose of maintaining separate processes for each Lambda Handler is to simulate the "warm" state of AWS Lambda functions, allowing them to retain their state between invocations. This setup mirrors the behavior of AWS Lambda in a local development environment. - -- **Lambda Handler**: The core component of your Lambda function, written as part of your project. This is the code you craft to respond to Lambda invocations, typically defined as a Python function that accepts an `event` dictionary and a `context` object. The `event` contains information about the invocation, such as the triggering event or data passed to the function, while the `context` provides runtime information about the invocation, the function, and the execution environment. - -Smyth operates similarly to Serverless (offline) but is implemented in pure Python with minimal dependencies. It utilizes a Starlette endpoint to provide a catch-all route. Uvicorn, in reload mode, runs Starlette, which automatically restarts the server and refreshes all Lambda processes. At startup, Starlette reads a TOML configuration file, initializing a process for each defined lambda handler. These processes import the handlers and start listening on a `multiprocessing.Queue`. When a request is received, Starlette's endpoint converts it into a Lambda event and places it in the queue. The subprocess then picks it up, invokes the handler, and returns the result to the main process via the queue, which then converts it back into an HTTP response. - -```mermaid -sequenceDiagram - actor User - participant UVIC as Uvicorn - participant STAR as Starlette - participant PROC as Lambda Process - participant HAND as Lambda Handler - - UVIC->>+STAR: Start - STAR->>+PROC: Start - - User->>+UVIC: HTTP Request - UVIC->>+STAR: ASGI Request - - STAR->>STAR: Lookup handlers by path - STAR->>+PROC: Send event and context - PROC->>+HAND: Invoke handler - HAND->>-PROC: Result - PROC->>-STAR: Result - - STAR->>-UVIC: ASGI Response - UVIC->>-User: HTTP Response - - PROC->>-STAR: Terminate - STAR->>-UVIC: End -``` +- **Pure Python**: Entirely written in Python, allowing flexibility to tailor it to your specific requirements. +- **Customizability**: Modify both the `event` and `context` data structures as needed. +- **State Persistence**: Simulates both cold and warm starts, retaining state across invocations, mimicking actual AWS Lambda behavior. +- **Efficiency**: Streamlined and efficient, relying solely on Python for code execution. +- **Inspired by Serverless Framework**: Designed with insights from the Serverless framework, effectively managing serverless applications. +- **Developer-Friendly**: Integrates seamlessly with common development tools and practices, such as PDB, iPDB, VSCode debugging, and .env file support. ## Installation -It's recommended to install this tool in your Lambda project virtual environment with pip, due to its rapid development phase and absence from PyPi. Ensure your `GH_TOKEN` is configured if necessary: +Install Smyth as a development dependency using Poetry or pip: -```sh -pip install git+https://github.com/mirumee/smyth@main +### Poetry +```bash +poetry add --group dev smyth ``` -Or git submodule it into your project and install with `poetry -G dev -e ../libs/smyth`. - -## Configuration +### pip +```bash +pip install smyth +``` -### TOML Define the following settings in your Lambda project's `pyproject.toml` file: ```toml @@ -89,85 +33,15 @@ host = "0.0.0.0" port = 8080 [tool.smyth.handlers.saleor_handler] -handler_path = "marina_adyen.handlers.saleor.handler.saleor_http_handler" +handler_path = "my_project.handlers.saleor.handler.saleor_http_handler" url_path = "/saleor/{path:path}" ``` -### CLI - -TOML configuration can be overloaded with `--host` and `--port`. You can also use the `--only` flag to specifically pick handlers defined in the TOML. This is useful if you'd like to run your separate handlers in separate Docker containers (to for example limit their CPU and MEM). - -``` -python -m smyth run --help -Usage: python -m smyth run [OPTIONS] - -Options: - -h, --host TEXT Bind socket to this host. - -p, --port INTEGER Bind socket to this port. - --only TEXT Run only the handler of this name. [list] - --log-level TEXT Log level. - --help Show this message and exit. -``` - -Run the server with: - -``` -python -m smyth run -``` - -### `tool.smyth` Section - -| Key | Default | Description | -|-----------|-----------|----------------------------------------------------------------------------------------------------------------| -| host | `"0.0.0.0"` | `str` The host address for the Uvicorn server to bind to. | -| port | `8080` | `int` The port for the Uvicorn server to bind to. If set to 0, an available port will be chosen automatically. | -| log_level | `"INFO"` | `str` The log level for the main ASGI server process. | - -### `tool.smyth.handlers.{handler_name}` Section - -| Key | Default | Description | -|-----------------------------|----------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| **handler_path** | (Required) | `str` The Python path to the Lambda handler to be invoked. | -| **url_path** | (Required) | `str` The Starlette-like URL path for routing requests to the Lambda handler. | -| timeout | `None` | `float \| None` The maximum duration (in seconds) before the Lambda times out. | -| event_data_generator_path | `"smyth.event.generate_event_data"` | `str` Python path to a custom Lambda event data generator. | -| context_data_generator_path | `"smyth.context.generate_context_data"` | `str` Python path to a custom Lambda context data generator. | -| fake_coldstart_time | `false` | `bool` If set to true first start will mock the warmup time (0.5 to 1.0 second) - this does nothing but keeps one's brain from forgetting how this lambda will behave in production. | -| log_level | `"INFO"` | `str` The log level for the LambdaProcesses. | -| concurrency | `1` | `int` Number of processes the dispatcher is allowed to spawn | -| dispatch_strategy_path | `"smyth.dispatcher.strategy.RoundRobinDispatchStrategy"` | `str` Python path to a DispatchStrategy class | - -### Dispatch Strategies - -Smyth offers two dispatch strategies to manage how requests are handled by Lambda Processes: - -- **Round Robin (`smyth.dispatcher.strategy.RoundRobinDispatchStrategy`)**: This strategy, not typical for AWS Lambda's behavior, is beneficial during development. It rotates among Lambda Processes for each request, given that concurrency is set higher than `1`. This approach encourages developers to avoid relying on global state across requests, promoting best practices in serverless application design. - -- **First Warm (`smyth.dispatcher.strategy.FirstWarmDispatchStrategy`)**: This strategy prioritizes the use of the first available Lambda Process in a "warm" state to handle incoming requests. If no warm instances are available, it initiates a "cold start". This behavior more closely mimics the operational dynamics of AWS Lambda, where reusing warm instances can lead to faster response times. - -It's important to note that Smyth is intended for local development and not for production use or load testing. The dispatcher and Lambda Process instances are not designed to handle high volumes of concurrent requests and will likely falter under heavy load. This limitation is deliberate, reflecting the tool's focus on local development scenarios where high concurrency is uncommon. - -## Customizing Event and Lambda Context Data - -Customization of Lambda event and context data allows developers to tailor the runtime environment to their specific needs, enhancing the local development experience. For event data customization: - -```python -from smyth.event import generate_event_data - -async def generate_custom_event_data(request: Request): - original_event = await generate_event_data(request) - original_event["my_custom_data"] = "data" - return original_event +Run Smyth with: +```bash +python -m smyth ``` -This example demonstrates how to modify the default event payload generation process to include custom data. Developers can leverage the [AWS Serverless Application Model CLI (SAM)](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/install-sam-cli.html) to generate accurate [example payloads](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-cli-command-reference-sam-local-generate-event.html) for different AWS services, aiding in the creation of realistic local testing scenarios. - -For context data customization, the configuration allows specifying a coroutine that takes additional arguments such as `process_def: ProcessDefinition` and `process: LambdaProcess`. The output from this coroutine is used to create a customized `smyth.dispatcher.runner.FakeLambdaContext`, which can include information about the Smyth runtime environment. This capability is crucial for simulating the full spectrum of AWS Lambda execution contexts, facilitating a more comprehensive and realistic development experience. - -## Status endpoint - -There is a `GET /__/status` endpoint available on the Starlette server, it will present runtime data about the Lambda processes and used configuration. - ## Working with Docker Assuming you have this already installed by Poetry you can use the `Dockerfile.example` and `docker-compose.example.yml` files from this repository to get started. @@ -199,7 +73,7 @@ The combination of Uvicorn reload process and HTTP server process with what is b - [ ] Write tests - [ ] Properly handle Uvicorn exit, kill the LambdaProcesses gracefully -- [ ] Publish on PyPi +- [x] Publish on PyPi ## Name diff --git a/docs/assets/mirumee.png b/docs/assets/mirumee.png new file mode 100644 index 0000000000000000000000000000000000000000..d3375f80e95203278068bfe09c0671599ccd9de9 GIT binary patch literal 6386 zcmeHLdpMNa_kX2QNtdIPkZI_YYRttLW86cG8pb81q~`L@80N-|nNhhFI*N)W5}{JM zm*|v+kghH%DP53GX>ydQLrI0-9(4Qu&hz}9p6B=duX&z5-goV_KWnYeUTbZ8lBk|; zIumA1fFMYR?C#DC7nl$8nDr1{0B@Q*ba-YJf) zshuV%_>?0{H16hIKkwlipj!Lj7S5R-r(4tfzCQX~htak&=W{vne#uG1S%|e08!2t; z-fosuoSUV44qp`aIqUA24^YgCfK4lJTHFBfC2*Xbsbpv8FX4b7vZGS%+^eY561;rR zxh|NbH#O`J>b_S8klW8qa6K|5B*EWkS!zL?2XJ=*&>pu>>ZC}WU?ZzLX`7$en@lH+2i!K>Gq?sf6PvuS8Yvci4wz;;g1^2LT_Z1qw@b)zigQ5dRr+VqS`B4fs&Usc?9l5Q@|J2zRc758;)=_X~ zXYrDq707giQ~ag{9`mb~$R+!xX`7LDNZ(9WL?;DKDDhbLe5K=<_cmrJFhEseAsE!YQ)WbTy9s_ z*AFd=2+t__`<@NRyY(E9Un?ogNDKkboW>N;VRIQz2oev1Y#n4m8Y2`IBj_-j!?#1K z6y-<+hiQjgVM)PIgwAj<$9;V`?7QC6kFh?KVa-H3*iW#PkpKV>7Sj+i9+xj7$?T9r zxFqnb7Ne1fAr*0`9dadwif|T$!w9@N-W-E+m2o6Er2PbhZ8(!f@^M)*0s-FakilZH zkc37{rBZXLg}ES{jmBDATca^JG!BOX8YodDUrdvs_#zWE#4v^nEMkOngkp|>k5FUM z=z<8b9TExV5nue{2`QAX@O;q-3xE%_j3z{5%`s>m5B>cLk=RuNKt>$;*A*f^P^M@f zSR{xDXTYu!m@hW@9>R@Gp?*bBhs5UaghN3A@_sPJWPH^VMuc;RbeIe@%!PRXR0RCP z{-7`Bu)dM?gKz4Yp>V!;1ep7Z|AY3I*oTyX7KK7`5ilauM#(OANVUHtrhvg=l7=3w ziA@)}CXB~@2jLaY0a;1oe(#kUiV2`#77K&H6No4n z%d|q_u`CM|(ZY&`A~IQ4L;?c<;^;$A>U|(NQOR~loH^#3gvzCfS%Pq$9g;$0AiR9P z&G>P6u&A^PDgNEmC91!1B3^*en5lf#PqzqHlWgRmXmb9asi*pD0%4VRQJ95(zm z{L1AF)e-_RR23u|V>kp6O#(BA`~+CTQ;c95pACcdF_N!eGsS5dF0sj*tc%{%?Lp zHsil(0)hBt3Q?ZjJVtJ&PRE6tV8r zQ3fHJgcco_7DuvF^MF;G!;G0Z&Q1tuDl{fmC7R@gOhZ(}B~9L}U%N-4fS};?!6Y2` zQDRh_CWHVN!~VC-tKAk`$2!)yJ}uwv27NdRc%*u!YnZHdG|amo0ps0>8>S z9q6M8aYv7=8$Ahvz=*BTfTj?kp)pDeg1~TW192D*517Uhel z75Ym*wzeGX(Z8fHb|+Eqm%e|nYV@R7o5fjIg1sM)j}u*L_AeCs+oy}oIxz3B?0B7q z3;AR7wmOrWx(4pFFr+d1=2MM5J|1O1iCAt^$G_RGY!pK4qIOYt>BpB#O?7Iy_bz9l?uwa^FTXEY&Y7bT8Ogji2y5~86 zO+!I)1}i(*r%EICqBHw>m(|6`?UBEcCcc`#VP$rJtp32W%Ff2lCb0R4h2m=+%I2uG zH)HpeFD0$AyutR*;DiuSQ?3PXU3J#_*=>8@;<`fr$)D$ui_Y3@XpUOZuHvTWXC;T7 zLbmtDX0rQbArGv2tB<=ppTZeT3uS^E7)=2IiZQLs#vjO_Jx##Cp3L4ad#8n*C800 zPTSrqU9ZMuRc-Aw_75Cv!FZl=z?YPmy=Hi)*HrtC|+fzQn># zZOzH=j39MSA@--LJD{NJ^lq$P2#@MJryi?W`em>$FCn_D2LHeB~*;C{&)-}%e%*JMFTcEPv z#^YppQbnIUXVD_I!MJQmE`9*fVF0K(bLwc?9jj9wIn$bd14*qlUQ;sQe6y#aD*OB` zd7$2))6LpEkQHr}e0gzwX>DZvU+;pFuP#AbLcyn3lfBoD9WWKfvI6D$DBrr4&L&=7sBVXCX__ zSw>pWk$8e(p1#%st5ss(*5k5EyM6X(vrX*HIwth5daZA`r$U4DSB;7B5#?ytg3qUz zl6noWFt`EJxOIbFf-QL7!N%58x0=hgTQrC)6hVxVjIK*7sy;5QH{AM9bBpOdvu9(yEDW~vZcG<1=s1}l zQ}XFbEy=D~CvNld$}lIapU;Xa)xEtnSC1+A@AMUCr_QU+)ndQ3!IL+nMeptrNMr7| z-$DTHxPMBzk)u8JM7^u$nz*7*%`8ppnK8F0+I(@ruGiMZPr3VriN(9#wsHR~?N0v3 zrnT+kb^T5U=J{fKuEexI&3jums(;`B)g~o5+1O%Zxj@Zx-F<#* zaKyYgW!I(7uCNQw-`gq=4(JVRIQ6_?uUUu76zQQw7at^wPJI4k#~M?!HgVs%>il(m zVU?s83+=n##MQW8O{wVC$>1#yXqWiDu>*x%w-;7?{z#+1w%+|Zy}PtZdk@FGtB+qm zD?0c*I%|2(0)4lW$#IJ7EP|*vCUMr?18ZI?%;j4mB}?B<(j3$X?yKA=_ZAWHMKyZ- zXD8DPhzoOL<13ODuZ*JNNR?6^iM54r{B%b_$Dy%MT~?ERJc}UhB{xlo1C8}d%BfGx zkyB6Xu6&V;1jm8q(oXvOvPMK7`HO^I)q{gtJkmp+p`w((9wcs(OK-Sft38Rz4qa6t#=*D zvsP8h?*gNE1CoiyoJz3jU%dH`b69-_|HnD3cKt}zvGEV+(gHD593X#}t+>+STAR: Start + STAR->>+PROC: Start + + User->>+UVIC: HTTP Request + UVIC->>+STAR: ASGI Request + + STAR->>STAR: Lookup handlers by path + STAR->>+PROC: Send event and context + PROC->>+HAND: Invoke handler + HAND->>-PROC: Result + PROC->>-STAR: Result + + STAR->>-UVIC: ASGI Response + UVIC->>-User: HTTP Response + + PROC->>-STAR: Terminate + STAR->>-UVIC: End +``` + +## Others from Mirumee + +- [Lynara](https://github.com/mirumee/lynara){target="_blank"} - Allows deploying ASGI (FastAPI, Django) applications on Lambda +- [Ariadne](https://ariadnegraphql.org/){target="_blank"} - Schema-first, Python GraphQL server +- [Ariadne Codegen](https://github.com/mirumee/ariadne-codegen){target="_blank"} - GraphQL Python code generator diff --git a/docs/javascripts/tablesort.js b/docs/javascripts/tablesort.js new file mode 100644 index 0000000..6a5afcf --- /dev/null +++ b/docs/javascripts/tablesort.js @@ -0,0 +1,6 @@ +document$.subscribe(function() { + var tables = document.querySelectorAll("article table:not([class])") + tables.forEach(function(table) { + new Tablesort(table) + }) +}) diff --git a/docs/overrides/partials/copyright.html b/docs/overrides/partials/copyright.html new file mode 100644 index 0000000..a0ecf70 --- /dev/null +++ b/docs/overrides/partials/copyright.html @@ -0,0 +1,22 @@ + diff --git a/docs/stylesheets/extra.css b/docs/stylesheets/extra.css new file mode 100644 index 0000000..6c327b1 --- /dev/null +++ b/docs/stylesheets/extra.css @@ -0,0 +1,75 @@ +:root { + --md-primary-fg-color: rgb(245, 192, 59); + --md-primary-fg-color--light: #ecb7b7; + --md-primary-fg-color--dark: #90030c; + --md-accent-fg-color: rgb(245, 192, 59); + --md-accent-fg-color--light: #ecb7b7; + --md-accent-fg-color--dark: #90030c; +} + +.md-header { + background-color: var(--md-primary-fg-color); + color: black; +} +.md-search__input + .md-search__icon { + color: black; +} +.md-search__input::placeholder { + color: black; +} +.md-nav__link[for]:focus, +.md-nav__link[for]:hover, +.md-nav__link[href]:focus, +.md-nav__link[href]:hover { + text-decoration: underline; +} + +.md-typeset a:focus, +.md-typeset a:hover { + text-decoration: underline; +} + +header { + border-bottom: 4px solid black; +} + +.md-copyright { + width: 100%; +} + +.md-copyright .made-with-love { + float: right; + line-height: 24px; +} + +.md-copyright .made-with-love a img { + height: 24px; + vertical-align: top; + opacity: 0.7; + transition: all 0.3s ease-in-out; +} + +.md-copyright .made-with-love a:hover img { + opacity: 1; +} + +tr.benchmarks-best-result { + background-color: var(--md-typeset-ins-color); +} + +tr.benchmarks-worst-result { + background-color: var(--md-typeset-del-color); +} + +@keyframes heart { + 0%, 40%, 80%, 100% { + transform: scale(1); + } + 20%, 60% { + transform: scale(1.15); + } +} +.heart { + animation: heart 1000ms infinite; + color: #90030c; +} diff --git a/docs/user_guide/all_settings.md b/docs/user_guide/all_settings.md new file mode 100644 index 0000000..5b83c7d --- /dev/null +++ b/docs/user_guide/all_settings.md @@ -0,0 +1,59 @@ +# All Settings + +Here's a list of all the settings, including those that are simpler but equally valuable, consolidated on one page: + +## Smyth Settings + +### Host + +`host` - `str` (default: `"0.0.0.0"`) Used by Uvicorn to bind to an address. + +### Port + +`port` - `int` (default: `8080`) Used by Uvicorn as the bind port. + +### Log Level + +`log_level` - `str` (default: `"INFO"`) Sets the logging level for the `uvicorn` and `smyth` logging handlers. + +### Smyth Path Prefix + +`smyth_path_prefix` - `str` (default: `"/smyth"`) The path prefix used for Smyth's status endpoint. Change this if, for any reason, it collides with your path routing. + +## Handler Settings + +### Handler Path + +`handler_path` - `str` (required) The Python path to your Lambda function. + +### URL Path + +`url_path` - `str` (required) The Starlette routing path on which your handler will be exposed. + +### Timeout + +`timeout` - `float` (default: `None`, which means no timeout) The time in seconds after which the Lambda Handler raises a Timeout Exception, simulating Lambda's real-life timeouts. + +### Event Data Generator + +`event_data_generator_path` - `str` (default: `"smyth.event.generate_api_gw_v2_event_data"`) Read more about [event generators here](event_generators.md). + +### Context Data Generator + +`context_data_generator_path` - `str` (default: `"smyth.context.generate_context_data"`) A function similar to the [event generator](event_generators.md), but it constructs the `context`, adding some metadata from Smyth's runtime. You can create and use your own. + +### Fake Coldstart + +`fake_coldstart` - `bool` (default: `False`) Makes the subprocess `time.sleep` for a random time between 0.5 and 1 second when a subprocess is cold, imitating the longer first response time of real Lambdas. + +### Log Level + +`log_level` - `str` (default: `"INFO"`) Log level for Smyth's runner function, which is still part of Smyth but already running in the subprocess. Note that the logging of your Lambda handler code should be set separately. + +### Concurrency + +`concurrency` - `int` (default: `1`) Read more about [concurrency here](concurrency.md). + +### Strategy Function + +`strategy_function_path` - `str` (default: `"smyth.runner.strategy.first_warm"`) Read more about [dispatch strategies here](concurrency.md/#dispatch-strategy). diff --git a/docs/user_guide/concurrency.md b/docs/user_guide/concurrency.md new file mode 100644 index 0000000..174da78 --- /dev/null +++ b/docs/user_guide/concurrency.md @@ -0,0 +1,44 @@ +# Concurrency + +Smyth can also simulate the behavior of real Lambdas in terms of multiprocessing. For instance, every Lambda invocation can be run on a different machine, not holding the state between different runtimes. To simulate this, you can set the `concurrency` setting. Don't think of it in terms of web server performance - Smyth is not meant for production to demand high performance. Instead, consider it a method to keep you in check when developing your Lambda. It serves as a reminder that not everything can be stored in globals, and that in-memory cache might not persist between runs. + +??? example "Cold vs warm starts" + + If we take our code with one handler: + + ```python linenums="1" + COUNT = 0 + + def order_handler(event, context): + global COUNT + COUNT += 1 + print(event, context) + return {"statusCode": 200, "body": f"Orders requests: {COUNT}"} + ``` + + Upon a cold start, the code outside of the handler function (the `COUNT` declaration in this case) will be interpreted. After that, each warm run of that Lambda will maintain the state of the `COUNT`. If the load is high enough, AWS will run more Lambdas for you, which might start from a cold state. + +To set up Smyth to run your handler in more than one subprocess, use the `concurrency` setting (by default it's `1`). + +```toml title="myproject/pyproject.toml" linenums="1" hl_lines="4 9" +[tool.smyth.handlers.order_handler] +handler_path = "smyth_test_app.handlers.order_handler" +url_path = "/orders/{path:path}" +concurrency = 2 + +[tool.smyth.handlers.product_handler] +handler_path = "smyth_test_app.handlers.product_handler" +url_path = "/products/{path:path}" +concurrency = 2 +fake_coldstart = true +strategy_function_path = "smyth.runner.strategy.round_robin" +``` + +## Dispatch Strategy + +Dispatch strategy is controlled by a function that tells Smyth which subprocess from the pool of processes running a handler should be used. There are two built-in strategy functions: + +- `smyth.runner.strategy.first_warm` - (the default) tries to act like AWS, using a warmed-up Lambda (handler) if available. It only thaws a cold one if there is nothing warm or they are busy. +- `smyth.runner.strategy.round_robin` - this one might keep you more in check as it picks the subprocess that was not used for the longest time, effectively using each subprocess one by one. + +You can choose the strategy function (including your own, in the same way as you would an event or context generator) with the `strategy_function_path` setting. diff --git a/docs/user_guide/custom_entrypoint.md b/docs/user_guide/custom_entrypoint.md new file mode 100644 index 0000000..60b997b --- /dev/null +++ b/docs/user_guide/custom_entrypoint.md @@ -0,0 +1,73 @@ +# Custom Entrypoint + +Starting from Smyth 0.4.0, you can use it outside of the provided entrypoint (`python -m smyth`). You can build your own entrypoint for Smyth, which would not require a TOML config but instead have a Python script living in your project like any other development helper script. + +## Example + +### Project Structure + +Let's assume you have an `etc` directory that is not part of the final production package. Put your `smyth_conf.py` file there. + +```hl_lines="3-4" +myproject +├── pyproject.toml +├── etc +│ └── smyth_conf.py +└── src + └── my_app + └── handlers.py +``` + +### Your Smyth Configuration + +Here's an example `smyth_conf.py` file: + +```python title="my_project/etc/smyth_conf.py" linenums="1" +import uvicorn +from starlette.requests import Request +from smyth.smyth import Smyth +from smyth.server.app import SmythStarlette + +def my_handler(event, context): + return {"statusCode": 200, "body": "Hello, World!"} + +async def my_event_data_generator(request: Request): + return { + "requestContext": { + "http": { + "method": "GET", + "path": "/hello", + } + } + } + +smyth = Smyth() + +smyth.add_handler( + name="hello", + path="/hello", + lambda_handler=my_handler, + timeout=1, + concurrency=1, + event_data_generator=my_event_data_generator, +) + +app = SmythStarlette(smyth=smyth, smyth_path_prefix="/smyth") + +if __name__ == "__main__": + uvicorn.run("smyth_conf:app", host="0.0.0.0", port=8080, reload=True) +``` + +Normally, the handler would be imported, but including the custom event generator in this file is a good use case. Use the `SmythStarlette` subclass of `Starlette` - it ensures all subprocesses are run at server start and killed on stop (using ASGI Lifetime). Create a Smyth instance and pass it to your `SmythStarlette` instance. Here, you can fine-tune logging, change Uvicorn settings, etc. + +After that, run your script: + +
+ python etc/smyth_conf.py + INFO: Will watch for changes in these directories: ['/Users/pkucmus/Development/mirumee/smyth_test_app'] + INFO: Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit) + INFO: Started reloader process [29441] using StatReload + INFO: Started server process [29443] + INFO: Waiting for application startup. + INFO: Application startup complete. +
diff --git a/docs/user_guide/event_generators.md b/docs/user_guide/event_generators.md new file mode 100644 index 0000000..53ca9ea --- /dev/null +++ b/docs/user_guide/event_generators.md @@ -0,0 +1,78 @@ +# Event Generators + +An event generator is a simple coroutine used by Smyth to transform a Starlette `Request` instance into an `event` dictionary that is eventually used when invoking the Lambda handler. + +Smyth comes with two built-in event generators: `smyth.event.generate_api_gw_v2_event_data` (used by default) and `smyth.event.generate_lambda_invocation_event_data`, which is used in the [invocation endpoint](invoke.md). + +The first one builds a minimal API Gateway Proxy V2 event to simulate a Lambda being triggered by one. The other deserializes the request body (assumes it's proper JSON) and returns just that. + +## Custom Event Generators + +### Example Generator + +If you need to work with events not covered by Smyth, you can create and provide your own. Assuming a simplified API Gateway V1 event, you can create a generator like this: + +```python title="my_project/src/smyth_utils/event.py" linenums="1" +async def generate_api_gw_v1_event_data(request: Request): + source_ip = None + if request.client: + source_ip = request.client.host + + return { + "resource": request.url.path, + "path": request.url.path, + "httpMethod": request.method, + "headers": dict(request.headers), + "queryStringParameters": dict(request.query_params), + "pathParameters": {}, # You may need to populate this based on your routing + "stageVariables": None, + "requestContext": { + "resourceId": "offlineContext_resourceId", + "resourcePath": request.url.path, + "httpMethod": request.method, + "extendedRequestId": "offlineContext_extendedRequestId", + "requestTime": "21/Nov/2020:20:13:27 +0000", + "path": request.url.path, + "accountId": "offlineContext_accountId", + "protocol": request.url.scheme, + "stage": "dev", + "domainPrefix": "offlineContext_domainPrefix", + "requestTimeEpoch": int(request.timestamp().timestamp() * 1000), + "requestId": "offlineContext_requestId", + "identity": { + ... + }, + "domainName": "offlineContext_domainName", + "apiId": "offlineContext_apiId" + }, + "body": (await request.body()).decode("utf-8"), + "isBase64Encoded": False + } +``` + +!!! warning "This is example code; a proper API Gateway V1 generator might need to be different." + +### Configuration + +```toml title="myproject/pyproject.toml" linenums="1" hl_lines="8" +[tool.smyth] +host = "0.0.0.0" +port = 8080 + +[tool.smyth.handlers.order_handler] +handler_path = "my_app.handlers.order_handler" +url_path = "/orders/{path:path}" +event_data_generator_path = "smyth_utils.event.generate_api_gw_v1_event_data" + +[tool.smyth.handlers.product_handler] +handler_path = "my_app.handlers.product_handler" +url_path = "/products/{path:path}" +``` + +Note that `smyth_utils` needs to be in your Python path. + +From this point on, the `order_handler` will receive a different `event` than the `product_handler`. + +## Limited Built-in Generators + +We provided a limited number of generators because there are many possibilities for event simulation. Simulating DynamoDB streams or SQS events locally might be appealing, but we were unsure how these would be used in real-life scenarios. We'd love to hear from the community about this - please don't hesitate to report [:material-github: GitHub issues](https://github.com/mirumee/smyth/issues){target="_blank"} with proposals on what event generators we should include in Smyth. diff --git a/docs/user_guide/index.md b/docs/user_guide/index.md new file mode 100644 index 0000000..2f3e648 --- /dev/null +++ b/docs/user_guide/index.md @@ -0,0 +1,90 @@ +# User Guide + +Smyth is built to have minimal or no impact on the project you are working on. That said, it comes with features that allow you to customize Smyth to the needs of your Lambda project. + +Following this guide will help you understand how to set up your development environment with Smyth. + +## Example Application + +Throughout this guide, we will use two lambda handlers as an example application. + +```python title="my_project/src/my_app/handlers.py" linenums="1" +COUNT = 0 + +def order_handler(event, context): + global COUNT + COUNT += 1 + print(event, context) + return {"statusCode": 200, "body": f"Orders requests: {COUNT}"} + +def product_handler(event, context): + global COUNT + COUNT += 1 + print(event, context) + return {"statusCode": 200, "body": f"Products requests: {COUNT}"} +``` + +These handlers do the exact same thing, and the global `COUNT` is there to illustrate how Smyth handles state between different requests (more about that in [concurrency](concurrency.md)). + +??? question "What is the project's structure?" + + If this is in question, here is the example project's directory structure: + + ``` + myproject + ├── pyproject.toml + └── src + └── my_app + └── handlers.py + ``` + +## Configuration + +Now, in our project's `pyproject.toml`, we can set up Smyth to instruct how the Lambdas will be executed. This setup reflects how your Lambdas will be deployed on AWS behind, for example, an API Gateway. + +```toml title="myproject/pyproject.toml" linenums="1" hl_lines="6-7 10-11" +[tool.smyth] +host = "0.0.0.0" # (1)! +port = 8080 + +[tool.smyth.handlers.order_handler] +handler_path = "my_app.handlers.order_handler" +url_path = "/orders/{path:path}" + +[tool.smyth.handlers.product_handler] +handler_path = "my_app.handlers.product_handler" +url_path = "/products/{path:path}" +``` + +1. Define the host and port on which you want Uvicorn to listen. + +Under `tool.smyth.handlers`, you name and define your handlers. The only two required options are: + +- `handler_path` - the Python path to the Lambda handler. +- `url_path` - the path with which the handler is to be reached by Starlette - uses [Starlette's URL resolving](https://www.starlette.io/routing/#path-parameters){target="_blank"}. + +!!! tip "Custom Smyth Entrypoint" + + You don't have to use the TOML config - read more about [Custom Entrypoint](custom_entrypoint.md). + +## Run It + +At this point, you can start Smyth from your project's root directory: + + +
+ python -m smyth + [14:12:00] INFO [MainProcess] Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit) config.py:523 + INFO [MainProcess] Started reloader process [22786] using StatReload basereload.py:79 + [14:12:00] DEBUG [SpawnProcess-1] Using selector: KqueueSelector selector_events.py:64 + INFO [SpawnProcess-1] Started server process [22788] server.py:82 + INFO [SpawnProcess-1] Waiting for application startup. on.py:48 + INFO [SpawnProcess-1] Started process order_handler:0 smyth.py:66 + INFO [SpawnProcess-1] Started process order_handler:1 smyth.py:66 + INFO [SpawnProcess-1] Started process product_handler:0 smyth.py:66 + INFO [SpawnProcess-1] Started process product_handler:1 smyth.py:66 + INFO [SpawnProcess-1] Application startup complete. on.py:62 +
+ + +Visit [http://localhost:8080/orders/](http://localhost:8080/orders/){target="_blank"} to get the Order Handler response. diff --git a/docs/user_guide/invoke.md b/docs/user_guide/invoke.md new file mode 100644 index 0000000..712c624 --- /dev/null +++ b/docs/user_guide/invoke.md @@ -0,0 +1,52 @@ +# Lambda Invoke + +An important aspect when working with Lambdas is the ability to invoke one like a remote function. The [Boto3 `Lambda.Client.invoke` function](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda/client/invoke.html) is one way to run your Lambda code. Smyth recognizes the need to simulate that as well. + +## Example + +```python title="my_project/src/my_app/handlers.py" linenums="1" +import boto3 + +lambda_client = boto3.client( + "lambda", + endpoint_url="http://localhost:8080" # (1)! +) + +def order_handler(event, context): + lambda_client.invoke( + FunctionName="email_handler", # (3)! + InvocationType="Event", # or RequestResponse + Payload=b'{"to": "hello@mirumee.com", "subject": "Order made"}', + ) + return {"statusCode": 200, "body": f"Orders requests: {COUNT}"} + +def email_handler(event, context): + print(event) # (2)! + return {"statusCode": 200, "body": f"Products requests: {COUNT}"} +``` + +1. Set the endpoint URL to your Smyth host and port. +2. The payload being sent to the handler: `{"to": "hello@mirumee.com", "subject": "Order made"}`. +3. Corresponds to the TOML config `[tool.smyth.handlers.email_handler]`. + +## How It Works + +No matter what `url_path` your handler is registered under in your config, every handler is also available via Smyth's `"/2015-03-31/functions/{function:str}/invocations"` URL. The difference from the `url_path` invocation is that when using the "direct invocation," the event generator is hardcoded to the `smyth.event.generate_lambda_invocation_event_data` function. + +In the example above, the config might look like this: + +```toml title="myproject/pyproject.toml" linenums="1" hl_lines="9 11" +[tool.smyth] +host = "0.0.0.0" +port = 8080 + +[tool.smyth.handlers.order_handler] +handler_path = "my_app.handlers.order_handler" +url_path = "/orders/{path:path}" + +[tool.smyth.handlers.email_handler] +handler_path = "my_app.handlers.email_handler" +url_path = "/emails/{path:path}" +``` + +Line 9, which names the handler, is the important one here. Line 11 is required, but you don't have to use the HTTP request method to reach that handler. diff --git a/mkdocs.yml b/mkdocs.yml new file mode 100644 index 0000000..2b5b2cc --- /dev/null +++ b/mkdocs.yml @@ -0,0 +1,79 @@ +site_name: Smyth +site_url: https://mirumee.github.io/smyth/ +repo_url: https://github.com/mirumee/smyth +repo_name: mirumee/smyth +copyright: Copyright © 2024 - Mirumee Software + +theme: + name: material + custom_dir: docs/overrides + palette: + scheme: slate + primary: custom + accent: custom + features: + - toc.follow + - content.code.copy + - content.code.annotate + - content.tabs.link + - navigation.indexes + - navigation.footer + - navigation.tracking + - navigation.expand + - search.suggest + +nav: + - index.md + - User Guide: + - user_guide/index.md + - user_guide/event_generators.md + - user_guide/invoke.md + - user_guide/concurrency.md + - user_guide/all_settings.md + - user_guide/custom_entrypoint.md + +plugins: + - offline: + enabled: !ENV [OFFLINE, false] + - search + - termynal: + prompt_literal_start: + - "$" + - ">" + +extra_css: + - stylesheets/extra.css + +extra_javascript: + - https://unpkg.com/tablesort@5.3.0/dist/tablesort.min.js + - https://unpkg.com/tablesort@5.3.0/dist/sorts/tablesort.number.min.js + - javascripts/tablesort.js + +markdown_extensions: + - admonition + - attr_list + - md_in_html + - pymdownx.details + - pymdownx.highlight: + anchor_linenums: true + line_spans: __span + pygments_lang_class: true + - pymdownx.inlinehilite + - pymdownx.snippets + - pymdownx.emoji: + emoji_index: !!python/name:material.extensions.emoji.twemoji + emoji_generator: !!python/name:material.extensions.emoji.to_svg + - pymdownx.superfences: + custom_fences: + - name: mermaid + class: mermaid + format: !!python/name:pymdownx.superfences.fence_code_format + - pymdownx.tabbed: + alternate_style: true + slugify: !!python/object/apply:pymdownx.slugs.slugify + kwds: + case: lower + - footnotes + - tables + - toc: + permalink: true diff --git a/pyproject.toml b/pyproject.toml index b3b8ffd..92c1eb4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,66 +1,141 @@ -[tool.poetry] +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] name = "smyth" -version = "0.3.1" -description = "" -authors = ["Mirumee "] +version = "0.4.0" +description = '' readme = "README.md" -packages = [{include = "smyth", from = "src"}] -license = "BSD-3-Clause" -repository = "https://github.com/mirumee/smyth" +requires-python = ">=3.10" +license = "MIT" +keywords = [] +authors = [{ name = "Mirumee", email = "it@mirumee.com" }] classifiers = [ - "Programming Language :: Python", - "Development Status :: 3 - Alpha", "Environment :: Console", "Intended Audience :: Developers", + "Development Status :: 4 - Beta", + "Programming Language :: Python", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: Implementation :: CPython", "Topic :: Software Development :: Build Tools", "Topic :: Software Development :: Libraries :: Python Modules", "Topic :: Internet :: WWW/HTTP :: HTTP Servers", "Topic :: Software Development", ] +dependencies = [ + "starlette", + "uvicorn", + "aws-lambda-powertools", + "toml", + "pydantic", + "rich", + "asgiref", + "typer", +] -[tool.poetry.dependencies] -python = "^3.12" -uvicorn = "^0" -starlette = "^0" -aws-lambda-powertools = "^2" -toml = "^0" -pydantic = "^2.6.2" -rich = "^13.7.0" -click = "^8.1.7" -asgiref = "^3.7.2" - -[tool.poetry.group.dev.dependencies] -ipdb = "^0.13.13" -ruff = "^0.2.2" -mypy = "^1.8.0" -types-toml = "^0.10.8.7" +[project.optional-dependencies] +dev = ["ipdb"] +types = ["mypy>=1.0.0", "pytest", "types-toml", "pytest-asyncio"] +docs = ["mkdocs-material", "termynal"] -[build-system] -requires = ["poetry-core"] -build-backend = "poetry.core.masonry.api" +[tool.hatch.envs.default] +features = ["dev", "types", "docs"] + +[project.urls] +Documentation = "https://mirumee.github.io/smyth/" +Issues = "https://github.com/mirumee/smyth/issues" +Source = "https://github.com/mirumee/smyth" + +[project.scripts] +smyth = "smyth.__main__:app" + +[tool.hatch.envs.default.scripts] +check = [ + "hatch fmt", + "hatch test -a", + "hatch test --cover", + "hatch run types:check", +] +cov-html = ["hatch test --cover -- --cov-report=html"] + +[tool.hatch.envs.hatch-static-analysis] +config-path = "ruff.toml" + +[tool.hatch.envs.hatch-test] +dependencies = [ + "asynctest", + "ipdb", + "pytest-asyncio", + "pytest-memray", + "pytest-print", + "pytest-cov", +] + +[[tool.hatch.envs.hatch-test.matrix]] +python = ["3.10", "3.11", "3.12"] + +[tool.hatch.envs.types.scripts] +check = "mypy --install-types --non-interactive {args:src/smyth tests}" + +[tool.hatch.envs.docs.scripts] +build = "mkdocs build --clean --strict" +serve = "mkdocs serve --dev-addr localhost:8000" +deploy = "mkdocs gh-deploy --force" + +[tool.hatch.envs.coverage] +detached = true +dependencies = ["coverage[toml]>=6.2"] +[tool.hatch.envs.coverage.scripts] +combine = "coverage combine {args}" +html = "coverage html --skip-covered --skip-empty" + +[tool.hatch.build.targets.wheel] +packages = ["src/smyth"] + +[tool.coverage.run] +source_pkgs = ["smyth"] +branch = true +parallel = true + +[tool.coverage.paths] +smyth = ["src/smyth"] +tests = ["tests"] + +[tool.coverage.report] +exclude_lines = [ + "no cov", + "if __name__ == .__main__.:", + "if TYPE_CHECKING:", + "@abstract", +] +# fail_under = 90 # TODO: Uncomment when coverage is good enough [tool.ruff] -exclude = ["src/graphql_client"] -line-length = 99 -target-version = "py312" - -# rules: https://beta.ruff.rs/docs/rules -# F - pyflakes -# E - pycodestyle -# G - flake8-logging-format -# I - isort -# N - pep8-naming -# Q - flake8-quotes -# UP - pyupgrade -# C90 - mccabe (complexity) -# T20 - flake8-print -# TID - flake8-tidy-imports +line-length = 88 +target-version = "py310" + +[tool.ruff.format] +docstring-code-format = true +docstring-code-line-length = 80 [tool.ruff.lint] select = ["E", "F", "G", "I", "N", "Q", "UP", "C90", "T20", "TID"] +[tool.ruff.lint.flake8-tidy-imports] +ban-relative-imports = "all" + [tool.ruff.lint.mccabe] max-complexity = 10 -[tool.ruff.lint.flake8-tidy-imports] -ban-relative-imports = "all" +[tool.ruff.lint.isort] +known-first-party = ["smyth"] + +[tool.ruff.lint.flake8-pytest-style] +fixture-parentheses = false +mark-parentheses = false + +[tool.pytest.ini_options] +asyncio_mode = "auto" diff --git a/src/smyth/__main__.py b/src/smyth/__main__.py index 13843f7..2674ccf 100644 --- a/src/smyth/__main__.py +++ b/src/smyth/__main__.py @@ -1,15 +1,14 @@ import logging -import logging.config -import os +from typing import Annotated -import click +import typer import uvicorn -from smyth.config import get_config, get_config_dict, serialize_config +from smyth.config import get_config, get_config_dict +app = typer.Typer() config = get_config(get_config_dict()) - logging_config = { "version": 1, "disable_existing_loggers": False, @@ -34,39 +33,49 @@ "datefmt": "[%X]", }, }, - "root": { - "handlers": ["console"], - "level": config.log_level, + "loggers": { + "smyth": { + "handlers": ["console"], + "level": config.log_level, + "propagate": False, + }, + "uvicorn": { + "handlers": ["console"], + "level": config.log_level, + "propagate": False, + }, }, } logging.config.dictConfig(logging_config) LOGGER = logging.getLogger(__name__) -@click.group() -def cli(): - pass - - -@cli.command() -@click.option("-h", "--host", default=None, help=f"Bind socket to this host. [default: {config.host}]", type=str) -@click.option("-p", "--port", default=None, help=f"Bind socket to this port. [default: {config.port}]", type=int) -@click.option("--only", default=None, help="Run only the handler of this name. [list]", multiple=True) -@click.option("--log-level", default=None, help=f"Log level. [default: {config.log_level}]", type=str) -def run(host: str | None, port: int | None, only: list[str] | None, log_level: str | None): - LOGGER.info("Starting [blue bold]Smyth[/]") +@app.command() +def run( + smyth_starlette_app: Annotated[ + str | None, typer.Argument() + ] = None, # typer does not handle union types + smyth_starlette_app_factory: Annotated[ + str, typer.Argument() + ] = "smyth.server.app:create_app", + host: Annotated[str | None, typer.Option()] = config.host, + port: Annotated[int | None, typer.Option()] = config.port, + log_level: Annotated[str | None, typer.Option()] = config.log_level, +): + if smyth_starlette_app and smyth_starlette_app_factory: + raise typer.BadParameter( + "Only one of smyth_starlette_app or smyth_starlette_app_factory " + "should be provided." + ) - if only: - config.handlers = { - handler_name: handler_config - for handler_name, handler_config in config.handlers.items() - if handler_name in only - } - LOGGER.info( - "[yellow][bold]Only[/bold] running handlers:[/yellow] [blue]%s[/blue]", - ", ".join( - [handler for handler in config.handlers.keys()] - ) + factory = False + if smyth_starlette_app_factory: + smyth_starlette_app = smyth_starlette_app_factory + factory = True + if not smyth_starlette_app: + raise typer.BadParameter( + "One of smyth_starlette_app or smyth_starlette_app_factory " + "should be provided." ) if host: @@ -76,17 +85,17 @@ def run(host: str | None, port: int | None, only: list[str] | None, log_level: s if log_level: config.log_level = log_level - os.environ["_SMYTH_CONFIG"] = serialize_config(config) - uvicorn.run( - "smyth.app:app", + smyth_starlette_app, + factory=factory, host=config.host, port=config.port, reload=True, log_config=logging_config, timeout_keep_alive=60 * 15, + lifespan="on", ) if __name__ == "__main__": - cli() + app() diff --git a/src/smyth/app.py b/src/smyth/app.py deleted file mode 100644 index 73644c4..0000000 --- a/src/smyth/app.py +++ /dev/null @@ -1,40 +0,0 @@ -import logging -import os -from contextlib import asynccontextmanager -from multiprocessing import set_start_method - -from starlette.applications import Starlette - -from smyth.config import deserialize_config -from smyth.dispatcher.dispatcher import Dispatcher -from smyth.endpoints import lambda_invoker_endpoint, status_endpoint - -LOGGER = logging.getLogger(__name__) -set_start_method("spawn", force=True) - - -@asynccontextmanager -async def lifespan(app: "SmythApp"): - config = deserialize_config(os.environ["_SMYTH_CONFIG"]) - with Dispatcher( - config=config - ) as dispatcher: - app.dispatcher = dispatcher - yield - - -class SmythApp(Starlette): - dispatcher: Dispatcher - - def __init__(self, *args, **kwargs): - config = deserialize_config(os.environ["_SMYTH_CONFIG"]) - super().__init__(debug=True, lifespan=lifespan, *args, **kwargs) - self.add_route(f"{config.smyth_path_prefix}/api/status", status_endpoint, methods=["GET"]) - self.add_route( - "/{path:path}", - lambda_invoker_endpoint, - methods=["GET", "POST", "PUT", "DELETE"], - ) - - -app = SmythApp() diff --git a/src/smyth/config.py b/src/smyth/config.py index 3d1df96..efd79fb 100644 --- a/src/smyth/config.py +++ b/src/smyth/config.py @@ -1,5 +1,4 @@ -import json -from dataclasses import asdict, dataclass, field +from dataclasses import dataclass, field from pathlib import Path import toml @@ -12,12 +11,12 @@ class HandlerConfig: handler_path: str url_path: str timeout: float | None = None - event_data_generator_path: str = "smyth.event.generate_event_data" + event_data_generator_path: str = "smyth.event.generate_api_gw_v2_event_data" context_data_generator_path: str = "smyth.context.generate_context_data" - fake_coldstart_time: bool = False + fake_coldstart: bool = False log_level: str = "INFO" concurrency: int = 1 - dispatch_strategy_path: str = "smyth.dispatcher.strategy.RoundRobinDispatchStrategy" + strategy_function_path: str = "smyth.runner.strategy.first_warm" @dataclass @@ -58,13 +57,3 @@ def get_config_dict(config_file_name: str | None = None) -> dict: def get_config(config_dict: dict) -> Config: """Get config.""" return Config(**config_dict["tool"]["smyth"]) - - -def serialize_config(config: "Config") -> str: - """Serialize config.""" - return json.dumps(asdict(config)) - - -def deserialize_config(config_str: str) -> Config: - """Deserialize config.""" - return Config(**json.loads(config_str)) diff --git a/src/smyth/context.py b/src/smyth/context.py index 0540ba0..1836456 100644 --- a/src/smyth/context.py +++ b/src/smyth/context.py @@ -3,17 +3,17 @@ from starlette.requests import Request -from smyth.dispatcher.process import LambdaProcess -from smyth.dispatcher.type import ProcessDefinition +from smyth.types import RunnerProcessProtocol, SmythHandler async def generate_context_data( - request: Request, process_def: ProcessDefinition, process: LambdaProcess + request: Request, handler: SmythHandler, process: RunnerProcessProtocol ): """ The data returned by this function is passed to the `smyth.runner.FaneContext` as kwargs. """ + asdict(handler) context: dict[str, Any] = { "smyth": { "process": { @@ -22,12 +22,12 @@ async def generate_context_data( "task_counter": process.task_counter, "last_used_timestamp": process.last_used_timestamp, }, - "process_def": { - "name": process_def.name, - "handler_config": asdict(process_def.handler_config), + "handler": { + "name": handler.name, + "handler_config": asdict(handler), }, } } - if process_def.handler_config.timeout is not None: - context["timeout"] = process_def.handler_config.timeout + if handler.timeout is not None: + context["timeout"] = handler.timeout return context diff --git a/src/smyth/dispatcher/__init__.py b/src/smyth/dispatcher/__init__.py deleted file mode 100644 index 8b13789..0000000 --- a/src/smyth/dispatcher/__init__.py +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/smyth/dispatcher/dispatcher.py b/src/smyth/dispatcher/dispatcher.py deleted file mode 100644 index b1d72df..0000000 --- a/src/smyth/dispatcher/dispatcher.py +++ /dev/null @@ -1,106 +0,0 @@ -import json -import logging - -from starlette.requests import Request -from starlette.routing import compile_path - -from smyth.config import Config -from smyth.dispatcher.exceptions import ( - NoAvailableProcessError, - ProcessDefinitionNotFoundError, -) -from smyth.dispatcher.process import LambdaProcess -from smyth.dispatcher.type import ProcessDefinition -from smyth.utils import import_attribute - -LOGGER = logging.getLogger(__name__) - - -class Dispatcher: - def __init__(self, config: Config): - self.config = config - self.process_definitions = self.get_process_definitions(config) - self.process_groups = self.build_process_groups(self.process_definitions) - - async def __call__(self, request: Request): - return self.dispatch(request) - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, traceback): - self.stop() - - def stop(self): - for process_group in self.process_groups.values(): - for process in process_group: - process.stop() - if process.is_alive(): - process.terminate() - for process_group in self.process_groups.values(): - for process in process_group: - if process.is_alive(): - process.join() - - def get_process_definitions(self, config: Config) -> dict[str, ProcessDefinition]: - return { - handler_config.url_path: ProcessDefinition( - name=handler_name, - handler_config=handler_config, - event_data_generator=import_attribute( - handler_config.event_data_generator_path - ), - context_data_generator=import_attribute( - handler_config.context_data_generator_path - ), - url_path=compile_path(handler_config.url_path)[0], - dispatch_strategy_class=import_attribute( - handler_config.dispatch_strategy_path - ), - ) - for handler_name, handler_config in config.handlers.items() - } - - def build_process_groups( - self, process_definitions: dict[str, ProcessDefinition] - ) -> dict[str, list[LambdaProcess]]: - process_groups: dict[str, list[LambdaProcess]] = {} - for process_definition in process_definitions.values(): - process_groups[process_definition.name] = [] - for index in range(process_definition.handler_config.concurrency): - process = LambdaProcess( - name=f"{process_definition.name}:{index}", - handler_config=process_definition.handler_config, - ) - process_groups[process_definition.name].append(process) - return process_groups - - def get_process_definition(self, path: str) -> ProcessDefinition: - for process_def in self.process_definitions.values(): - if process_def.url_path.match(path): - return process_def - raise ProcessDefinitionNotFoundError( - f"No process definition found for path {path}" - ) - - async def dispatch(self, request: Request): - LOGGER.debug("Dispatching request %s", request.url.path) - process_def = self.get_process_definition(request.url.path) - process = process_def.dispatch_strategy_class( - process_groups=self.process_groups - ).get_process(process_def.name) - if not process: - raise NoAvailableProcessError( - f"No available process for {process_def.name}" - ) - if process.state == "COLD": - await process.astart() - - event_data = await process_def.event_data_generator(request) - context_data = await process_def.context_data_generator( - request, process_def, process - ) - - return await process.asend( - json.dumps({"event": event_data, "context": context_data}) - ) diff --git a/src/smyth/dispatcher/exceptions.py b/src/smyth/dispatcher/exceptions.py deleted file mode 100644 index c3a0abe..0000000 --- a/src/smyth/dispatcher/exceptions.py +++ /dev/null @@ -1,18 +0,0 @@ -class DispatcherError(Exception): - pass - - -class ProcessDefinitionNotFoundError(DispatcherError): - pass - - -class NoAvailableProcessError(DispatcherError): - pass - - -class DestroyedOnLoadError(DispatcherError): - pass - - -class LambdaTimeoutError(DispatcherError): - pass diff --git a/src/smyth/dispatcher/process.py b/src/smyth/dispatcher/process.py deleted file mode 100644 index 33258bd..0000000 --- a/src/smyth/dispatcher/process.py +++ /dev/null @@ -1,123 +0,0 @@ -import logging -from enum import Enum -from multiprocessing import Event, Process, Queue -from multiprocessing.synchronize import Event as EventClass -from queue import Empty -from time import time - -from asgiref.sync import sync_to_async - -from smyth.config import HandlerConfig -from smyth.dispatcher.exceptions import DestroyedOnLoadError -from smyth.dispatcher.runner import lambda_runner -from smyth.schema import RunnerResult - -LOGGER = logging.getLogger(__name__) - - -class ProcessState(str, Enum): - COLD = "COLD" - WARM = "WARM" - WORKING = "WORKING" - DESTROYED = "DESTROYED" - - -class LambdaProcess(Process): - name: str - task_counter: int - last_used_timestamp: float - handler_config: HandlerConfig - input_queue: Queue - output_queue: Queue - destruction_event: EventClass - is_loaded: EventClass - - def __init__(self, name, handler_config: HandlerConfig): - self.name = name - self.input_queue = Queue(maxsize=1) - self.output_queue = Queue(maxsize=1) - self.task_counter = 0 - self.last_used_timestamp = 0 - self.handler_config = handler_config - self.destruction_event = Event() - self.is_loaded = Event() - self.is_working = Event() - - super().__init__( - target=lambda_runner, - name=name, - kwargs={ - "name": name, - "handler_config": handler_config, - "input_queue": self.input_queue, - "output_queue": self.output_queue, - "destruction_event": self.destruction_event, - "is_loaded": self.is_loaded, - "is_working": self.is_working, - }, - ) - - @property - def state(self) -> ProcessState: - if self.is_working.is_set(): - return ProcessState.WORKING - elif self.is_loaded.is_set(): - return ProcessState.WARM - return ProcessState.COLD - - def run(self): - LOGGER.info("Process %s started", self.name) - try: - super().run() - except Exception: - LOGGER.exception("Error in process %s", self.name) - self.destruction_event.set() - self.is_loaded.clear() - raise - - def start(self) -> None: - super().start() - time_start = time() - for _ in range(5): - if self.destruction_event.is_set(): - raise DestroyedOnLoadError(f"Process '{self.name}' was destroyed during start.") - if self.is_loaded.is_set(): - break - LOGGER.debug("Waiting for process %s to load", self.name) - self.is_loaded.wait(1) - - LOGGER.info("Process %s loaded in %s seconds", self.name, time() - time_start) - - @sync_to_async(thread_sensitive=False) - def astart(self) -> None: - self.start() - - def stop(self) -> None: - self.destruction_event.set() - - def send(self, data) -> RunnerResult | None: - LOGGER.info("Sending data to process %s: %s", self.name, data) - self.task_counter += 1 - self.last_used_timestamp = time() - self.input_queue.put(data) - - while not self.destruction_event.is_set(): - LOGGER.debug("SHOULD SELF DESTRUCT?: %s", self.destruction_event.is_set()) - try: - result_data = self.output_queue.get(timeout=1) - except Empty: - LOGGER.debug("No data received from process %s, waiting...", self.name) - continue - except KeyboardInterrupt: - self.destruction_event.set() - return None - if result_data: - LOGGER.info("Received data from process %s: %s", self.name, result_data) - return RunnerResult.model_validate_json(result_data) - else: - LOGGER.info("Process %s is destroyed", self.name) - return None - - @sync_to_async(thread_sensitive=False) - def asend(self, data) -> RunnerResult | None: - return self.send(data) diff --git a/src/smyth/dispatcher/runner.py b/src/smyth/dispatcher/runner.py deleted file mode 100644 index 2be0445..0000000 --- a/src/smyth/dispatcher/runner.py +++ /dev/null @@ -1,189 +0,0 @@ -import json -import logging -import signal -import sys -import traceback -from logging.config import dictConfig -from multiprocessing import Queue -from queue import Empty -from multiprocessing.synchronize import Event as EventClass -from random import randint -from time import sleep, strftime, time - -from aws_lambda_powertools.utilities.typing import LambdaContext - -from smyth.config import HandlerConfig -from smyth.dispatcher.exceptions import LambdaTimeoutError -from smyth.schema import ( - LambdaExceptionResponse, - LambdaHttpResponse, - RunnerResult, - RunnerResultType, -) -from smyth.utils import import_attribute - -dictConfig( - { - "version": 1, - "disable_existing_loggers": False, - "handlers": { - "console": { - "class": "rich.logging.RichHandler", - "formatter": "default", - "markup": True, - "rich_tracebacks": True, - }, - }, - "formatters": { - "default": { - "format": "[[bold red]%(processName)s[/]] %(message)s", - "datefmt": "[%X]", - }, - }, - "loggers": { - "smyth": { - "level": "NOTSET", - "propagate": False, - "handlers": ["console"], - }, - }, - } -) - - -LOGGER = logging.getLogger(__name__) - - -class FakeLambdaContext(LambdaContext): - def __init__(self, name="Fake", version="LATEST", timeout=6, **kwargs): - self.name = name - self.version = version - self.created = time() - self.timeout = timeout - for key, value in kwargs.items(): - setattr(self, key, value) - - def get_remaining_time_in_millis(self): - return int( - max( - (self.timeout * 1000) - - (int(round(time() * 1000)) - int(round(self.created * 1000))), - 0, - ) - ) - - @property - def function_name(self): - return self.name - - @property - def function_version(self): - return self.version - - @property - def invoked_function_arn(self): - return "arn:aws:lambda:serverless:" + self.name - - @property - def memory_limit_in_mb(self): - return "1024" - - @property - def aws_request_id(self): - return "1234567890" - - @property - def log_group_name(self): - return "/aws/lambda/" + self.name - - @property - def log_stream_name(self): - return ( - strftime("%Y/%m/%d") - + "/[$" - + self.version - + "]58419525dade4d17a495dceeeed44708" - ) - - @property - def log(self): - return sys.stdout.write - - -def timeout_handler(signum, frame): - """Raise an exception when the lambda timeout is reached. - This will be raised from within the lambda_handler function. - """ - raise LambdaTimeoutError("Lambda timeout") - - -def lambda_runner( - name: str, - handler_config: HandlerConfig, - input_queue: Queue, - output_queue: Queue, - destruction_event: EventClass, - is_loaded: EventClass, - is_working: EventClass, -): - LOGGER.setLevel(handler_config.log_level) - sys.stdin = open("/dev/stdin") - try: - lambda_handler = import_attribute(handler_config.handler_path) - except Exception as error: - LOGGER.error("Could not import lambda handler: %s", error) - raise - - if handler_config.fake_coldstart_time: - LOGGER.info("Faking cold start time") - sleep(randint(800, 1200) / 1000) - - is_loaded.set() - - while not destruction_event.is_set(): - try: - line = input_queue.get(block=True, timeout=1) - except Empty: - continue - except KeyboardInterrupt: - LOGGER.info("Received KeyboardInterrupt, exiting") - break - is_working.set() - LOGGER.debug("Received line: %s", line) - input_data = json.loads(line) - - context = FakeLambdaContext(**input_data.get("context", {})) - signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(int(context.timeout)) - - try: - result = lambda_handler(input_data["event"], context) - except LambdaTimeoutError as e: - raise - except Exception as e: - result = RunnerResult( - type=RunnerResultType.EXCEPTION, - response=LambdaExceptionResponse( - message=str(e), - type=type(e).__name__, - stack_trace=traceback.format_exc(), - ), - ) - else: - signal.alarm(0) - result = RunnerResult( - type=RunnerResultType.HTTP, - response=LambdaHttpResponse( - status_code=result["statusCode"], # type: ignore[call-arg] - body=result.get("body", ""), - headers=result.get("headers", {}), - is_base64_encoded=result.get("isBase64Encoded", False), # type: ignore[call-arg] - ), - ) - - LOGGER.debug("Got result from lambda handler: %s", line) - - output_queue.put(result.model_dump_json(by_alias=True)) - is_working.clear() - else: - LOGGER.info("LamdaProcess '%s' exiting", name) diff --git a/src/smyth/dispatcher/strategy.py b/src/smyth/dispatcher/strategy.py deleted file mode 100644 index 6ff461f..0000000 --- a/src/smyth/dispatcher/strategy.py +++ /dev/null @@ -1,58 +0,0 @@ -import logging -from abc import ABC, abstractmethod - -from smyth.dispatcher.process import LambdaProcess - -LOGGER = logging.getLogger(__name__) - - -class BaseDispatchStrategy(ABC): - name: str - - def __init__(self, process_groups: dict[str, list[LambdaProcess]]): - self.process_groups = process_groups - - @abstractmethod - def get_process(self, process_definition_name: str) -> LambdaProcess | None: - raise NotImplementedError("get_process method must be implemented") - - -class FirstWarmDispatchStrategy(BaseDispatchStrategy): - """This strategy prioritizes the use of the first available Lambda Process - in a "warm" state to handle incoming requests. If no warm instances are - available, it initiates a "cold start". This behavior more closely mimics - the operational dynamics of AWS Lambda, where reusing warm instances can - lead to faster response times.""" - - name: str = "first_warm" - - def get_process(self, process_definition_name: str) -> LambdaProcess | None: - # Look for an available, warm process - # If no warm process is available, look for a cold process and start it - best_candidate = None - for process in self.process_groups[process_definition_name]: - if process.state == "WARM": - # If we find a warm process, return it immediately - return process - if process.state == "COLD" and not best_candidate: - # If we find a cold process, store it as a candidate and - # look for another warm process - best_candidate = process - - return best_candidate - - -class RoundRobinDispatchStrategy(BaseDispatchStrategy): - """This strategy, not typical for AWS Lambda's behavior, is beneficial - during development. It rotates among Lambda Processes for each request, - given that concurrency is set higher than `1`. This approach encourages - developers to avoid relying on global state across requests, promoting - best practices in serverless application design.""" - - name: str = "round_robin" - - def get_process(self, process_definition_name: str) -> LambdaProcess | None: - return sorted( - self.process_groups[process_definition_name], - key=lambda process: process.last_used_timestamp, - )[0] diff --git a/src/smyth/dispatcher/type.py b/src/smyth/dispatcher/type.py deleted file mode 100644 index 71d6c28..0000000 --- a/src/smyth/dispatcher/type.py +++ /dev/null @@ -1,22 +0,0 @@ -from collections.abc import Awaitable, Callable -from dataclasses import dataclass -from re import Pattern -from typing import Any - -from starlette.requests import Request - -from smyth.config import HandlerConfig -from smyth.dispatcher.process import LambdaProcess -from smyth.dispatcher.strategy import BaseDispatchStrategy - - -@dataclass -class ProcessDefinition: - name: str - handler_config: HandlerConfig - event_data_generator: Callable[[Request], Awaitable[dict[str, Any]]] - context_data_generator: Callable[ - [Request, "ProcessDefinition", LambdaProcess], Awaitable[dict[str, Any]] - ] - url_path: Pattern[str] - dispatch_strategy_class: type[BaseDispatchStrategy] diff --git a/src/smyth/endpoints/__init__.py b/src/smyth/endpoints/__init__.py deleted file mode 100644 index 9eaa08c..0000000 --- a/src/smyth/endpoints/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from smyth.endpoints.invoker import lambda_invoker_endpoint -from smyth.endpoints.status import status_endpoint - -__all__ = ["lambda_invoker_endpoint", "status_endpoint"] diff --git a/src/smyth/endpoints/invoker.py b/src/smyth/endpoints/invoker.py deleted file mode 100644 index a7b1ccf..0000000 --- a/src/smyth/endpoints/invoker.py +++ /dev/null @@ -1,39 +0,0 @@ -import logging - -from starlette import status -from starlette.requests import Request -from starlette.responses import JSONResponse, Response - -from smyth.dispatcher.dispatcher import Dispatcher -from smyth.dispatcher.exceptions import DestroyedOnLoadError, LambdaTimeoutError -from smyth.schema import ( - LambdaExceptionResponse, - LambdaHttpResponse, -) - -LOGGER = logging.getLogger(__name__) - - -async def lambda_invoker_endpoint(request: Request): - dispatcher: Dispatcher = request.app.dispatcher - try: - result = await dispatcher.dispatch(request) - except DestroyedOnLoadError: - return Response("Process destroyed on load", status_code=status.HTTP_502_BAD_GATEWAY) - except LambdaTimeoutError: - return Response("Lambda timeout", status_code=status.HTTP_408_REQUEST_TIMEOUT) - - if not result: - return Response("No response", status_code=status.HTTP_500_INTERNAL_SERVER_ERROR) - - if isinstance(result.response, LambdaExceptionResponse): - return JSONResponse( - content=result.response.model_dump(), - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - ) - elif isinstance(result.response, LambdaHttpResponse): - return Response( - content=result.response.body, - status_code=result.response.status_code, - headers=result.response.headers, - ) diff --git a/src/smyth/endpoints/status.py b/src/smyth/endpoints/status.py deleted file mode 100644 index aace402..0000000 --- a/src/smyth/endpoints/status.py +++ /dev/null @@ -1,32 +0,0 @@ -from dataclasses import asdict - -from starlette import status -from starlette.requests import Request -from starlette.responses import JSONResponse - -from smyth.dispatcher.dispatcher import Dispatcher - - -async def status_endpoint(request: Request): - dispatcher: Dispatcher = request.app.dispatcher - - response_data = { - "lambda handlers": {}, - "config": asdict(dispatcher.config), - } - - for process_group_name, process_group in dispatcher.process_groups.items(): - response_data["lambda handlers"][process_group_name] = { # type: ignore[index] - "processes": [], - } - for process in process_group: - response_data["lambda handlers"][process_group_name]["processes"].append( # type: ignore[index] - { - "state": process.state, - "task_counter": process.task_counter, - } - ) - return JSONResponse( - content=response_data, - status_code=status.HTTP_200_OK, - ) diff --git a/src/smyth/event.py b/src/smyth/event.py index 2151f96..8c75980 100644 --- a/src/smyth/event.py +++ b/src/smyth/event.py @@ -1,7 +1,7 @@ from starlette.requests import Request -async def generate_event_data(request: Request): +async def generate_api_gw_v2_event_data(request: Request): source_ip = None if request.client: source_ip = request.client.host @@ -26,3 +26,7 @@ async def generate_event_data(request: Request): "routeKey": f"{request.method} {request.url.path}", "rawQueryString": request.url.query, } + + +async def generate_lambda_invokation_event_data(request: Request): + return await request.json() diff --git a/src/smyth/exceptions.py b/src/smyth/exceptions.py index d3aa029..c879e11 100644 --- a/src/smyth/exceptions.py +++ b/src/smyth/exceptions.py @@ -4,3 +4,27 @@ class LambdaRuntimeError(Exception): class ConfigFileNotFoundError(LambdaRuntimeError): """Config file not found.""" + + +class DispatcherError(Exception): + pass + + +class ProcessDefinitionNotFoundError(DispatcherError): + pass + + +class NoAvailableProcessError(DispatcherError): + pass + + +class DestroyedOnLoadError(DispatcherError): + pass + + +class LambdaTimeoutError(DispatcherError): + pass + + +class LambdaInvocationError(DispatcherError): + pass diff --git a/src/smyth/process.py b/src/smyth/process.py deleted file mode 100644 index 5041e25..0000000 --- a/src/smyth/process.py +++ /dev/null @@ -1,87 +0,0 @@ -import logging -from collections.abc import Awaitable, Callable -from dataclasses import dataclass -from multiprocessing import Process, Queue -from queue import Empty -from re import Pattern -from typing import Any - -from starlette.requests import Request -from starlette.routing import compile_path - -from smyth.config import Config -from smyth.runner import main -from smyth.schema import RunnerResult -from smyth.utils import import_attribute - -LOGGER = logging.getLogger(__name__) - - -@dataclass -class ProcessDefinition: - process: "LambdaProcess" - event_data_generator: Callable[[Request], Awaitable[dict[str, Any]]] - context_data_generator: Callable[[Request, float | None], Awaitable[dict[str, Any]]] - timeout: float | None - url_path: Pattern[str] - - -def get_process_definitions(config: Config) -> list[ProcessDefinition]: - return [ - ProcessDefinition( - process=LambdaProcess( - target=main, - name=handler_name, - args=(handler_config,), - ), - event_data_generator=import_attribute( - handler_config.event_data_generator_path - ), - context_data_generator=import_attribute( - handler_config.context_data_generator_path - ), - timeout=handler_config.timeout, - url_path=compile_path(handler_config.url_path)[0], - ) - for handler_name, handler_config in config.handlers.items() - ] - - -class LambdaProcess(Process): - - def __init__(self, target, name, args=None, kwargs=None): - if not args: - args = () - if not kwargs: - kwargs = {} - self.input_queue = Queue(maxsize=1) - self.output_queue = Queue(maxsize=1) - kwargs["input_queue"] = self.input_queue - kwargs["output_queue"] = self.output_queue - super().__init__(target=target, name=name, args=args, kwargs=kwargs) - - def run(self): - LOGGER.info("Starting process %s", self.name) - super().run() - - def send(self, data) -> RunnerResult | None: - LOGGER.info("Sending data to process %s: %s", self.name, data) - if not self.is_alive(): - raise RuntimeError(f"Process '{self.name}' is not alive, restart server.") - self.input_queue.put(data) - try: - while True: - result_data = self.output_queue.get(timeout=10) - if not result_data: - LOGGER.info("Received empty data from process %s, checking if process is alive", self.name) - if not self.is_alive(): - LOGGER.info("Process %s is not alive, breaking", self.name) - break - continue - LOGGER.info("Received data from process %s: %s", self.name, result_data) - return RunnerResult.model_validate_json(result_data) - except Empty: - return None - except KeyboardInterrupt: - pass - return None diff --git a/src/smyth/runner.py b/src/smyth/runner.py deleted file mode 100644 index 35c9a22..0000000 --- a/src/smyth/runner.py +++ /dev/null @@ -1,166 +0,0 @@ -import json -import logging -import signal -import sys -import traceback -from logging.config import dictConfig -from multiprocessing import Queue -from random import randint -from time import sleep, strftime, time - -from aws_lambda_powertools.utilities.typing import LambdaContext - -from smyth.config import HandlerConfig -from smyth.schema import ( - LambdaExceptionResponse, - LambdaHttpResponse, - RunnerResult, - RunnerResultType, -) -from smyth.utils import import_attribute - -dictConfig({ - "version": 1, - "disable_existing_loggers": False, - "handlers": { - "console": { - "class": "rich.logging.RichHandler", - "formatter": "default", - "markup": True, - }, - }, - "formatters": { - "default": { - "format": "[[bold red]%(processName)s[/]] %(message)s", - "datefmt": "[%X]", - }, - }, - "loggers": { - "smyth": { - "level": "NOTSET", - "propagate": False, - "handlers": ["console"], - }, - }, -}) - - -LOGGER = logging.getLogger(__name__) - - -class FakeLambdaContext(LambdaContext): - def __init__(self, name="Fake", version="LATEST", timeout=6, **kwargs): - self.name = name - self.version = version - self.created = time() - self.timeout = timeout - for key, value in kwargs.items(): - setattr(self, key, value) - - def get_remaining_time_in_millis(self): - return int( - max( - (self.timeout * 1000) - - (int(round(time() * 1000)) - int(round(self.created * 1000))), - 0, - ) - ) - - @property - def function_name(self): - return self.name - - @property - def function_version(self): - return self.version - - @property - def invoked_function_arn(self): - return "arn:aws:lambda:serverless:" + self.name - - @property - def memory_limit_in_mb(self): - return "1024" - - @property - def aws_request_id(self): - return "1234567890" - - @property - def log_group_name(self): - return "/aws/lambda/" + self.name - - @property - def log_stream_name(self): - return ( - strftime("%Y/%m/%d") - + "/[$" - + self.version - + "]58419525dade4d17a495dceeeed44708" - ) - - @property - def log(self): - return sys.stdout.write - - -def timeout_handler(signum, frame): - raise Exception("Lambda timeout") - - -def main(lambda_handler_config: HandlerConfig, input_queue: Queue, output_queue: Queue): - LOGGER.setLevel(lambda_handler_config.log_level) - sys.stdin = open("/dev/stdin") - try: - lambda_handler = import_attribute(lambda_handler_config.handler_path) - except ImportError as error: - LOGGER.error("Could not import lambda handler: %s", error) - raise - - coldstart_next_invokation = lambda_handler_config.fake_coldstart_time - - while True: - try: - line = input_queue.get(block=True, timeout=None) - except KeyboardInterrupt: - LOGGER.info("Received KeyboardInterrupt, exiting") - break - - if coldstart_next_invokation: - LOGGER.info("Faking cold start time") - sleep(randint(500, 1000) / 1000) - coldstart_next_invokation = False - - LOGGER.debug("Received line: %s", line) - input_data = json.loads(line) - - context = FakeLambdaContext(**input_data.get("context", {})) - signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(int(context.timeout)) - - try: - result = lambda_handler(input_data["event"], context) - except Exception as e: - result = RunnerResult( - type=RunnerResultType.EXCEPTION, - response=LambdaExceptionResponse( - message=str(e), - type=type(e).__name__, - stack_trace=traceback.format_exc(), - ), - ) - else: - signal.alarm(0) - result = RunnerResult( - type=RunnerResultType.HTTP, - response=LambdaHttpResponse( - status_code=result["statusCode"], # type: ignore[call-arg] - body=result.get("body", ""), - headers=result.get("headers", {}), - is_base64_encoded=result.get("isBase64Encoded", False), # type: ignore[call-arg] - ), - ) - - LOGGER.debug("Got result from lambda handler: %s", line) - - output_queue.put(result.model_dump_json(by_alias=True)) diff --git a/src/smyth/runner/__init__.py b/src/smyth/runner/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/smyth/runner/fake_context.py b/src/smyth/runner/fake_context.py new file mode 100644 index 0000000..90a26a4 --- /dev/null +++ b/src/smyth/runner/fake_context.py @@ -0,0 +1,60 @@ +import sys +from time import strftime, time + +from aws_lambda_powertools.utilities.typing import LambdaContext + + +class FakeLambdaContext(LambdaContext): + def __init__(self, name="Fake", version="LATEST", timeout=6, **kwargs): + self.name = name + self.version = version + self.created = time() + self.timeout = timeout + for key, value in kwargs.items(): + setattr(self, key, value) + + def get_remaining_time_in_millis(self): + return int( + max( + (self.timeout * 1000) + - (int(round(time() * 1000)) - int(round(self.created * 1000))), + 0, + ) + ) + + @property + def function_name(self): + return self.name + + @property + def function_version(self): + return self.version + + @property + def invoked_function_arn(self): + return "arn:aws:lambda:serverless:" + self.name + + @property + def memory_limit_in_mb(self): + return "1024" + + @property + def aws_request_id(self): + return "1234567890" + + @property + def log_group_name(self): + return "/aws/lambda/" + self.name + + @property + def log_stream_name(self): + return ( + strftime("%Y/%m/%d") + + "/[$" + + self.version + + "]58419525dade4d17a495dceeeed44708" + ) + + @property + def log(self): + return sys.stdout.write diff --git a/src/smyth/runner/process.py b/src/smyth/runner/process.py new file mode 100644 index 0000000..f30ae27 --- /dev/null +++ b/src/smyth/runner/process.py @@ -0,0 +1,76 @@ +import logging +from multiprocessing import Process, Queue +from queue import Empty +from time import time + +from asgiref.sync import sync_to_async + +from smyth.exceptions import LambdaInvocationError +from smyth.runner.runner import lambda_invoker +from smyth.types import LambdaHandler, RunnerMessage, SmythHandlerState + +LOGGER = logging.getLogger(__name__) + + +class RunnerProcess(Process): + name: str + task_counter: int + last_used_timestamp: float + state: SmythHandlerState + + def __init__( + self, name: str, lambda_handler: LambdaHandler, log_level: str = "INFO" + ): + self.name = name + self.task_counter = 0 + self.last_used_timestamp = 0 + self.state = SmythHandlerState.COLD + + self.input_queue: Queue[RunnerMessage] = Queue(maxsize=1) + self.output_queue: Queue[RunnerMessage] = Queue(maxsize=1) + super().__init__( + target=lambda_invoker, + name=name, + kwargs={ + "lambda_handler": lambda_handler, + "input_queue": self.input_queue, + "output_queue": self.output_queue, + "log_level": log_level, + }, + ) + + def stop(self): + self.input_queue.close() + self.output_queue.close() + self.input_queue.join_thread() + self.output_queue.join_thread() + self.terminate() + self.join() + + def send(self, data) -> RunnerMessage | None: + LOGGER.debug("Sending data to process %s: %s", self.name, data) + self.task_counter += 1 + self.last_used_timestamp = time() + self.input_queue.put(data) + + while True: + try: + message = self.output_queue.get(block=True, timeout=1) + except Empty: + continue + except Exception as error: + LOGGER.error("Error getting message from output queue: %s", error) + return None + + LOGGER.debug("Received message from process %s: %s", self.name, message) + if message["type"] == "smyth.lambda.status": + self.state = SmythHandlerState(message["status"]) + elif message["type"] == "smyth.lambda.response": + return message["response"] + elif message["type"] == "smyth.lambda.error": + LOGGER.error("Error invoking lambda: %s", message) + raise LambdaInvocationError(message["response"]["message"]) + + @sync_to_async(thread_sensitive=False) + def asend(self, data) -> RunnerMessage | None: + return self.send(data) diff --git a/src/smyth/runner/runner.py b/src/smyth/runner/runner.py new file mode 100644 index 0000000..826dd03 --- /dev/null +++ b/src/smyth/runner/runner.py @@ -0,0 +1,116 @@ +import logging +import signal +import sys +import traceback +from logging.config import dictConfig +from multiprocessing import Queue +from queue import Empty +from random import randint +from time import sleep + +from smyth.runner.fake_context import FakeLambdaContext +from smyth.types import LambdaHandler + + +def configure_logging(log_level: str): + dictConfig( + { + "version": 1, + "disable_existing_loggers": False, + "handlers": { + "console": { + "class": "rich.logging.RichHandler", + "formatter": "default", + "markup": True, + "rich_tracebacks": True, + }, + }, + "formatters": { + "default": { + "format": "[[bold red]%(processName)s[/]] %(message)s", + "datefmt": "[%X]", + }, + }, + "loggers": { + "smyth": { + "level": log_level, + "propagate": False, + "handlers": ["console"], + }, + }, + } + ) + + +LOGGER = logging.getLogger(__name__) + + +def timeout_handler(signum, frame): + raise Exception("Lambda timeout") + + +def lambda_invoker( + lambda_handler: LambdaHandler, + input_queue: Queue, + output_queue: Queue, + log_level: str, +): + configure_logging(log_level=log_level) + sys.stdin = open("/dev/stdin") + + already_faked_coldstart = False + + while True: + try: + message = input_queue.get(block=True, timeout=1) + except KeyboardInterrupt: + LOGGER.debug("Stopping process") + sys.stdin.close() + break + except Empty: + continue + + LOGGER.debug("Received message: %s", message) + + if message["type"] == "smyth.stop": + LOGGER.debug("Stopping process") + break + + if message.get("type") != "smyth.lambda.invoke": + LOGGER.error("Invalid message type: %s", message.get("type")) + continue + + event, context = message["event"], FakeLambdaContext(**message["context"]) + + if ( + context.smyth["handler"]["handler_config"]["fake_coldstart"] # type: ignore[attr-defined] + and not already_faked_coldstart + ): + LOGGER.info("Faking cold start time") + sleep(randint(500, 1000) / 1000) + already_faked_coldstart = True + + signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(int(context.timeout)) + output_queue.put({"type": "smyth.lambda.status", "status": "working"}) + try: + response = lambda_handler(event, context) + except Exception as error: + LOGGER.error("Error invoking lambda: %s", error) + result = { + "type": "smyth.lambda.error", + "response": { + "type": type(error).__name__, + "message": str(error), + "stacktrace": traceback.format_exc(), + }, + } + else: + result = { + "type": "smyth.lambda.response", + "response": response, + } + finally: + signal.alarm(0) + output_queue.put({"type": "smyth.lambda.status", "status": "warm"}) + output_queue.put(result) diff --git a/src/smyth/runner/strategy.py b/src/smyth/runner/strategy.py new file mode 100644 index 0000000..e75c719 --- /dev/null +++ b/src/smyth/runner/strategy.py @@ -0,0 +1,36 @@ +from smyth.types import RunnerProcessProtocol, SmythHandlerState + + +def round_robin( + handler_name: str, processes: dict[str, list[RunnerProcessProtocol]] +) -> RunnerProcessProtocol: + """This strategy, not typical for AWS Lambda's behavior, is beneficial + during development. It rotates among Lambda Processes for each request, + given that concurrency is set higher than `1`. This approach encourages + developers to avoid relying on global state across requests, promoting + best practices in serverless application design.""" + return sorted( + processes[handler_name], + key=lambda process: process.last_used_timestamp, + )[0] + + +def first_warm( + handler_name: str, processes: dict[str, list[RunnerProcessProtocol]] +) -> RunnerProcessProtocol: + """This strategy prioritizes the use of the first available Lambda Process + in a "warm" state to handle incoming requests. If no warm instances are + available, it initiates a "cold start". This behavior more closely mimics + the operational dynamics of AWS Lambda, where reusing warm instances can + lead to faster response times.""" + best_candidate = None + for process in processes[handler_name]: + if process.state == SmythHandlerState.WARM: + return process + if process.state == SmythHandlerState.COLD and not best_candidate: + best_candidate = process + + if best_candidate is None: + raise Exception("No process available") + + return best_candidate diff --git a/src/smyth/schema.py b/src/smyth/schema.py deleted file mode 100644 index ec3c6c3..0000000 --- a/src/smyth/schema.py +++ /dev/null @@ -1,30 +0,0 @@ -from enum import StrEnum - -from pydantic import BaseModel, ConfigDict, Field - - -class RunnerResultType(StrEnum): - HTTP = "HTTP" - EXCEPTION = "EXCEPTION" - - -class LambdaHttpResponse(BaseModel): - model_config = ConfigDict( - populate_by_name=True, - ) - is_base64_encoded: bool = Field(..., alias="isBase64Encoded") - status_code: int = Field(..., description="HTTP status code", alias="statusCode") - headers: dict[str, str] - body: str - - -class LambdaExceptionResponse(BaseModel): - message: str - type: str - stack_trace: str - - -class RunnerResult(BaseModel): - type: RunnerResultType - response: LambdaHttpResponse | LambdaExceptionResponse - diff --git a/src/smyth/server.py b/src/smyth/server.py deleted file mode 100644 index 8e7b612..0000000 --- a/src/smyth/server.py +++ /dev/null @@ -1,99 +0,0 @@ -import json -import logging -import os -from contextlib import asynccontextmanager -from multiprocessing import set_start_method - -from starlette import status -from starlette.applications import Starlette -from starlette.requests import Request -from starlette.responses import JSONResponse, Response -from starlette.routing import Route - -from smyth.config import deserialize_config -from smyth.process import LambdaProcess, ProcessDefinition, get_process_definitions -from smyth.schema import ( - LambdaExceptionResponse, - LambdaHttpResponse, -) - -LOGGER = logging.getLogger(__name__) -set_start_method("spawn", force=True) - -config = deserialize_config(os.environ["_SMYTH_CONFIG"]) - -PROCESSES = get_process_definitions(config=config) - - -@asynccontextmanager -async def lifespan(app: Starlette): - LOGGER.info("Starting lambda processes") - started_processes: list[LambdaProcess] = [] - for process_def in PROCESSES: - process_def.process.start() - started_processes.append(process_def.process) - LOGGER.info("[blue]%s[/blue] started and listening for events", process_def.process.name) - try: - yield - finally: - for process in started_processes: - process.terminate() - process.join() - LOGGER.info("All lambda processes terminated") - - - -def get_process_definition(path: str) -> ProcessDefinition | None: - for process_def in PROCESSES: - if process_def.url_path.match(path): - return process_def - return None - - -async def lambda_invoker_endpoint(request: Request): - process_def = get_process_definition(request.url.path) - - if not process_def: - return Response( - "No lambda handler for that path", status_code=status.HTTP_404_NOT_FOUND - ) - - LOGGER.info( - "Matched process %s for path %s", process_def.process.name, request.url.path - ) - - event_data = await process_def.event_data_generator(request) - context_data = await process_def.context_data_generator( - request, process_def.timeout - ) - - result = process_def.process.send( - json.dumps({"event": event_data, "context": context_data}) - ) - if not result: - return Response("No response", status_code=status.HTTP_408_REQUEST_TIMEOUT) - - if isinstance(result.response, LambdaExceptionResponse): - return JSONResponse( - content=result.response.model_dump(), - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - ) - elif isinstance(result.response, LambdaHttpResponse): - return Response( - content=result.response.body, - status_code=result.response.status_code, - headers=result.response.headers, - ) - - -app = Starlette( - debug=True, - lifespan=lifespan, - routes=[ - Route( - "/{path:path}", - lambda_invoker_endpoint, - methods=["GET", "POST", "PUT", "DELETE"], - ), - ], -) diff --git a/src/smyth/server/__init__.py b/src/smyth/server/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/smyth/server/app.py b/src/smyth/server/app.py new file mode 100644 index 0000000..4389b48 --- /dev/null +++ b/src/smyth/server/app.py @@ -0,0 +1,74 @@ +import logging +from contextlib import asynccontextmanager +from multiprocessing import set_start_method + +from starlette.applications import Starlette + +from smyth.config import get_config, get_config_dict +from smyth.server.endpoints import ( + invocation_endpoint, + lambda_invoker_endpoint, + status_endpoint, +) +from smyth.smyth import Smyth +from smyth.utils import import_attribute + +LOGGER = logging.getLogger(__name__) +set_start_method("spawn", force=True) + + +@asynccontextmanager +async def lifespan(app: "SmythStarlette"): + app.smyth.start_runners() + yield + app.smyth.stop_runners() + + +class SmythStarlette(Starlette): + smyth: Smyth + + def __init__(self, smyth: Smyth, smyth_path_prefix: str, *args, **kwargs): + self.smyth = smyth + kwargs["lifespan"] = lifespan + super().__init__(*args, **kwargs) + self.add_route( + f"{smyth_path_prefix}/api/status", status_endpoint, methods=["GET"] + ) + self.add_route( + "/2015-03-31/functions/{function:str}/invocations", + invocation_endpoint, + methods=["POST"], + ) + self.add_route( + "/{path:path}", + lambda_invoker_endpoint, + methods=["GET", "POST", "PUT", "DELETE"], + ) + + +def create_app(): + config = get_config(get_config_dict()) + + smyth = Smyth() + + for handler_name, handler_config in config.handlers.items(): + smyth.add_handler( + name=handler_name, + path=handler_config.url_path, + lambda_handler=import_attribute(handler_config.handler_path), + timeout=handler_config.timeout, + event_data_generator=import_attribute( + handler_config.event_data_generator_path + ), + context_data_generator=import_attribute( + handler_config.context_data_generator_path + ), + fake_coldstart=handler_config.fake_coldstart, + log_level=handler_config.log_level, + concurrency=handler_config.concurrency, + strategy_function=import_attribute(handler_config.strategy_function_path), + ) + + app = SmythStarlette(smyth=smyth, smyth_path_prefix=config.smyth_path_prefix) + + return app diff --git a/src/smyth/server/endpoints.py b/src/smyth/server/endpoints.py new file mode 100644 index 0000000..11fc7cc --- /dev/null +++ b/src/smyth/server/endpoints.py @@ -0,0 +1,88 @@ +import logging +from typing import Any + +from starlette import status +from starlette.requests import Request +from starlette.responses import JSONResponse, Response + +from smyth.event import generate_lambda_invokation_event_data +from smyth.exceptions import LambdaInvocationError, LambdaTimeoutError +from smyth.smyth import Smyth +from smyth.types import EventDataGenerator, SmythHandler + +LOGGER = logging.getLogger(__name__) + + +async def dispatch( + smyth: Smyth, + handler: SmythHandler, + request: Request, + event_data_generator: EventDataGenerator | None = None, +): + try: + result = await smyth.dispatch( + handler, request, event_data_generator=event_data_generator + ) + except LambdaInvocationError as error: + return Response(str(error), status_code=status.HTTP_502_BAD_GATEWAY) + except LambdaTimeoutError: + return Response("Lambda timeout", status_code=status.HTTP_408_REQUEST_TIMEOUT) + + if not result: + return Response( + "No response", status_code=status.HTTP_500_INTERNAL_SERVER_ERROR + ) + + return Response( + content=result.get("body", result), + status_code=result.get("statusCode", 200), + headers=result.get("headers", {}), + ) + + +async def lambda_invoker_endpoint(request: Request): + smyth: Smyth = request.app.smyth + handler = smyth.get_handler_for_request(request.url.path) + return await dispatch(smyth, handler, request) + + +async def invocation_endpoint(request: Request): + smyth: Smyth = request.app.smyth + function = request.path_params["function"] + try: + handler = smyth.get_handler_for_name(function) + except KeyError: + return Response( + f"Function {function} not found", status_code=status.HTTP_404_NOT_FOUND + ) + handler.event_data_generator = generate_lambda_invokation_event_data + return await dispatch( + smyth, + handler, + request, + event_data_generator=generate_lambda_invokation_event_data, + ) + + +async def status_endpoint(request: Request): + smyth: Smyth = request.app.smyth + + response_data: dict[str, Any] = { + "lambda handlers": {}, + } + + for process_group_name, process_group in smyth.processes.items(): + response_data["lambda handlers"][process_group_name] = { # type: ignore[index] + "processes": [], + } + for process in process_group: + response_data["lambda handlers"][process_group_name]["processes"].append( # type: ignore[index] + { + "state": process.state, + "task_counter": process.task_counter, + } + ) + return JSONResponse( + content=response_data, + status_code=status.HTTP_200_OK, + ) diff --git a/src/smyth/smyth.py b/src/smyth/smyth.py new file mode 100644 index 0000000..33ccda7 --- /dev/null +++ b/src/smyth/smyth.py @@ -0,0 +1,117 @@ +import logging + +from starlette.requests import Request +from starlette.routing import compile_path + +from smyth.context import generate_context_data +from smyth.event import generate_api_gw_v2_event_data +from smyth.exceptions import ProcessDefinitionNotFoundError +from smyth.runner.process import RunnerProcess +from smyth.runner.strategy import first_warm +from smyth.types import ( + ContextDataGenerator, + EventDataGenerator, + LambdaHandler, + RunnerProcessProtocol, + SmythHandler, + StrategyFunction, +) + +LOGGER = logging.getLogger(__name__) + + +class Smyth: + handlers: dict[str, SmythHandler] + processes: dict[str, list[RunnerProcessProtocol]] + + def __init__(self) -> None: + self.handlers = {} + self.processes = {} + + def add_handler( + self, + name: str, + path: str, + lambda_handler: LambdaHandler, + timeout: float | None = None, + event_data_generator: EventDataGenerator = generate_api_gw_v2_event_data, + context_data_generator: ContextDataGenerator = generate_context_data, + fake_coldstart: bool = False, + log_level: str = "INFO", + concurrency: int = 1, + strategy_function: StrategyFunction = first_warm, + ): + self.handlers[name] = SmythHandler( + name=name, + url_path=compile_path(path)[0], + lambda_handler=lambda_handler, + event_data_generator=event_data_generator, + context_data_generator=context_data_generator, + timeout=timeout, + fake_coldstart=fake_coldstart, + log_level=log_level, + concurrency=concurrency, + strategy_function=strategy_function, + ) + + def start_runners(self): + for handler_name, handler_config in self.handlers.items(): + self.processes[handler_name] = [] + for index in range(handler_config.concurrency): + process = RunnerProcess( + name=f"{handler_name}:{index}", + lambda_handler=handler_config.lambda_handler, + log_level=handler_config.log_level, + ) + process.start() + LOGGER.info("Started process %s", process.name) + self.processes[handler_name].append(process) + + def stop_runners(self): + for process_group in self.processes.values(): + for process in process_group: + LOGGER.info("Stopping process %s", process.name) + process.stop() + for process_group in self.processes.values(): + for process in process_group: + LOGGER.debug("Joining process %s", process.name) + if process.is_alive(): + process.terminate() + process.join() + + def get_handler_for_request(self, path: str) -> SmythHandler: + for handler_def in self.handlers.values(): + if handler_def.url_path.match(path): + return handler_def + raise ProcessDefinitionNotFoundError( + f"No process definition found for path {path}" + ) + + def get_handler_for_name(self, name: str) -> SmythHandler: + return self.handlers[name] + + async def dispatch( + self, + handler: SmythHandler, + request: Request, + event_data_generator: EventDataGenerator | None = None, + ): + process = handler.strategy_function(handler.name, self.processes) + if process is None: + raise ProcessDefinitionNotFoundError( + f"No process definition found for handler {handler.name}" + ) + + if event_data_generator is None: + event_data_generator = handler.event_data_generator + + event_data = await event_data_generator(request) + context_data = await handler.context_data_generator(request, handler, process) + + return await process.asend( + { + "type": "smyth.lambda.invoke", + "event": event_data, + "context": context_data, + } + ) diff --git a/src/smyth/types.py b/src/smyth/types.py new file mode 100644 index 0000000..9559617 --- /dev/null +++ b/src/smyth/types.py @@ -0,0 +1,50 @@ +from collections.abc import Awaitable, Callable, MutableMapping +from dataclasses import dataclass +from enum import Enum +from re import Pattern +from typing import Any, Protocol + +from aws_lambda_powertools.utilities.typing import LambdaContext +from starlette.requests import Request + +LambdaEvent = MutableMapping[str, Any] +LambdaHandler = Callable[[LambdaEvent, LambdaContext], dict[str, Any]] +RunnerMessage = MutableMapping[str, Any] +EventDataGenerator = Callable[[Request], Awaitable[dict[str, Any]]] +ContextDataGenerator = Callable[ + [Request, "SmythHandler", "RunnerProcessProtocol"], Awaitable[dict[str, Any]] +] +StrategyFunction = Callable[ + [str, dict[str, list["RunnerProcessProtocol"]]], "RunnerProcessProtocol" +] + + +class SmythHandlerState(str, Enum): + COLD = "cold" + WORKING = "working" + WARM = "warm" + + +class RunnerProcessProtocol(Protocol): + name: str + task_counter: int + last_used_timestamp: float + state: SmythHandlerState + + async def asend(self, data) -> RunnerMessage | None: ... + + +@dataclass +class SmythHandler: + name: str + url_path: Pattern[str] + lambda_handler: Callable[[LambdaEvent, LambdaContext], dict[str, Any]] + event_data_generator: Callable[[Request], Awaitable[dict[str, Any]]] + context_data_generator: Callable[ + [Request, "SmythHandler", RunnerProcessProtocol], Awaitable[dict[str, Any]] + ] + strategy_function: StrategyFunction + timeout: float | None = None + fake_coldstart: bool = False + log_level: str = "INFO" + concurrency: int = 1 diff --git a/src/smyth/utils.py b/src/smyth/utils.py index 691950d..51af62c 100644 --- a/src/smyth/utils.py +++ b/src/smyth/utils.py @@ -1,9 +1,8 @@ -from importlib import import_module import logging +from importlib import import_module class SmythStatusRouteFilter(logging.Filter): - def __init__(self, name: str = "", smyth_path_prefix: str = "") -> None: super().__init__(name) self.smyth_path_prefix = smyth_path_prefix diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..58c38df --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,55 @@ +import re +from unittest.mock import Mock + +import pytest + +from smyth.types import RunnerProcessProtocol, SmythHandler, SmythHandlerState + + +@pytest.fixture +def smyth_handler( + mock_lambda_handler, + mock_event_data_generator, + mock_context_data_generator, + mock_strategy_function, +): + return SmythHandler( + name="test_handler", + url_path=re.compile(r"/test_handler"), + lambda_handler=mock_lambda_handler, + event_data_generator=mock_event_data_generator, + context_data_generator=mock_context_data_generator, + strategy_function=mock_strategy_function, + ) + + +@pytest.fixture +def mock_lambda_handler(): + return Mock() + + +@pytest.fixture +def mock_event_data_generator(): + return Mock() + + +@pytest.fixture +def mock_context_data_generator(): + return Mock() + + +@pytest.fixture +def mock_strategy_function(): + return Mock() + + +@pytest.fixture +def mock_runner_process(): + mock = Mock( + spec=RunnerProcessProtocol, + ) + mock.name = "test_process" + mock.task_counter = 0 + mock.last_used_timestamp = 0 + mock.state = SmythHandlerState.COLD + return mock diff --git a/tests/test_context.py b/tests/test_context.py new file mode 100644 index 0000000..ea29cae --- /dev/null +++ b/tests/test_context.py @@ -0,0 +1,40 @@ +import re +from unittest.mock import ANY + +from smyth.context import generate_context_data +from smyth.types import SmythHandlerState + + +async def test_generate_context_data( + smyth_handler, + mock_runner_process, + mock_lambda_handler, + mock_event_data_generator, + mock_context_data_generator, + mock_strategy_function, +): + assert await generate_context_data(None, smyth_handler, mock_runner_process) == { + "smyth": { + "handler": { + "handler_config": { + "concurrency": 1, + "context_data_generator": ANY, + "event_data_generator": ANY, + "fake_coldstart": False, + "lambda_handler": ANY, + "log_level": "INFO", + "name": "test_handler", + "strategy_function": ANY, + "timeout": None, + "url_path": re.compile("/test_handler"), + }, + "name": "test_handler", + }, + "process": { + "last_used_timestamp": 0, + "name": "test_process", + "state": SmythHandlerState.COLD, + "task_counter": 0, + }, + }, + }