Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: 🎨 Enh/payment adjustments #5125

Draft
wants to merge 13 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@
PAG: Final[str] = "Payments Gateway service"
PGDB: Final[str] = "Postgres service"
RUT: Final[str] = "Resource Usage Tracker service"


MSG_GATEWAY_UNAVAILABLE_ERROR = "Our payments provider is temporary innoperative"
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ def get_full_class_name(cls) -> str:
#


class PaymentsGatewayError(_BaseAppError):
class BasePaymentsGatewayError(_BaseAppError):
...


class PaymentsGatewayNotReadyError(PaymentsGatewayError):
class PaymentsGatewayNotReadyError(BasePaymentsGatewayError):
msg_template = "Payments-Gateway is unresponsive: {checks}"
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ class ApplicationSettings(_BaseApplicationSettings):
"""

PAYMENTS_GATEWAY_URL: HttpUrl = Field(
..., description="Base url to the payment gateway"
...,
description="Base url to the payment gateway."
"Used for both internal communication and "
"to get an external link to the gateway for the payment form",
)

PAYMENTS_GATEWAY_API_SECRET: SecretStr = Field(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import sqlalchemy as sa
from models_library.users import GroupID, UserID
from pydantic import EmailStr
from simcore_postgres_database.models.users import users

from .base import BaseRepository
Expand All @@ -20,3 +21,14 @@ async def get_primary_group_id(self, user_id: UserID) -> GroupID:
msg = f"{user_id=} not found"
raise ValueError(msg)
return GroupID(row.primary_gid)

async def get_name_and_email(self, user_id: UserID) -> tuple[str, EmailStr]:
async with self.db_engine.begin() as conn:
result = await conn.execute(
sa.select(users.c.name, users.c.email).where(users.c.id == user_id)
)
row = result.first()
if row is None:
msg = f"{user_id=} not found"
raise ValueError(msg)
return row.name, EmailStr(row.email)
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

def merge_models(got: GetPaymentMethod, acked: PaymentsMethodsDB) -> PaymentMethodGet:
assert acked.completed_at # nosec
assert got.id == acked.payment_method_id # nosec

return PaymentMethodGet(
idr=acked.payment_method_id,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from contextlib import suppress
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import cast
Expand All @@ -16,6 +17,7 @@
from models_library.wallets import WalletID
from pydantic import EmailStr, parse_obj_as, parse_raw_as
from simcore_service_payments.db.auto_recharge_repo import AutoRechargeRepo
from simcore_service_payments.db.payment_users_repo import PaymentsUsersRepo
from simcore_service_payments.db.payments_methods_repo import PaymentsMethodsRepo
from simcore_service_payments.db.payments_transactions_repo import (
PaymentsTransactionsRepo,
Expand Down Expand Up @@ -149,6 +151,15 @@ async def _perform_auto_recharge(
payments_gateway = PaymentsGatewayApi.get_from_app_state(app)
payments_transactions_repo = PaymentsTransactionsRepo(db_engine=app.state.engine)
rut_api = ResourceUsageTrackerApi.get_from_app_state(app)
users_repo = PaymentsUsersRepo(db_engine=app.state.engine)

# NOTE: this will probably be removed https://github.com/ITISFoundation/appmotion-exchange/issues/21
user_name = f"id={payment_method_db.user_id}"
user_email = EmailStr(f"placeholder_{payment_method_db.user_id}@example.itis")
with suppress(ValueError):
user_name, user_email = await users_repo.get_name_and_email(
payment_method_db.user_id
)

await pay_with_payment_method(
gateway=payments_gateway,
Expand All @@ -162,7 +173,7 @@ async def _perform_auto_recharge(
wallet_id=rabbit_message.wallet_id,
wallet_name=f"id={rabbit_message.wallet_id}",
user_id=payment_method_db.user_id,
user_name=f"id={payment_method_db.user_id}",
user_email=EmailStr(f"placeholder_{payment_method_db.user_id}@example.itis"),
user_name=user_name,
user_email=user_email,
comment="Payment generated by auto recharge",
)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import logging

from fastapi import FastAPI
from models_library.healthchecks import LivenessResult
from sqlalchemy.ext.asyncio import AsyncEngine

Expand All @@ -25,3 +26,32 @@ async def create_health_report(
"resource_usage_tracker": rut_liveness,
"postgres": db_liveness,
}


async def _monitor_liveness():
#
#
# logs with specific format so graylog can send alarm if found
#
#
raise NotImplementedError


async def _periodic():
while True:
# do something
await _monitor_liveness()
# what if fails?, wait&repeat or stop-forever or cleanup&restart ?


def setup_healthchecks(app: FastAPI):
# setup _monitor_liveness as a periodic task in only one of the replicas

async def _on_startup() -> None:
...

async def _on_shutdown() -> None:
...

app.add_event_handler("startup", _on_startup)
app.add_event_handler("shutdown", _on_shutdown)
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@
import logging
from collections.abc import Callable
from contextlib import suppress
from typing import Coroutine

import httpx
from fastapi import FastAPI
from fastapi.encoders import jsonable_encoder
from httpx import URL, HTTPStatusError
from models_library.api_schemas_payments.errors import PaymentServiceUnavailableError
from models_library.api_schemas_webserver.wallets import PaymentID, PaymentMethodID
from pydantic import ValidationError, parse_raw_as
from pydantic.errors import PydanticErrorMixin
from servicelib.fastapi.app_state import SingletonInAppStateMixin
from servicelib.fastapi.http_client import (
AttachLifespanMixin,
Expand All @@ -28,7 +29,12 @@
from simcore_service_payments.models.schemas.acknowledgements import (
AckPaymentWithPaymentMethod,
)
from tenacity import AsyncRetrying, stop_after_delay, wait_exponential
from tenacity.retry import retry_if_exception_type
from tenacity.wait import wait_exponential

from .._constants import MSG_GATEWAY_UNAVAILABLE_ERROR, PAG
from ..core.errors import BasePaymentsGatewayError, PaymentsGatewayNotReadyError
from ..core.settings import ApplicationSettings
from ..models.payments_gateway import (
BatchGetPaymentMethods,
Expand All @@ -52,13 +58,13 @@ def _parse_raw_as_or_none(cls: type, text: str | None):
return None


class PaymentsGatewayError(PydanticErrorMixin, ValueError):
class PaymentsGatewayApiError(BasePaymentsGatewayError):
msg_template = "{operation_id} error {status_code}: {reason}"

@classmethod
def from_http_status_error(
cls, err: HTTPStatusError, operation_id: str
) -> "PaymentsGatewayError":
) -> "PaymentsGatewayApiError":
return cls(
operation_id=f"PaymentsGatewayApi.{operation_id}",
reason=f"{err}",
Expand All @@ -81,22 +87,37 @@ def get_detailed_message(self) -> str:


@contextlib.contextmanager
def _raise_as_payments_gateway_error(operation_id: str):
def _reraise_as_service_errors_context(operation_id: str):
try:
yield

except HTTPStatusError as err:
error = PaymentsGatewayError.from_http_status_error(
except httpx.RequestError as err:
_logger.exception("%s: request error", PAG)
raise PaymentServiceUnavailableError(
human_readable_detail=MSG_GATEWAY_UNAVAILABLE_ERROR
) from err

except httpx.HTTPStatusError as err:
error = PaymentsGatewayApiError.from_http_status_error(
err, operation_id=operation_id
)
_logger.warning(error.get_detailed_message())
raise error from err

if err.response.is_client_error:
_logger.warning(error.get_detailed_message())
raise error from err

if err.response.is_server_error:
# 5XX in server -> turn into unavailable
_logger.exception(error.get_detailed_message())
raise PaymentServiceUnavailableError(
human_readable_detail=MSG_GATEWAY_UNAVAILABLE_ERROR
) from err

def _handle_status_errors(coro: Callable):

def _handle_httpx_errors(coro: Callable):
@functools.wraps(coro)
async def _wrapper(self, *args, **kwargs):
with _raise_as_payments_gateway_error(operation_id=coro.__name__):
with _reraise_as_service_errors_context(operation_id=coro.__name__):
return await coro(self, *args, **kwargs)

return _wrapper
Expand All @@ -120,7 +141,7 @@ class PaymentsGatewayApi(
# api: one-time-payment workflow
#

@_handle_status_errors
@_handle_httpx_errors
async def init_payment(self, payment: InitPayment) -> PaymentInitiated:
response = await self.client.post(
"/init",
Expand All @@ -132,7 +153,7 @@ async def init_payment(self, payment: InitPayment) -> PaymentInitiated:
def get_form_payment_url(self, id_: PaymentID) -> URL:
return self.client.base_url.copy_with(path="/pay", params={"id": f"{id_}"})

@_handle_status_errors
@_handle_httpx_errors
async def cancel_payment(
self, payment_initiated: PaymentInitiated
) -> PaymentCancelled:
Expand All @@ -147,7 +168,7 @@ async def cancel_payment(
# api: payment method workflows
#

@_handle_status_errors
@_handle_httpx_errors
async def init_payment_method(
self,
payment_method: InitPaymentMethod,
Expand All @@ -166,7 +187,7 @@ def get_form_payment_method_url(self, id_: PaymentMethodID) -> URL:

# CRUD

@_handle_status_errors
@_handle_httpx_errors
async def get_many_payment_methods(
self, ids_: list[PaymentMethodID]
) -> list[GetPaymentMethod]:
Expand All @@ -179,18 +200,18 @@ async def get_many_payment_methods(
response.raise_for_status()
return PaymentMethodsBatch.parse_obj(response.json()).items

@_handle_status_errors
@_handle_httpx_errors
async def get_payment_method(self, id_: PaymentMethodID) -> GetPaymentMethod:
response = await self.client.get(f"/payment-methods/{id_}")
response.raise_for_status()
return GetPaymentMethod.parse_obj(response.json())

@_handle_status_errors
@_handle_httpx_errors
async def delete_payment_method(self, id_: PaymentMethodID) -> None:
response = await self.client.delete(f"/payment-methods/{id_}")
response.raise_for_status()

@_handle_status_errors
@_handle_httpx_errors
async def pay_with_payment_method(
self, id_: PaymentMethodID, payment: InitPayment
) -> AckPaymentWithPaymentMethod:
Expand All @@ -202,6 +223,27 @@ async def pay_with_payment_method(
return AckPaymentWithPaymentMethod.parse_obj(response.json())


def _create_start_policy(api: PaymentsGatewayApi) -> Callable[[], Coroutine]:
# Start policy:
# - this service will not be able to start if payments-gateway is alive
#
async def _():
results = []
async for attempt in AsyncRetrying(
wait=wait_exponential(max=3),
stop=stop_after_delay(max_delay=6),
retry=retry_if_exception_type(PaymentsGatewayNotReadyError),
reraise=True,
):
with attempt:
alive = await api.check_liveness()
results.append(alive)
if not alive:
raise PaymentsGatewayNotReadyError(checks=results)

return _


def setup_payments_gateway(app: FastAPI):
assert app.state # nosec
settings: ApplicationSettings = app.state.settings
Expand All @@ -216,3 +258,5 @@ def setup_payments_gateway(app: FastAPI):
)
api.attach_lifespan_to(app)
api.set_to_app_state(app)

app.add_event_handler("startup", _create_start_policy(api))
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,12 @@ async def list_payment_methods(
[acked.payment_method_id for acked in acked_many]
)

# FIXME: if out-of-sync w/ gateway, then this code will raise! because it has strict=True!
# FIXME: is order correct? is one-to-one ? is order preserved?

return [
merge_models(got, acked)
for acked, got in zip(acked_many, got_many, strict=True)
for got, acked in zip(got_many, acked_many, strict=True)
]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ def app_environment(
)
async def test_successful_one_time_payment_workflow(
is_pdb_enabled: bool,
mock_payments_gateway_service_or_none: MockRouter | None,
app: FastAPI,
client: httpx.AsyncClient,
faker: Faker,
rpc_client: RabbitMQRPCClient,
mock_payments_gateway_service_or_none: MockRouter | None,
wallet_id: WalletID,
wallet_name: IDStr,
user_id: UserID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ def app_environment(
)
async def test_successful_create_payment_method_workflow(
is_pdb_enabled: bool,
mock_payments_gateway_service_or_none: MockRouter | None,
app: FastAPI,
client: httpx.AsyncClient,
faker: Faker,
rpc_client: RabbitMQRPCClient,
mock_payments_gateway_service_or_none: MockRouter | None,
wallet_id: WalletID,
wallet_name: IDStr,
user_id: UserID,
Expand Down
Loading
Loading