diff --git a/.github/actions/sse-deploy-ecs/action.yml b/.github/actions/sse-deploy-ecs/action.yml new file mode 100644 index 0000000..258a855 --- /dev/null +++ b/.github/actions/sse-deploy-ecs/action.yml @@ -0,0 +1,75 @@ +name: Deploy SSE to ECS +description: "Deploy the SSE service to ECS" + +inputs: + aws_access_key_id: + description: "The AWS access key ID to use for deploying to ECS." + required: true + aws_secret_access_key: + description: "The AWS secret access key to use for deploying to ECS." + required: true + aws_ecs_cluster_name: + description: "The name of the ECS cluster to deploy to." + required: true + aws_ecs_service_name: + description: "The name of the ECS service to deploy to." + required: true + aws_ecr_repository_arn: + description: "The ARN of the ECR repository to deploy docker image to." + required: true + aws_task_definitions_directory_path: + description: "The local path in the repository to the json file containing the task definition template" + required: true + +runs: + using: composite + + steps: + - name: Set ECR tag + id: ecr-tag-variable + run: echo ::set-output name=tag::${GITHUB_REF#refs/*/} + shell: bash + + - name: Configure AWS Credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ inputs.aws_access_key_id }} + aws-secret-access-key: ${{ inputs.aws_secret_access_key }} + aws-region: eu-west-2 + + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v1 + + - name: Build, tag, and push image to Amazon ECR + id: build-image + env: + IMAGE_TAG: ${{ steps.ecr-tag-variable.outputs.tag }} + ECR_REPOSITORY: ${{ inputs.aws_ecr_repository_arn }} + DOCKER_BUILDKIT: "1" + run: | + echo "Building docker image with URL: " + echo $ECR_REPOSITORY:$IMAGE_TAG + docker build -t $ECR_REPOSITORY:$IMAGE_TAG -f Dockerfile . + docker push $ECR_REPOSITORY:$IMAGE_TAG + echo "::set-output name=image::$ECR_REPOSITORY:$IMAGE_TAG" + shell: bash + + - name: Fill in the new image ID in the Amazon SSE task definition + id: task-def-api + uses: aws-actions/amazon-ecs-render-task-definition@v1 + with: + task-definition: ${{ inputs.aws_task_definitions_directory_path }}/ecs-task-definition-sse.json + container-name: flagsmith-sse + image: ${{ steps.build-image.outputs.image }} + + - name: Deploy Amazon ECS SSE task definition + uses: aws-actions/amazon-ecs-deploy-task-definition@v1 + with: + cluster: ${{ inputs.aws_ecs_cluster_name }} + service: ${{ inputs.aws_ecs_service_name }} + task-definition: ${{ steps.task-def-api.outputs.task-definition }} + + - name: Wait for service to be stable + run: aws ecs wait services-stable --cluster ${{ inputs.aws_ecs_cluster_name }} --services ${{ inputs.aws_ecs_service_name }} + shell: bash diff --git a/.github/workflows/sse-deploy-staging-ecs.yml b/.github/workflows/sse-deploy-staging-ecs.yml new file mode 100644 index 0000000..ce37cc0 --- /dev/null +++ b/.github/workflows/sse-deploy-staging-ecs.yml @@ -0,0 +1,33 @@ +name: Deploy SSE to Staging ECS + +on: + push: + branches: + - main + paths: + - "api/**" + - ".github/**" + - 'infrastructure/aws/staging/**' + +jobs: + deploy-staging-ecs: + runs-on: ubuntu-latest + name: Deploy SSE to Staging ECS + environment: staging + + steps: + - name: Cloning repo + uses: actions/checkout@v2 + with: + fetch-depth: 0 + + - name: Deploy SSE to Staging + uses: ./.github/actions/sse-deploy-ecs + with: + aws_access_key_id: AKIAUM26IRCPH7BHBLVE + aws_secret_access_key: ${{ secrets.STAGING_AWS_SECRET_ACCESS_KEY }} + aws_ecs_cluster_name: flagsmith-api-cluster-eu-west-2-f241261 + aws_ecs_service_name: flagsmith-sse-svc-eu-west-2-43f7af2 + aws_ecr_repository_arn: 302456015006.dkr.ecr.eu-west-2.amazonaws.com/flagsmith-sse-ecr-fa16ac8 + aws_task_definitions_directory_path: infrastructure/aws/staging + diff --git a/infrastructure/aws/staging/ecs-task-definition-sse.json b/infrastructure/aws/staging/ecs-task-definition-sse.json new file mode 100644 index 0000000..26ffdb2 --- /dev/null +++ b/infrastructure/aws/staging/ecs-task-definition-sse.json @@ -0,0 +1,44 @@ +{ + "family": "flagsmith-sse", + "networkMode": "awsvpc", + "executionRoleArn": "arn:aws:iam::302456015006:role/task-sse-role-8474d2a", + "taskRoleArn": "arn:aws:iam::302456015006:role/task-sse-role-8474d2a", + "containerDefinitions": [ + { + "name": "flagsmith-sse", + "command": [ + "uvicorn", + "src.main:app", + "--host", + "0.0.0.0", + "--port", + "8000", + "--no-access-log" + ], + "cpu": 0, + "portMappings": [ + { + "containerPort": 8000, + "hostPort": 8000, + "protocol": "tcp" + } + ], + "essential": true, + "environment": [], + "secrets": [], + "logConfiguration": { + "logDriver": "awslogs", + "options": { + "awslogs-group": "flagsmith-sse-eu-west-2-4a10ba2", + "awslogs-region": "eu-west-2", + "awslogs-stream-prefix": "awslogs-flagsmith-sse" + } + } + } + ], + "requiresCompatibilities": [ + "FARGATE" + ], + "cpu": "1024", + "memory": "2048" +} diff --git a/requirements-dev.in b/requirements-dev.in index af9e380..8a15ee2 100644 --- a/requirements-dev.in +++ b/requirements-dev.in @@ -7,4 +7,12 @@ flake8 pytest pytest-asyncio pytest-mock -reorder-python-imports \ No newline at end of file +reorder-python-imports +httpx +# lock the version because `starlette`(from requirements.in) explicitly depends on it +# but httpx tries to fetch the latest version causing conflict between +# requirements.txt and requirements-dev.txt +anyio==3.4.0 + +# conflict between `requests` and httpx +certifi==2021.10.8 \ No newline at end of file diff --git a/requirements-dev.txt b/requirements-dev.txt index f3de0e7..bd688b8 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -4,6 +4,10 @@ # # pip-compile requirements-dev.in # +anyio==3.4.0 + # via + # -r requirements-dev.in + # httpcore aspy-refactor-imports==3.0.1 # via reorder-python-imports astroid==2.9.0 @@ -16,6 +20,11 @@ backports-entry-points-selectable==1.1.1 # via virtualenv black==22.6.0 # via -r requirements-dev.in +certifi==2021.10.8 + # via + # -r requirements-dev.in + # httpcore + # httpx cfgv==3.3.1 # via pre-commit click==8.0.3 @@ -26,8 +35,18 @@ filelock==3.4.0 # via virtualenv flake8==4.0.1 # via -r requirements-dev.in +h11==0.12.0 + # via httpcore +httpcore==0.15.0 + # via httpx +httpx==0.23.0 + # via -r requirements-dev.in identify==2.4.0 # via pre-commit +idna==3.3 + # via + # anyio + # rfc3986 iniconfig==1.1.1 # via pytest isort==5.10.1 @@ -74,7 +93,7 @@ pytest==6.2.5 # -r requirements-dev.in # pytest-asyncio # pytest-mock -pytest-asyncio==0.16.0 +pytest-asyncio==0.19.0 # via -r requirements-dev.in pytest-mock==3.6.1 # via -r requirements-dev.in @@ -82,8 +101,15 @@ pyyaml==6.0 # via pre-commit reorder-python-imports==3.0.1 # via -r requirements-dev.in +rfc3986[idna2008]==1.5.0 + # via httpx six==1.16.0 # via virtualenv +sniffio==1.2.0 + # via + # anyio + # httpcore + # httpx toml==0.10.2 # via # autopep8 diff --git a/requirements.in b/requirements.in index da27a4a..275e42c 100644 --- a/requirements.in +++ b/requirements.in @@ -6,3 +6,8 @@ flagsmith-flag-engine python-decouple python-dotenv pydantic +# sse-stuff +sse-starlette +asyncio +SQLAlchemy +aiosqlite diff --git a/requirements.txt b/requirements.txt index f0c5232..d85fc96 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,20 +2,26 @@ # This file is autogenerated by pip-compile with python 3.10 # To update, run: # -# pip-compile +# pip-compile requirements.in # +aiosqlite==0.17.0 + # via -r requirements.in anyio==3.4.0 # via starlette +asyncio==3.4.3 + # via -r requirements.in certifi==2021.10.8 # via requests charset-normalizer==2.0.9 # via requests click==8.0.3 # via uvicorn -fastapi==0.70.0 +fastapi==0.79.0 # via -r requirements.in flagsmith-flag-engine==1.5.0 # via -r requirements.in +greenlet==1.1.2 + # via sqlalchemy h11==0.12.0 # via uvicorn idna==3.3 @@ -34,14 +40,22 @@ python-decouple==3.5 # via -r requirements.in python-dotenv==0.19.2 # via -r requirements.in -requests==2.26.0 +requests==2.28.1 # via -r requirements.in sniffio==1.2.0 # via anyio -starlette==0.16.0 - # via fastapi +sqlalchemy==1.4.39 + # via -r requirements.in +sse-starlette==1.1.1 + # via -r requirements.in +starlette==0.19.1 + # via + # fastapi + # sse-starlette typing-extensions==4.0.1 - # via pydantic + # via + # aiosqlite + # pydantic urllib3==1.26.7 # via requests uvicorn==0.18.2 diff --git a/src/main.py b/src/main.py index 679c916..17da888 100644 --- a/src/main.py +++ b/src/main.py @@ -1,6 +1,6 @@ from fastapi import FastAPI from fastapi import Header -from fastapi.responses import JSONResponse +from fastapi.middleware.cors import CORSMiddleware from flag_engine.engine import get_environment_feature_state from flag_engine.engine import get_environment_feature_states from flag_engine.engine import get_identity_feature_states @@ -12,17 +12,22 @@ from .schemas import APIFeatureStateSchema from .schemas import APITraitSchema from .settings import Settings +from .sse import router as sse_router from fastapi_utils.tasks import repeat_every app = FastAPI() settings = Settings() cache_service = CacheService(settings) - fs_schema = APIFeatureStateSchema() trait_schema = APITraitSchema() +@app.get("/health") +def health_check(): + return {"status": "ok"} + + @app.get("/api/v1/flags/") def flags(feature: str = None, x_environment_key: str = Header(None)): environment_document = cache_service.get_environment(x_environment_key) @@ -35,7 +40,7 @@ def flags(feature: str = None, x_environment_key: str = Header(None)): feature_states = get_environment_feature_states(environment) data = fs_schema.dump(feature_states, many=True) - return JSONResponse(content=data) + return data def _get_fs_schema(identity_model: IdentityModel): @@ -63,10 +68,20 @@ def identity( "traits": trait_schema.dump(trait_models, many=True), "flags": fs_schema.dump(flags, many=True), } - return JSONResponse(content=data) + return data @app.on_event("startup") @repeat_every(seconds=settings.api_poll_frequency, raise_exceptions=True) def refresh_cache(): cache_service.refresh() + + +app.add_middleware( + CORSMiddleware, + allow_origins=settings.allow_origins, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) +app.include_router(sse_router) diff --git a/src/settings.py b/src/settings.py index f4ec47a..e623667 100644 --- a/src/settings.py +++ b/src/settings.py @@ -31,6 +31,12 @@ class Settings(BaseSettings): api_url: HttpUrl = "https://edge.api.flagsmith.com/api/v1" api_poll_frequency: int = 10 + # sse settings + stream_delay: int = 1 # seconds + retry_timeout: int = 15000 # milliseconds + max_stream_age: int = 30 # seconds + allow_origins: List[str] = ["*"] + class Config: env_file = "config.json" env_file_encoding = "utf-8" diff --git a/src/sse.py b/src/sse.py new file mode 100644 index 0000000..f43615b --- /dev/null +++ b/src/sse.py @@ -0,0 +1,88 @@ +import asyncio +from datetime import datetime +from functools import lru_cache + +from fastapi import APIRouter +from fastapi import Depends +from fastapi import Request +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.ext.asyncio import create_async_engine +from sqlalchemy.sql import text +from sse_starlette.sse import EventSourceResponse + +from .settings import Settings +from .sse_models import Base +from .sse_models import Environment + +engine = create_async_engine("sqlite+aiosqlite:///:memory:", future=True) + +router = APIRouter() + + +@lru_cache() +def get_settings(): + return Settings() + + +@router.on_event("startup") +async def create_schema(): + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + +@router.on_event("shutdown") +async def drop_schema(): + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + + +@router.post("/sse/environments/{environment_key}/queue-change") +async def queue_environment_changes(environment_key: str): + async with AsyncSession(engine, autoflush=True) as session: + statement = text( + """INSERT OR REPLACE INTO environment(key) VALUES(:environment_key)""" + ) + await session.execute(statement, {"environment_key": environment_key}) + await session.commit() + return + + +@router.get("/sse/environments/{environment_key}/stream") +async def stream_environment_changes( + request: Request, environment_key: str, settings: Settings = Depends(get_settings) +): + async with AsyncSession(engine, autoflush=True) as session: + started_at = datetime.now() + + async def did_environment_change(): + environment_updated = False + environment = await session.get(Environment, environment_key) + if environment: + environment_updated = True + await session.delete(environment) + + await session.commit() + return environment_updated + + async def event_generator(): + while True: + # If client closes connection, or the stream is open for more than `MAX_AGE` seconds + # stop sending events + if ( + await request.is_disconnected() + or (datetime.now() - started_at).total_seconds() + > settings.max_stream_age + ): + await session.close() + break + if await did_environment_change(): + yield { + "event": "environment_updated", + "retry": settings.retry_timeout, + } + + await asyncio.sleep(settings.stream_delay) + + return EventSourceResponse( + event_generator(), headers={"Cache-Control": "public, max-age=29"} + ) diff --git a/src/sse_models.py b/src/sse_models.py new file mode 100644 index 0000000..76a1c7b --- /dev/null +++ b/src/sse_models.py @@ -0,0 +1,14 @@ +from sqlalchemy import Column +from sqlalchemy import String +from sqlalchemy.orm import declarative_base + + +Base = declarative_base() + + +class Environment(Base): + __tablename__ = "environment" + key = Column(String, primary_key=True) + + def __repr__(self): + return f"Environment(key={self.key!r})" diff --git a/tests/conftest.py b/tests/conftest.py index 28b11da..4f46e5b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,9 @@ import typing import pytest +from fastapi.testclient import TestClient + +from src.main import app @pytest.fixture @@ -36,3 +39,9 @@ def environment_1_feature_states_response_list_response_with_segment_override( "feature_state_value" ] = "segment_override" return environment_1_feature_states_response_list + + +@pytest.fixture +def client(): + with TestClient(app) as c: + yield c diff --git a/tests/test_main.py b/tests/test_main.py index aff3067..07302a3 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -8,6 +8,11 @@ client = TestClient(app) +def test_health_check_returns_200(): + response = client.get("/health") + assert response.status_code == 200 + + def test_get_flags(mocker, environment_1_feature_states_response_list): environment_key = "test_environment_key" mocked_cache_service = mocker.patch("src.main.cache_service") diff --git a/tests/test_sse.py b/tests/test_sse.py new file mode 100644 index 0000000..372f800 --- /dev/null +++ b/tests/test_sse.py @@ -0,0 +1,68 @@ +import asyncio + +import pytest +from httpx import AsyncClient +from sqlalchemy.ext.asyncio import AsyncSession + +from src.main import app +from src.settings import Settings +from src.sse import engine +from src.sse import get_settings +from src.sse_models import Environment + + +def get_settings_override(): + return Settings(max_stream_age=5, stream_delay=1) + + +app.dependency_overrides[get_settings] = get_settings_override + + +@pytest.mark.asyncio +async def test_queue_environment_changes_creates_environment_in_db(client): + # Given + environment_key = "some_key" + + # When + response = client.post(f"/sse/environments/{environment_key}/queue-change") + + # Then + async with AsyncSession(engine) as session: + environment = await session.get(Environment, environment_key) + assert environment.key == environment_key + + assert response.status_code == 200 + + +@pytest.mark.asyncio +async def test_stream_environment_changes(client): + # Given + environment_key = "some_key" + async with AsyncClient(app=app, base_url="http://test") as ac: + # First, let's create a task that makes a request to /stream endpoint + stream_response_task = asyncio.create_task( + ac.get(f"/sse/environments/{environment_key}/stream") + ) + # Now, let's yield control back to event loop so that it can run our task + await asyncio.sleep(0.1) + + # Next, let's update the environment + await ac.post(f"/sse/environments/{environment_key}/queue-change") + + # Now, let's wait for the change to be streamed + await asyncio.sleep(2) + + # Next, let's update the environment once again + await ac.post(f"/sse/environments/{environment_key}/queue-change") + + # Finally, let's wait for the stream to finish + response = await stream_response_task + + # Then + expected_response = ( + "event: environment_updated\r\ndata: None\r\nretry: 15000\r\n\r\nevent:" + " environment_updated\r\ndata: None\r\nretry: 15000\r\n\r\n" + ) + + assert response.status_code == 200 + assert response.text == expected_response