diff --git a/.dapr/components/pubsub.yaml b/.dapr/components/pubsub.yaml new file mode 100644 index 0000000..f7c308b --- /dev/null +++ b/.dapr/components/pubsub.yaml @@ -0,0 +1,8 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: pubsub +spec: + type: pubsub.in-memory + version: v1 + metadata: [] \ No newline at end of file diff --git a/.dapr/dapr-config-minimal.yaml b/.dapr/dapr-config-minimal.yaml new file mode 100644 index 0000000..e097002 --- /dev/null +++ b/.dapr/dapr-config-minimal.yaml @@ -0,0 +1,5 @@ +apiVersion: dapr.io/v1alpha1 +kind: Configuration +metadata: + name: daprConfig + namespace: default \ No newline at end of file diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 0000000..6ce50f1 --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,43 @@ +// For format details, see https://aka.ms/devcontainer.json. For config options, see the +// README at: https://github.com/devcontainers/templates/tree/main/src/docker-existing-docker-compose +{ + "name": "Existing Docker Compose (Extend)", + + // Update the 'dockerComposeFile' list if you have more compose files or use different names. + // The .devcontainer/docker-compose.yml file contains any overrides you need/want to make. + "dockerComposeFile": ["../compose.yaml", "docker-compose.yml"], + + // The 'service' property is the name of the service for the container that VS Code should + // use. Update this value and .devcontainer/docker-compose.yml to the real service name. + "service": "order", + + // The optional 'workspaceFolder' property is the path VS Code should open by default when + // connected. This is typically a file mount in .devcontainer/docker-compose.yml + "workspaceFolder": "/workspaces/${localWorkspaceFolderBasename}", + "customizations": { + "vscode": { + "extensions": ["rust-lang.rust-analyzer"] + } + } + + // Features to add to the dev container. More info: https://containers.dev/features. + // "features": {}, + + // Use 'forwardPorts' to make a list of ports inside the container available locally. + // "forwardPorts": [], + + // Uncomment the next line if you want start specific services in your Docker Compose config. + // "runServices": [], + + // Uncomment the next line if you want to keep your containers running after VS Code shuts down. + // "shutdownAction": "none", + + // Uncomment the next line to run commands after the container is created. + // "postCreateCommand": "cat /etc/os-release", + + // Configure tool-specific properties. + // "customizations": {}, + + // Uncomment to connect as an existing user other than the container default. More info: https://aka.ms/dev-containers-non-root. + // "remoteUser": "devcontainer" +} diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml new file mode 100644 index 0000000..0b507bf --- /dev/null +++ b/.devcontainer/docker-compose.yml @@ -0,0 +1,25 @@ +version: "3.8" +services: + # Update this to the name of the service you want to work with in your docker-compose.yml file + order: + # Uncomment if you want to override the service's Dockerfile to one in the .devcontainer + # folder. Note that the path of the Dockerfile and context is relative to the *primary* + # docker-compose.yml file (the first in the devcontainer.json "dockerComposeFile" + # array). The sample below assumes your primary file is in the root of your project. + # + # build: + # context: . + # dockerfile: .devcontainer/Dockerfile + + volumes: + # Update this to wherever you want VS Code to mount the folder of your project + - ..:/workspaces:cached + + # Uncomment the next four lines if you will use a ptrace-based debugger like C++, Go, and Rust. + # cap_add: + # - SYS_PTRACE + # security_opt: + # - seccomp:unconfined + + # Overrides default command so things don't shut down after the process ends. + command: /bin/sh -c "while sleep 1000; do :; done" diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..1de5659 --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +target \ No newline at end of file diff --git a/.github/workflows/publish-image.yaml b/.github/workflows/publish-image.yaml new file mode 100644 index 0000000..ad2ce2c --- /dev/null +++ b/.github/workflows/publish-image.yaml @@ -0,0 +1,54 @@ +name: Create and publish a Docker image on Release + +on: + push: + branches: + - "main" + tags: + - "v*" + pull_request: + branches: + - "main" + +concurrency: ${{ github.workflow }}-${{ github.ref }} + +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }} + +jobs: + + build-and-push-image: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + with: + submodules: recursive + + - name: Log in to the Container registry + if: github.event_name != 'pull_request' + uses: docker/login-action@v3 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Extract metadata (tags, labels) for Docker + id: meta + uses: docker/metadata-action@v5 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + + - name: Build and push Docker image + uses: docker/build-push-action@v5 + with: + context: . + file: base-dockerfile + push: ${{ github.event_name != 'pull_request' }} + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} \ No newline at end of file diff --git a/.github/workflows/test-update-schema.yaml b/.github/workflows/test-update-schema.yaml new file mode 100644 index 0000000..3f9f7c5 --- /dev/null +++ b/.github/workflows/test-update-schema.yaml @@ -0,0 +1,29 @@ +name: Test update GraphQL schema on pull request + +on: + pull_request: + +jobs: + schema: + name: Test update GraphQL schema on pull request + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + submodules: recursive + - name: Install latest stable + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + - uses: actions/checkout@v4 + with: + repository: "misarch/schemas" + path: "schemas" + - name: Save graphql schemas + run: | + cargo run -- --generate-schema + - uses: misarch/graphql-schema-transform@v1 + with: + schema: schemas/order.graphql + target: schemas/order.graphql \ No newline at end of file diff --git a/.github/workflows/update-infrastructure-docker.yaml b/.github/workflows/update-infrastructure-docker.yaml new file mode 100644 index 0000000..e13e3f8 --- /dev/null +++ b/.github/workflows/update-infrastructure-docker.yaml @@ -0,0 +1,31 @@ +name: Update infrastructure-docker submodule + +on: + push: + branches: + - main + +concurrency: ${{ github.workflow }}-${{ github.ref }} + +jobs: + schema: + name: Update infrastructure-docker submodule + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + repository: "misarch/infrastructure-docker" + submodules: true + - name: Update submodule + run: | + cd order + git checkout ${{ github.sha }} + - uses: peter-evans/create-pull-request@v5 + with: + commit-message: Update order schema + branch: update/order + token: ${{ secrets.INFRASTRUCTURE_DOCKER_PUSH_SECRET }} + - name: Set to auto merge + run: gh pr merge update/order --auto --merge -R misarch/infrastructure-docker + env: + GH_TOKEN: ${{ secrets.INFRASTRUCTURE_DOCKER_PUSH_SECRET }} diff --git a/.github/workflows/update-schema.yaml b/.github/workflows/update-schema.yaml new file mode 100644 index 0000000..acc6257 --- /dev/null +++ b/.github/workflows/update-schema.yaml @@ -0,0 +1,39 @@ +name: Update GraphQL schema + +on: + push: + branches: + - main + +concurrency: ${{ github.workflow }}-${{ github.ref }} + +jobs: + schema: + name: Update GraphQL schema + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + submodules: recursive + - name: Install latest stable + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + - uses: actions/checkout@v4 + with: + repository: "misarch/schemas" + path: "schemas" + - name: Save graphql schemas + run: | + cargo run -- --generate-schema + - uses: misarch/graphql-schema-transform@v1 + with: + schema: schemas/order.graphql + target: schemas/order.graphql + - uses: peter-evans/create-pull-request@v5 + with: + path: ./schemas + commit-message: Update order schema + branch: update/order + token: ${{ secrets.SCHEMAS_PUSH_SECRET }} diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..5fc6de5 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "schemas"] + path = schemas_repo + url = git@github.com:MiSArch/schemas.git diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..1280774 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "misarch-order" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-graphql = { version = "6.0.11", features = ["bson", "chrono", "uuid", "log"] } +async-graphql-axum = "6.0.11" +tokio = { version = "1.8", features = ["macros", "rt-multi-thread"] } +axum = { version = "0.6.0", features = ["headers", "macros"] } +mongodb = "2.8.0" +serde = "1.0.193" +futures = "0.3.30" +bson = { version = "2.8.1", features = ["chrono"]} +clap = { version = "4.4.13", features = ["derive"] } +uuid = { version = "1.6.1", features = ["v4", "serde"] } +mongodb-cursor-pagination = "0.3.2" +json = "0.12.4" +log = "0.4.20" +simple_logger = "4.3.3" +serde_json = "1.0.113" +graphql_client = "0.13.0" +reqwest = { version = "0.11.24", features = ["json"] } +chrono = { version = "0.4.33", features = ["serde"] } \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..defe6eb --- /dev/null +++ b/README.md @@ -0,0 +1,16 @@ +# Order service for MiSArch + +### Quickstart (DevContainer) + +1. Open VSCode Development Container +2. `cargo run` starts the GraphQL service + GraphiQL on port `8080` + +### Quickstart (Docker Compose) + +1. `docker compose -f docker-compose-dev.yaml up --build` in the repository root directory. **IMPORTANT:** MongoDB credentials should be configured for production. + +### What it can do + +- CRUD orders +- Validates all UUIDs input as strings +- Error prop to GraphQL diff --git a/base-dockerfile b/base-dockerfile new file mode 100644 index 0000000..e465e8f --- /dev/null +++ b/base-dockerfile @@ -0,0 +1,29 @@ +# Source: https://github.com/LukeMathWalker/cargo-chef + +FROM lukemathwalker/cargo-chef:latest-rust-1 AS chef +WORKDIR /misarch-order + +FROM chef AS planner +COPY . . +RUN cargo chef prepare --recipe-path recipe.json + +FROM chef AS builder +COPY --from=planner /misarch-order/recipe.json recipe.json + +RUN apt update && apt install -y pkg-config libssl-dev && rm -rf /var/lib/apt/lists/* + +# Build dependencies - this is the caching Docker layer! +RUN cargo chef cook --release --recipe-path recipe.json +# Build application +COPY . . + +RUN cargo build --release --bin misarch-order + +# We do not need the Rust toolchain to run the binary! +FROM debian:bookworm-slim AS runtime + +RUN apt update && apt install -y pkg-config libssl-dev && rm -rf /var/lib/apt/lists/* + +WORKDIR /misarch-order +COPY --from=builder /misarch-order/target/release/misarch-order /usr/local/bin +ENTRYPOINT ["/usr/local/bin/misarch-order"] \ No newline at end of file diff --git a/compose.yaml b/compose.yaml new file mode 100644 index 0000000..7de2dc7 --- /dev/null +++ b/compose.yaml @@ -0,0 +1,36 @@ +services: + order: + extends: + file: docker-compose-base.yaml + service: order + build: + context: . + dockerfile: devcontainer-dockerfile + ports: + - 8080:8080 + order-db: + extends: + file: docker-compose-base.yaml + service: order-db + order-mongoexpress: + image: mongo-express + ports: + - 8081:8081 + depends_on: + - order-db + environment: + ME_CONFIG_MONGODB_URL: mongodb://order-db:27017 + order-dapr: + extends: + file: docker-compose-base.yaml + service: order-dapr + volumes: + - "./.dapr/dapr-config-minimal.yaml:/config.yaml" + - "./.dapr/components:/components" + placement: + image: "daprio/dapr" + command: ["./placement", "-port", "50006"] + ports: + - 50006:50006 +volumes: + order-db-data: diff --git a/dev-dockerfile b/dev-dockerfile new file mode 100644 index 0000000..f730629 --- /dev/null +++ b/dev-dockerfile @@ -0,0 +1,8 @@ +FROM rust:1.75-slim-bookworm + +RUN apt update && apt install -y pkg-config libssl-dev && rm -rf /var/lib/apt/lists/* + +WORKDIR /usr/src/misarch-order + +COPY . . +CMD ["cargo", "run"] diff --git a/devcontainer-dockerfile b/devcontainer-dockerfile new file mode 100644 index 0000000..35ac330 --- /dev/null +++ b/devcontainer-dockerfile @@ -0,0 +1,5 @@ +FROM rust:1.75-slim-bookworm + +RUN apt update && apt install -y pkg-config libssl-dev && rm -rf /var/lib/apt/lists/* + +WORKDIR /usr/src/misarch-order \ No newline at end of file diff --git a/docker-compose-base.yaml b/docker-compose-base.yaml new file mode 100644 index 0000000..86ede15 --- /dev/null +++ b/docker-compose-base.yaml @@ -0,0 +1,49 @@ +services: + order: + restart: unless-stopped + build: + context: . + dockerfile: base-dockerfile + healthcheck: + test: wget http://localhost:8080/graphiql || exit 1 + interval: 1s + timeout: 10s + retries: 20 + start_period: 3s + depends_on: + order-db: + condition: service_healthy + environment: + MONGODB_URI: mongodb://order-db:27017 + order-db: + image: mongo + volumes: + - order-db-data:/data/db + healthcheck: + test: echo 'db.runCommand("ping").ok' | mongosh localhost:27017/test --quiet + interval: 10s + timeout: 5s + retries: 3 + order-dapr: + image: "daprio/daprd:edge" + command: + [ + "./daprd", + "--app-id", + "order", + "--app-port", + "8080", + "--app-protocol", + "http", + "--dapr-http-port", + "3500", + "-placement-host-address", + "placement:50006", + "--config", + "/config.yaml", + "--resources-path", + "/components", + ] + network_mode: "service:order" +volumes: + order-db-data: diff --git a/docker-compose-dev.yaml b/docker-compose-dev.yaml new file mode 100644 index 0000000..daed50f --- /dev/null +++ b/docker-compose-dev.yaml @@ -0,0 +1,36 @@ +services: + order: + extends: + file: docker-compose-base.yaml + service: order + build: + context: . + dockerfile: dev-dockerfile + ports: + - 8080:8080 + order-db: + extends: + file: docker-compose-base.yaml + service: order-db + order-mongoexpress: + image: mongo-express + ports: + - 8081:8081 + depends_on: + - order-db + environment: + ME_CONFIG_MONGODB_URL: mongodb://order-db:27017 + order-dapr: + extends: + file: docker-compose-base.yaml + service: order-dapr + volumes: + - "./.dapr/dapr-config-minimal.yaml:/config.yaml" + - "./.dapr/components:/components" + placement: + image: "daprio/dapr" + command: ["./placement", "-port", "50006"] + ports: + - 50006:50006 +volumes: + order-db-data: diff --git a/queries/get_discounts.graphql b/queries/get_discounts.graphql new file mode 100644 index 0000000..5cc13e9 --- /dev/null +++ b/queries/get_discounts.graphql @@ -0,0 +1,9 @@ +query GetDiscounts($findApplicableDiscountsInput: FindApplicableDiscountsInput!) { + findApplicableDiscounts(input: $findApplicableDiscountsInput) { + productVariantId, + discounts { + id, + discount, + } + } +} \ No newline at end of file diff --git a/queries/get_shipment_fees.graphql b/queries/get_shipment_fees.graphql new file mode 100644 index 0000000..ab5920b --- /dev/null +++ b/queries/get_shipment_fees.graphql @@ -0,0 +1,3 @@ +query GetShipmentFees($calculateShipmentFeesInput: CalculateShipmentFeesInput!) { + calculateShipmentFees(input: $calculateShipmentFeesInput) +} \ No newline at end of file diff --git a/queries/get_shopping_cart_product_variant_ids_and_counts.graphql b/queries/get_shopping_cart_product_variant_ids_and_counts.graphql new file mode 100644 index 0000000..f7e8888 --- /dev/null +++ b/queries/get_shopping_cart_product_variant_ids_and_counts.graphql @@ -0,0 +1,18 @@ +query GetShoppingCartProductVariantIdsAndCounts($representations: [_Any!]!) { + _entities(representations: $representations) { + __typename + ... on User { + shoppingcart { + shoppingcartItems { + nodes { + id, + productVariant { + id + }, + count + } + } + } + } + } +} \ No newline at end of file diff --git a/queries/get_unreserved_product_item_counts.graphql b/queries/get_unreserved_product_item_counts.graphql new file mode 100644 index 0000000..f0267c7 --- /dev/null +++ b/queries/get_unreserved_product_item_counts.graphql @@ -0,0 +1,11 @@ +query GetUnreservedProductItemCounts($representations: [_Any!]!) { + _entities(representations: $representations) { + __typename + ... on ProductVariant { + id, + productItems(filter: { inventoryStatus: IN_STORAGE }) { + totalCount + } + } + } +} \ No newline at end of file diff --git a/schemas_repo b/schemas_repo new file mode 160000 index 0000000..84e8b6b --- /dev/null +++ b/schemas_repo @@ -0,0 +1 @@ +Subproject commit 84e8b6b69485c51748f40a031f7e6ae62f88fd21 diff --git a/src/authentication.rs b/src/authentication.rs new file mode 100644 index 0000000..dd8cc45 --- /dev/null +++ b/src/authentication.rs @@ -0,0 +1,83 @@ +use async_graphql::{Context, Error, Result}; +use axum::http::HeaderMap; +use bson::Uuid; +use serde::{Deserialize, Serialize}; + +// Authorized-User HTTP header. +#[derive(Serialize, Deserialize, Debug)] +pub struct AuthorizedUserHeader { + id: Uuid, + roles: Vec, +} + +// Extraction of AuthorizedUserHeader from HeaderMap. +impl TryFrom<&HeaderMap> for AuthorizedUserHeader { + type Error = Error; + + // Tries to extract the AuthorizedUserHeader from a HeaderMap. + // + // Returns a GraphQL Error if the extraction fails. + fn try_from(header_map: &HeaderMap) -> Result { + if let Some(authenticate_user_header_value) = header_map.get("Authorized-User") { + if let Ok(authenticate_user_header_str) = authenticate_user_header_value.to_str() { + let authenticate_user_header: AuthorizedUserHeader = + serde_json::from_str(authenticate_user_header_str)?; + return Ok(authenticate_user_header); + } + } + Err(Error::new( + "Authentication failed. Authorized-User header is not set or could not be parsed.", + )) + } +} + +// Role of user. +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Copy)] +#[serde(rename_all = "snake_case")] +enum Role { + Buyer, + Admin, + Employee, +} + +impl Role { + // Defines if user has a permissive role. + fn is_permissive(self) -> bool { + match self { + Self::Buyer => false, + Self::Admin => true, + Self::Employee => true, + } + } +} + +// Authenticate user of UUID for a Context. +pub fn authenticate_user(ctx: &Context, id: Uuid) -> Result<()> { + match ctx.data::() { + Ok(authenticate_user_header) => check_permissions(&authenticate_user_header, id), + Err(_) => Err(Error::new( + "Authentication failed. Authorized-User header is not set or could not be parsed.", + )), + } +} + +// Check if user of UUID has a valid permission according to the AuthorizedUserHeader. +// +// Permission is valid if the user has `Role::Buyer` and the same UUID as provided in the function parameter. +// Permission is valid if the user has a permissive role: `user.is_permissive() == true`, regardless of the users UUID. +pub fn check_permissions(authenticate_user_header: &AuthorizedUserHeader, id: Uuid) -> Result<()> { + if authenticate_user_header + .roles + .iter() + .any(|r| r.is_permissive()) + || authenticate_user_header.id == id + { + return Ok(()); + } else { + let message = format!( + "Authentication failed for user of UUID: `{}`. Operation not permitted.", + authenticate_user_header.id + ); + return Err(Error::new(message)); + } +} diff --git a/src/base_connection.rs b/src/base_connection.rs new file mode 100644 index 0000000..4498d4d --- /dev/null +++ b/src/base_connection.rs @@ -0,0 +1,37 @@ +use async_graphql::{OutputType, SimpleObject}; + +/// A base connection for an OutputType. +#[derive(SimpleObject)] +#[graphql(shareable)] +pub struct BaseConnection { + /// The resulting entities. + pub nodes: Vec, + /// Whether this connection has a next page. + pub has_next_page: bool, + /// The total amount of items in this connection. + pub total_count: u64, +} + +use mongodb_cursor_pagination::FindResult; + +pub struct FindResultWrapper(pub FindResult); + +/// Object that writes total count of items in a query, regardless of pagination. +#[derive(SimpleObject)] +pub struct AdditionalFields { + total_count: u64, +} + +/// Implementation of conversion from MongoDB pagination to GraphQL Connection. +impl From> for BaseConnection +where + Node: OutputType, +{ + fn from(value: FindResultWrapper) -> Self { + BaseConnection { + nodes: value.0.items, + has_next_page: value.0.page_info.has_next_page, + total_count: value.0.total_count, + } + } +} diff --git a/src/discount_connection.rs b/src/discount_connection.rs new file mode 100644 index 0000000..b54fc94 --- /dev/null +++ b/src/discount_connection.rs @@ -0,0 +1,28 @@ +use async_graphql::SimpleObject; + +use crate::{base_connection::BaseConnection, foreign_types::Discount}; + +/// A connection of Discounts. +#[derive(SimpleObject)] +#[graphql(shareable)] +pub struct DiscountConnection { + /// The resulting entities. + pub nodes: Vec, + /// Whether this connection has a next page. + pub has_next_page: bool, + /// The total amount of items in this connection. + pub total_count: u64, +} + +/// Implementation of conversion from BaseConnection to DiscountConnection. +/// +/// Prevents GraphQL naming conflicts. +impl From> for DiscountConnection { + fn from(value: BaseConnection) -> Self { + Self { + nodes: value.nodes, + has_next_page: value.has_next_page, + total_count: value.total_count, + } + } +} diff --git a/src/foreign_types.rs b/src/foreign_types.rs new file mode 100644 index 0000000..04869d0 --- /dev/null +++ b/src/foreign_types.rs @@ -0,0 +1,340 @@ +use async_graphql::SimpleObject; +use bson::{doc, Bson, Uuid}; +use serde::{Deserialize, Serialize}; +use std::{cmp::Ordering, hash::Hash}; + +use crate::http_event_service::{ProductVariantVersionEventData, TaxRateVersionEventData}; +use crate::mutation::get_discounts; + +/// Foreign type of a product variant. +#[derive(Debug, Serialize, Deserialize, Hash, Eq, PartialEq, Copy, Clone, SimpleObject)] +#[graphql(unresolvable = "id")] +pub struct ProductVariant { + /// UUID of the product variant. + pub _id: Uuid, + /// Current version of product variant. + #[graphql(skip)] + pub current_version: ProductVariantVersion, + /// Defines visibility of product variant. + #[graphql(skip)] + pub is_publicly_visible: bool, +} + +impl From for ProductVariant { + fn from(value: ProductVariantVersionEventData) -> Self { + Self { + _id: value.product_variant_id, + current_version: ProductVariantVersion::from(value), + is_publicly_visible: true, + } + } +} + +impl PartialOrd for ProductVariant { + fn partial_cmp(&self, other: &Self) -> Option { + self._id.partial_cmp(&other._id) + } +} + +impl From for Uuid { + fn from(value: ProductVariant) -> Self { + value._id + } +} + +/// Foreign type of a product variant. +#[derive(Debug, Serialize, Deserialize, Hash, Eq, PartialEq, Copy, Clone, SimpleObject)] +#[graphql(unresolvable = "id")] +pub struct ProductVariantVersion { + /// UUID of the product variant version. + pub _id: Uuid, + /// Price of the product variant version. + #[graphql(skip)] + pub price: u32, + /// UUID of tax rate associated with order item. + #[graphql(skip)] + pub tax_rate_id: Uuid, +} + +impl From for ProductVariantVersion { + fn from(value: ProductVariantVersionEventData) -> Self { + Self { + _id: value.id, + price: value.retail_price, + tax_rate_id: value.tax_rate_id, + } + } +} + +impl PartialOrd for ProductVariantVersion { + fn partial_cmp(&self, other: &Self) -> Option { + self._id.partial_cmp(&other._id) + } +} + +impl From for Bson { + fn from(value: ProductVariantVersion) -> Self { + Bson::Document( + doc!("_id": value._id, "price": value.price, "tax_rate_id": value.tax_rate_id), + ) + } +} + +impl From for Uuid { + fn from(value: ProductVariantVersion) -> Self { + value._id + } +} + +/// Foreign type of a product item. +#[derive(Debug, Serialize, Deserialize, Hash, Eq, PartialEq, Copy, Clone, SimpleObject)] +#[graphql(unresolvable)] +pub struct ProductItem { + /// UUID of the product item. + pub _id: Uuid, +} + +impl PartialOrd for ProductItem { + fn partial_cmp(&self, other: &Self) -> Option { + self._id.partial_cmp(&other._id) + } +} + +impl From for Bson { + fn from(value: ProductItem) -> Self { + Bson::Document(doc!("_id": value._id)) + } +} + +impl From for Uuid { + fn from(value: ProductItem) -> Self { + value._id + } +} + +/// Foreign type of a coupon. +#[derive(Debug, Serialize, Deserialize, Hash, Eq, PartialEq, Copy, Clone, SimpleObject)] +#[graphql(unresolvable)] +pub struct Coupon { + /// UUID of the coupon. + pub _id: Uuid, +} + +impl PartialOrd for Coupon { + fn partial_cmp(&self, other: &Self) -> Option { + self._id.partial_cmp(&other._id) + } +} + +impl From for Bson { + fn from(value: Coupon) -> Self { + Bson::Document(doc!("_id": value._id)) + } +} + +impl From for Uuid { + fn from(value: Coupon) -> Self { + value._id + } +} + +impl From for Coupon { + fn from(value: Uuid) -> Self { + Coupon { _id: value } + } +} + +/// Foreign type of a tax rate. +#[derive(Debug, Serialize, Deserialize, Copy, Clone, SimpleObject)] +#[graphql(unresolvable = "id")] +pub struct TaxRate { + /// UUID of the tax rate. + pub _id: Uuid, + /// Current version of tax rate. + #[graphql(skip)] + pub current_version: TaxRateVersion, +} + +impl From for TaxRate { + fn from(value: TaxRateVersionEventData) -> Self { + Self { + _id: value.tax_rate_id, + current_version: TaxRateVersion::from(value), + } + } +} + +impl From for Bson { + fn from(value: TaxRate) -> Self { + let current_version_bson = Bson::from(value.current_version); + Bson::Document(doc!("_id": value._id, "current_version": current_version_bson)) + } +} + +impl From for Uuid { + fn from(value: TaxRate) -> Self { + value._id + } +} + +/// Foreign type of a tax rate version. +#[derive(Debug, Serialize, Deserialize, Copy, Clone, SimpleObject)] +#[graphql(unresolvable = "id")] +pub struct TaxRateVersion { + /// UUID of the tax rate. + pub _id: Uuid, + /// Rate of the tax rate version. + #[graphql(skip)] + pub rate: f64, + /// Version number of product variant version. + #[graphql(skip)] + pub version: u32, +} + +impl From for TaxRateVersion { + fn from(value: TaxRateVersionEventData) -> Self { + Self { + _id: value.id, + rate: value.rate, + version: value.version, + } + } +} + +impl From for Bson { + fn from(value: TaxRateVersion) -> Self { + Bson::Document(doc!("_id": value._id, "rate": value.rate, "version": value.version)) + } +} + +impl PartialEq for TaxRateVersion { + fn eq(&self, other: &Self) -> bool { + self._id == other._id + } +} + +impl Eq for TaxRateVersion {} + +/// Foreign type of a discount. +#[derive(Debug, Serialize, Deserialize, Copy, Clone, SimpleObject)] +#[graphql(unresolvable = "id")] +pub struct Discount { + /// UUID of the discount. + pub _id: Uuid, + /// Amount to be discounted. + #[graphql(skip)] + pub discount: f64, +} + +impl Ord for Discount { + fn cmp(&self, other: &Self) -> Ordering { + self._id.cmp(&other._id) + } +} + +impl PartialOrd for Discount { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl From for Bson { + fn from(value: Discount) -> Self { + Bson::Document(doc!("_id": value._id)) + } +} + +impl From for Uuid { + fn from(value: Discount) -> Self { + value._id + } +} + +impl PartialEq for Discount { + fn eq(&self, other: &Self) -> bool { + self._id == other._id + } +} + +impl Eq for Discount {} + +impl From for Discount { + fn from(value: get_discounts::GetDiscountsFindApplicableDiscountsDiscounts) -> Self { + Self { + _id: value.id, + discount: value.discount, + } + } +} + +/// Foreign type of a shopping cart item. +#[derive(Debug, Serialize, Deserialize, Hash, Eq, PartialEq, Copy, Clone, SimpleObject)] +#[graphql(unresolvable)] +pub struct ShoppingCartItem { + /// UUID of the shopping cart item. + pub _id: Uuid, +} + +/// Foreign type of an user address. +#[derive(Debug, Serialize, Deserialize, Hash, Eq, PartialEq, Copy, Clone, SimpleObject)] +#[graphql(unresolvable)] +pub struct UserAddress { + /// UUID of the product item. + pub _id: Uuid, +} + +impl PartialOrd for UserAddress { + fn partial_cmp(&self, other: &Self) -> Option { + self._id.partial_cmp(&other._id) + } +} + +impl From for Bson { + fn from(value: UserAddress) -> Self { + Bson::Document(doc!("_id": value._id)) + } +} + +impl From for Uuid { + fn from(value: UserAddress) -> Self { + value._id + } +} + +impl From for UserAddress { + fn from(value: Uuid) -> Self { + UserAddress { _id: value } + } +} + +/// Describes the method/provider that the shipment uses. +#[derive(Debug, Serialize, Deserialize, Hash, Eq, PartialEq, Copy, Clone, SimpleObject)] +#[graphql(unresolvable)] +pub struct ShipmentMethod { + /// UUID of the shipment method. + pub _id: Uuid, +} + +impl PartialOrd for ShipmentMethod { + fn partial_cmp(&self, other: &Self) -> Option { + self._id.partial_cmp(&other._id) + } +} + +impl From for Bson { + fn from(value: ShipmentMethod) -> Self { + Bson::Document(doc!("_id": value._id)) + } +} + +impl From for Uuid { + fn from(value: ShipmentMethod) -> Self { + value._id + } +} + +impl From for ShipmentMethod { + fn from(value: Uuid) -> Self { + ShipmentMethod { _id: value } + } +} diff --git a/src/http_event_service.rs b/src/http_event_service.rs new file mode 100644 index 0000000..d99f5d2 --- /dev/null +++ b/src/http_event_service.rs @@ -0,0 +1,460 @@ +use axum::{debug_handler, extract::State, http::StatusCode, Json}; +use bson::{doc, Uuid}; +use log::info; +use mongodb::{options::UpdateOptions, Collection}; +use serde::{Deserialize, Serialize}; + +use crate::{ + foreign_types::{Coupon, ProductVariant, ProductVariantVersion, ShipmentMethod, TaxRate}, + order::Order, + order_compensation::{compensate_order, OrderCompensation}, + query::query_object, + user::User, +}; + +/// Data to send to Dapr in order to describe a subscription. +#[derive(Serialize)] +pub struct Pubsub { + #[serde(rename(serialize = "pubsubName"))] + pub pubsubname: String, + pub topic: String, + pub route: String, +} + +/// Reponse data to send to Dapr when receiving an event. +#[derive(Serialize)] +pub struct TopicEventResponse { + pub status: u8, +} + +/// Default status is `0` -> Ok, according to Dapr specs. +impl Default for TopicEventResponse { + fn default() -> Self { + Self { status: 0 } + } +} + +/// Relevant part of Dapr event wrapped in a CloudEnvelope. +#[derive(Deserialize, Debug)] +pub struct Event { + pub topic: String, + pub data: T, +} + +/// Event data containing a Uuid. +#[derive(Deserialize, Debug)] +pub struct UuidEventData { + pub id: Uuid, +} + +/// Event data containing a ProductVariantVersion. +/// +/// Differs from ProductVariantVersion in the `id` field naming. +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct ProductVariantVersionEventData { + /// UUID of product variant version. + pub id: Uuid, + /// Price of product variant version. + pub retail_price: u32, + /// UUID of tax rate associated with order item. + pub tax_rate_id: Uuid, + /// UUID of product variant associated with product variant version. + pub product_variant_id: Uuid, +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct TaxRateVersionEventData { + /// UUID of the tax rate version. + pub id: Uuid, + /// Rate of the tax rate version. + pub rate: f64, + /// Version number of tax rate. + pub version: u32, + /// UUID of tax rate associated with order item. + pub tax_rate_id: Uuid, +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct UserAddressEventData { + /// UUID of the user address. + pub id: Uuid, + /// UUID of user of user address. + pub user_id: Uuid, +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct ShipmentFailedEventData { + /// UUID of the order of shipment. + pub order_id: Uuid, + /// UUIDs of the order items of shipment. + pub order_item_ids: Vec, +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct ShipmentStatusUpdatedEventData { + /// UUID of the order of shipment. + pub order_id: Uuid, + /// UUIDs of the order items of shipment. + pub order_item_ids: Vec, + /// Status of shipment. + pub status: ShipmentStatus, +} + +#[derive(Deserialize, Debug)] +pub enum ShipmentStatus { + Pending, + InProgress, + Delivered, + Failed, +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct UpdateProductVariantEventData { + /// UUID of the product variant to update. + pub id: Uuid, + /// New visibility of product variant to update. + pub is_publicly_visible: String, +} + +/// Service state containing database connections. +#[derive(Clone)] +pub struct HttpEventServiceState { + pub product_variant_collection: Collection, + pub coupon_collection: Collection, + pub tax_rate_collection: Collection, + pub shipment_method_collection: Collection, + pub user_collection: Collection, + pub order_collection: Collection, + pub order_compensation_collection: Collection, +} + +/// HTTP endpoint to list topic subsciptions. +pub async fn list_topic_subscriptions() -> Result>, StatusCode> { + let pubsub_product_variant_version = Pubsub { + pubsubname: "pubsub".to_string(), + topic: "catalog/product-variant-version/created".to_string(), + route: "/on-product-variant-version-creation-event".to_string(), + }; + let pubsub_coupon = Pubsub { + pubsubname: "pubsub".to_string(), + topic: "discount/coupon/created".to_string(), + route: "/on-id-creation-event".to_string(), + }; + let pubsub_tax_rate_version = Pubsub { + pubsubname: "pubsub".to_string(), + topic: "tax/tax-rate-version/created".to_string(), + route: "/on-tax-rate-version-creation-event".to_string(), + }; + let pubsub_shipment_method = Pubsub { + pubsubname: "pubsub".to_string(), + topic: "shipment/shipment-method/created".to_string(), + route: "/on-id-creation-event".to_string(), + }; + let pubsub_user = Pubsub { + pubsubname: "pubsub".to_string(), + topic: "user/user/created".to_string(), + route: "/on-id-creation-event".to_string(), + }; + let pubsub_user_address = Pubsub { + pubsubname: "pubsub".to_string(), + topic: "address/user-address/created".to_string(), + route: "/on-user-address-creation-event".to_string(), + }; + Ok(Json(vec![ + pubsub_product_variant_version, + pubsub_coupon, + pubsub_tax_rate_version, + pubsub_shipment_method, + pubsub_user, + pubsub_user_address, + ])) +} + +/// HTTP endpoint to receive UUID creation events. +/// +/// Includes all creation events that consist of only UUIDs: +/// - Coupon +/// - ShipmentMethod +/// - User +#[debug_handler(state = HttpEventServiceState)] +pub async fn on_id_creation_event( + State(state): State, + Json(event): Json>, +) -> Result, StatusCode> { + info!("{:?}", event); + + match event.topic.as_str() { + "discount/coupon/created" => { + create_in_mongodb(&state.coupon_collection, event.data.id).await? + } + "shipment/shipment-method/created" => { + create_in_mongodb(&state.shipment_method_collection, event.data.id).await? + } + "user/user/created" => create_in_mongodb(&state.user_collection, event.data.id).await?, + _ => return Err(StatusCode::INTERNAL_SERVER_ERROR), + } + Ok(Json(TopicEventResponse::default())) +} + +/// HTTP endpoint to receive ProductVariantVersion creation events. +#[debug_handler(state = HttpEventServiceState)] +pub async fn on_product_variant_version_creation_event( + State(state): State, + Json(event): Json>, +) -> Result, StatusCode> { + info!("{:?}", event); + match event.topic.as_str() { + "catalog/product-variant-version/created" => { + create_or_update_product_variant_in_mongodb( + &state.product_variant_collection, + event.data, + ) + .await?; + } + _ => return Err(StatusCode::INTERNAL_SERVER_ERROR), + } + Ok(Json(TopicEventResponse::default())) +} + +/// HTTP endpoint to receive product variant update events. +#[debug_handler(state = HttpEventServiceState)] +pub async fn on_product_variant_update_event( + State(state): State, + Json(event): Json>, +) -> Result, StatusCode> { + info!("{:?}", event); + + match event.topic.as_str() { + "catalog/product-variant/updated" => { + update_product_variant_visibility_in_mongodb( + &state.product_variant_collection, + event.data, + ) + .await? + } + _ => return Err(StatusCode::INTERNAL_SERVER_ERROR), + } + Ok(Json(TopicEventResponse::default())) +} + +/// HTTP endpoint to receive TaxRateVersion creation events. +#[debug_handler(state = HttpEventServiceState)] +pub async fn on_tax_rate_version_creation_event( + State(state): State, + Json(event): Json>, +) -> Result, StatusCode> { + info!("{:?}", event); + + let tax_rate = TaxRate::from(event.data); + match event.topic.as_str() { + "tax/tax-rate-version/created" => { + create_or_update_tax_rate_in_mongodb(&state.tax_rate_collection, tax_rate).await? + } + _ => return Err(StatusCode::INTERNAL_SERVER_ERROR), + } + Ok(Json(TopicEventResponse::default())) +} + +/// HTTP endpoint to receive user Address creation events. +#[debug_handler(state = HttpEventServiceState)] +pub async fn on_user_address_creation_event( + State(state): State, + Json(event): Json>, +) -> Result, StatusCode> { + info!("{:?}", event); + + match event.topic.as_str() { + "address/user-address/created" => { + insert_user_address_in_mongodb(&state.user_collection, event.data).await? + } + _ => return Err(StatusCode::INTERNAL_SERVER_ERROR), + } + Ok(Json(TopicEventResponse::default())) +} + +/// HTTP endpoint to receive user Address archive events. +#[debug_handler(state = HttpEventServiceState)] +pub async fn on_user_address_archived_event( + State(state): State, + Json(event): Json>, +) -> Result, StatusCode> { + info!("{:?}", event); + + match event.topic.as_str() { + "address/user-address/archived" => { + remove_user_address_in_mongodb(&state.user_collection, event.data).await? + } + _ => return Err(StatusCode::INTERNAL_SERVER_ERROR), + } + Ok(Json(TopicEventResponse::default())) +} + +/// HTTP endpoint to receive Shipment creation events. +#[debug_handler(state = HttpEventServiceState)] +pub async fn on_shipment_creation_failed_event( + State(state): State, + Json(event): Json>, +) -> Result, StatusCode> { + info!("{:?}", event); + + match event.topic.as_str() { + "shipment/shipment/creation-failed" => compensate_order( + &state.order_collection, + &state.order_compensation_collection, + event.data, + ) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?, + _ => return Err(StatusCode::INTERNAL_SERVER_ERROR), + } + Ok(Json(TopicEventResponse::default())) +} + +/// Create or update ProductVariant in MongoDB. +pub async fn create_or_update_product_variant_in_mongodb( + collection: &Collection, + product_variant_version_event_data: ProductVariantVersionEventData, +) -> Result<(), StatusCode> { + match query_object( + collection, + product_variant_version_event_data.product_variant_id, + ) + .await + { + Ok(product_variant) => { + update_product_variant_in_mongodb( + product_variant_version_event_data, + collection, + product_variant, + ) + .await + } + Err(e) => { + log::info!("Error {:?}", e); + create_product_variant_in_mongodb(product_variant_version_event_data, collection).await + } + } +} + +/// Update ProductVariant in MongoDB. +async fn update_product_variant_in_mongodb( + product_variant_version_event_data: ProductVariantVersionEventData, + collection: &Collection, + product_variant: ProductVariant, +) -> Result<(), StatusCode> { + let product_variant_version = ProductVariantVersion::from(product_variant_version_event_data); + log::info!("{:?}", product_variant_version); + match collection + .update_one( + doc! {"_id": product_variant._id}, + doc! {"$set": {"current_version": product_variant_version}}, + None, + ) + .await + { + Ok(_) => Ok(()), + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } +} + +/// Create ProductVariant in MongoDB. +async fn create_product_variant_in_mongodb( + product_variant_version_event_data: ProductVariantVersionEventData, + collection: &Collection, +) -> Result<(), StatusCode> { + let product_variant = ProductVariant::from(product_variant_version_event_data); + match collection.insert_one(product_variant, None).await { + Ok(_) => Ok(()), + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } +} + +/// Create or update TaxRate in MongoDB. +pub async fn create_or_update_tax_rate_in_mongodb( + collection: &Collection, + tax_rate: TaxRate, +) -> Result<(), StatusCode> { + let update_options = UpdateOptions::builder().upsert(true).build(); + match collection + .update_one( + doc! {"_id": tax_rate._id }, + doc! {"$set": {"_id": tax_rate._id, "current_version": tax_rate.current_version}}, + update_options, + ) + .await + { + Ok(_) => Ok(()), + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } +} + +/// Inserts user Address in MongoDB. +pub async fn insert_user_address_in_mongodb( + collection: &Collection, + user_address_event_data: UserAddressEventData, +) -> Result<(), StatusCode> { + match collection + .update_one( + doc! {"_id": user_address_event_data.user_id }, + doc! {"$push": {"user_address_ids": user_address_event_data.id }}, + None, + ) + .await + { + Ok(_) => Ok(()), + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } +} + +/// Remove user Address in MongoDB. +pub async fn remove_user_address_in_mongodb( + collection: &Collection, + user_address_event_data: UserAddressEventData, +) -> Result<(), StatusCode> { + match collection + .update_one( + doc! {"_id": user_address_event_data.user_id }, + doc! {"$pull": {"user_address_ids": user_address_event_data.id }}, + None, + ) + .await + { + Ok(_) => Ok(()), + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } +} + +async fn update_product_variant_visibility_in_mongodb( + collection: &Collection, + update_product_variant_event_data: UpdateProductVariantEventData, +) -> Result<(), StatusCode> { + match collection + .update_one( + doc! {"_id": update_product_variant_event_data.id }, + doc! {"$set": {"is_publicly_visible": update_product_variant_event_data.is_publicly_visible }}, + None, + ) + .await + { + Ok(_) => Ok(()), + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } +} + +/// Create a new object: T in MongoDB. +pub async fn create_in_mongodb>( + collection: &Collection, + id: Uuid, +) -> Result<(), StatusCode> { + let object = T::from(id); + match collection.insert_one(object, None).await { + Ok(_) => Ok(()), + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..bbb31b6 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,207 @@ +use std::{env, fs::File, io::Write}; + +use async_graphql::{ + extensions::Logger, http::GraphiQLSource, EmptySubscription, SDLExportOptions, Schema, +}; + +use async_graphql_axum::{GraphQLRequest, GraphQLResponse}; + +use axum::{ + extract::State, + http::header::HeaderMap, + response::{self, IntoResponse}, + routing::{get, post}, + Router, Server, +}; + +use clap::{arg, command, Parser}; + +use order_compensation::OrderCompensation; +use simple_logger::SimpleLogger; + +use log::info; +use mongodb::{options::ClientOptions, Client, Database}; + +mod order; +use order::Order; + +mod order_item; + +mod query; +use query::Query; + +mod mutation; +use mutation::Mutation; + +use foreign_types::{Coupon, ProductVariant, ShipmentMethod, TaxRate}; + +mod user; +use user::User; + +mod http_event_service; +use http_event_service::{ + list_topic_subscriptions, on_id_creation_event, on_product_variant_update_event, + on_product_variant_version_creation_event, on_shipment_creation_failed_event, + on_tax_rate_version_creation_event, on_user_address_archived_event, + on_user_address_creation_event, HttpEventServiceState, +}; + +mod authentication; +use authentication::AuthorizedUserHeader; + +mod order_compensation; + +mod base_connection; +mod discount_connection; +mod foreign_types; +mod mutation_input_structs; +mod order_connection; +mod order_datatypes; +mod order_item_connection; +mod product_variant_version_connection; + +/// Builds the GraphiQL frontend. +async fn graphiql() -> impl IntoResponse { + response::Html(GraphiQLSource::build().endpoint("/").finish()) +} + +/// Establishes database connection and returns the client. +async fn db_connection() -> Client { + let uri = match env::var_os("MONGODB_URI") { + Some(uri) => uri.into_string().unwrap(), + None => panic!("$MONGODB_URI is not set."), + }; + + // Parse a connection string into an options struct. + let mut client_options = ClientOptions::parse(uri).await.unwrap(); + + // Manually set an option. + client_options.app_name = Some("Order".to_string()); + + // Get a handle to the deployment. + Client::with_options(client_options).unwrap() +} + +/// Returns Router that establishes connection to Dapr. +/// +/// Creates endpoints to define pub/sub interaction with Dapr. +async fn build_dapr_router(db_client: Database) -> Router { + let product_variant_collection: mongodb::Collection = + db_client.collection::("product_variants"); + let coupon_collection: mongodb::Collection = db_client.collection::("coupons"); + let tax_rate_collection: mongodb::Collection = + db_client.collection::("tax_rates"); + let shipment_method_collection: mongodb::Collection = + db_client.collection::("shipment_methods"); + let user_collection: mongodb::Collection = db_client.collection::("users"); + let order_collection: mongodb::Collection = db_client.collection::("orders"); + let order_compensation_collection: mongodb::Collection = + db_client.collection::("order_compensations"); + + // Define routes. + let app = Router::new() + .route("/dapr/subscribe", get(list_topic_subscriptions)) + .route("/on-id-creation-event", post(on_id_creation_event)) + .route( + "/on-product-variant-version-creation-event", + post(on_product_variant_version_creation_event), + ) + .route( + "/on-product-variant-update-event", + post(on_product_variant_update_event), + ) + .route( + "/on-tax-rate-version-creation-event", + post(on_tax_rate_version_creation_event), + ) + .route( + "/on-user-address-creation-event", + post(on_user_address_creation_event), + ) + .route( + "/on-user-address-archived-event", + post(on_user_address_archived_event), + ) + .route( + "/on-shipment-creation-failed-event", + post(on_shipment_creation_failed_event), + ) + .with_state(HttpEventServiceState { + product_variant_collection, + coupon_collection, + tax_rate_collection, + shipment_method_collection, + user_collection, + order_collection, + order_compensation_collection, + }); + app +} + +/// Command line argument to toggle schema generation instead of service execution. +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Generates GraphQL schema in `./schemas/order.graphql`. + #[arg(long)] + generate_schema: bool, +} + +/// Activates logger and parses argument for optional schema generation. Otherwise starts gRPC and GraphQL server. +#[tokio::main] +async fn main() -> std::io::Result<()> { + SimpleLogger::new().init().unwrap(); + + let args = Args::parse(); + if args.generate_schema { + let schema = Schema::build(Query, Mutation, EmptySubscription).finish(); + let mut file = File::create("./schemas/order.graphql")?; + let sdl_export_options = SDLExportOptions::new().federation(); + let schema_sdl = schema.sdl_with_options(sdl_export_options); + file.write_all(schema_sdl.as_bytes())?; + info!("GraphQL schema: ./schemas/order.graphql was successfully generated!"); + } else { + start_service().await; + } + Ok(()) +} + +/// Describes the handler for GraphQL requests. +/// +/// Parses the "Authenticate-User" header and writes it in the context data of the specfic request. +/// Then executes the GraphQL schema with the request. +async fn graphql_handler( + State(schema): State>, + headers: HeaderMap, + req: GraphQLRequest, +) -> GraphQLResponse { + let mut req = req.into_inner(); + if let Ok(authenticate_user_header) = AuthorizedUserHeader::try_from(&headers) { + req = req.data(authenticate_user_header); + } + schema.execute(req).await.into() +} + +/// Starts order service on port 8000. +async fn start_service() { + let client = db_connection().await; + let db_client: Database = client.database("order-database"); + + let schema = Schema::build(Query, Mutation, EmptySubscription) + .extension(Logger) + .data(db_client.clone()) + .enable_federation() + .finish(); + + let graphiql = Router::new() + .route("/", get(graphiql).post(graphql_handler)) + .with_state(schema); + let dapr_router = build_dapr_router(db_client).await; + let app = Router::new().merge(graphiql).merge(dapr_router); + + info!("GraphiQL IDE: http://0.0.0.0:8080"); + Server::bind(&"0.0.0.0:8080".parse().unwrap()) + .serve(app.into_make_service()) + .await + .unwrap(); +} diff --git a/src/mutation.rs b/src/mutation.rs new file mode 100644 index 0000000..d5af367 --- /dev/null +++ b/src/mutation.rs @@ -0,0 +1,1034 @@ +use async_graphql::{Context, Error, Object, Result}; +use bson::Bson; +use bson::Uuid; +use futures::TryStreamExt; +use graphql_client::GraphQLQuery; +use graphql_client::Response; +use mongodb::{ + bson::{doc, DateTime}, + Collection, Database, +}; +use serde::Deserialize; +use serde::Serialize; +use std::any::type_name; +use std::collections::BTreeSet; +use std::collections::HashMap; +use std::time::Duration; +use std::time::SystemTime; + +use crate::authentication::authenticate_user; +use crate::authentication::AuthorizedUserHeader; +use crate::foreign_types::Coupon; +use crate::foreign_types::Discount; +use crate::foreign_types::ProductVariant; +use crate::foreign_types::ProductVariantVersion; +use crate::foreign_types::ShipmentMethod; +use crate::foreign_types::TaxRate; +use crate::foreign_types::TaxRateVersion; +use crate::foreign_types::UserAddress; +use crate::mutation_input_structs::CreateOrderInput; +use crate::mutation_input_structs::OrderItemInput; +use crate::order::OrderDTO; +use crate::order::OrderStatus; +use crate::order_item::OrderItem; +use crate::query::query_object; +use crate::query::query_objects; +use crate::user::User; +use crate::{order::Order, query::query_order}; + +use self::get_shipment_fees::CalculateShipmentFeesInput; + +const PENDING_TIMEOUT: Duration = Duration::new(3600, 0); + +/// Describes GraphQL order mutations. +pub struct Mutation; + +#[Object] +impl Mutation { + /// Creates an order with `OrderStatus::Pending`. + async fn create_order<'a>( + &self, + ctx: &Context<'a>, + #[graphql(desc = "CreateOrderInput")] input: CreateOrderInput, + ) -> Result { + authenticate_user(&ctx, input.user_id)?; + let db_client = ctx.data::()?; + let collection: Collection = db_client.collection::("orders"); + validate_order_input(db_client, &input).await?; + let current_timestamp = DateTime::now(); + let internal_order_items: Vec = + create_internal_order_items(&ctx, &input, current_timestamp).await?; + let shipment_address = UserAddress::from(input.shipment_address_id); + let invoice_address = UserAddress::from(input.invoice_address_id); + let compensatable_order_amount = + calculate_compensatable_order_amount(&internal_order_items); + let order = Order { + _id: Uuid::new(), + user: User::from(input.user_id), + created_at: current_timestamp, + order_status: OrderStatus::Pending, + placed_at: None, + rejection_reason: None, + internal_order_items, + shipment_address, + invoice_address, + compensatable_order_amount, + payment_information_id: input.payment_information_id, + }; + match collection.insert_one(order, None).await { + Ok(result) => { + let id = uuid_from_bson(result.inserted_id)?; + query_order(&collection, id).await + } + Err(_) => Err(Error::new("Adding order failed in MongoDB.")), + } + } + + /// Places an existing order by changing its status to `OrderStatus::Placed`. + async fn place_order<'a>( + &self, + ctx: &Context<'a>, + #[graphql(desc = "Uuid of order to place")] id: Uuid, + ) -> Result { + let db_client = ctx.data::()?; + let collection: Collection = db_client.collection::("orders"); + let mut order = query_order(&collection, id).await?; + authenticate_user(&ctx, order.user._id)?; + set_status_placed(&collection, id).await?; + order = query_order(&collection, id).await?; + send_order_created_event(order.clone()).await?; + Ok(order) + } +} + +/// Calculates the total compensatable amount of all order items in the input by summing up their `compensatable_amount` attributes. +fn calculate_compensatable_order_amount(order_items: &Vec) -> u64 { + order_items.iter().map(|o| o.compensatable_amount).sum() +} + +/// Extracts UUID from Bson. +/// +/// Creating a order returns a UUID in a Bson document. This function helps to extract the UUID. +fn uuid_from_bson(bson: Bson) -> Result { + match bson { + Bson::Binary(id) => Ok(id.to_uuid()?), + _ => { + let message = format!( + "Returned id: `{}` needs to be a Binary in order to be parsed as a Uuid", + bson + ); + Err(Error::new(message)) + } + } +} + +/// Sets the status of an order to `OrderStatus::Placed`. +/// Checks if pending order is still valid before setting `OrderStatus::Placed`. +/// Rejects order if timestamp of placement exceeds `PENDING_TIMEOUT` in relation to the order creation timestamp. +/// +/// * `collection` - MongoDB collection to update. +/// * `input` - `UpdateOrderInput`. +async fn set_status_placed(collection: &Collection, id: Uuid) -> Result<()> { + let current_timestamp_system_time = SystemTime::now(); + let order = query_object(&collection, id).await?; + let order_created_at_system_time = order.created_at.to_system_time(); + if order_created_at_system_time + PENDING_TIMEOUT >= current_timestamp_system_time { + match order.order_status { + OrderStatus::Pending => { + let current_timestamp = DateTime::from(current_timestamp_system_time); + set_status_placed_in_mongodb(&collection, id, current_timestamp).await + } + _ => { + let message = format!("`{:?}` must be `OrderStatus::Pending` to be able to be placed. Order was already placed or rejected.", order.order_status); + Err(Error::new(message)) + } + } + } else { + set_status_rejected_in_mongodb(&collection, id).await + } +} + +/// Updates order to `OrderStatus::Placed` in MongoDB. +async fn set_status_placed_in_mongodb( + collection: &Collection, + id: Uuid, + current_timestamp: DateTime, +) -> Result<()> { + let result = collection + .update_one( + doc! {"_id": id }, + doc! {"$set": {"order_status": OrderStatus::Placed, "placed_at": current_timestamp}}, + None, + ) + .await; + if let Err(_) = result { + let message = format!("Placing order of id: `{}` failed in MongoDB.", id); + return Err(Error::new(message)); + } + Ok(()) +} + +/// Updates order to `OrderStatus::Rejected` in MongoDB. +/// +/// This function always returns an Err. +async fn set_status_rejected_in_mongodb(collection: &Collection, id: Uuid) -> Result<()> { + let result = collection + .update_one( + doc! {"_id": id }, + doc! {"$set": {"order_status": OrderStatus::Rejected}}, + None, + ) + .await; + match result { + Ok(_) => { + let message = format!( + "Order of id: `{}` was rejected as it is `OrderStatus::Pending` for too long.", + id + ); + return Err(Error::new(message)); + } + Err(_) => { + let message = format!("Order should be rejected as it is `OrderStatus::Pending` for too long. Rejecting order of id: `{}` failed in MongoDB.", id); + return Err(Error::new(message)); + } + } +} + +/// Checks if foreign types exist (MongoDB database populated with events). +async fn validate_order_input(db_client: &Database, input: &CreateOrderInput) -> Result<()> { + let user_collection: mongodb::Collection = db_client.collection::("users"); + validate_object(&user_collection, input.user_id).await?; + validate_order_items(&db_client, &input.order_item_inputs).await?; + validate_addresses(&db_client, &input).await?; + Ok(()) +} + +/// Checks if all order item parameters are the system (MongoDB database populated with events). +/// +/// Used before creating orders. +async fn validate_order_items( + db_client: &Database, + order_item_inputs: &BTreeSet, +) -> Result<()> { + let shipment_method_collection: mongodb::Collection = + db_client.collection::("shipment_methods"); + let shipment_method_ids = order_item_inputs + .iter() + .map(|o| o.shipment_method_id) + .collect(); + validate_objects(&shipment_method_collection, shipment_method_ids).await?; + validate_coupons(&db_client, &order_item_inputs).await?; + Ok(()) +} + +/// Checks if coupons are in the system (MongoDB database populated with events). +/// +/// Used before creating orders. +async fn validate_coupons( + db_client: &Database, + order_item_inputs: &BTreeSet, +) -> Result<()> { + let coupon_collection: mongodb::Collection = db_client.collection::("coupons"); + let coupon_ids: Vec = order_item_inputs + .iter() + .map(|o| o.coupon_ids.clone()) + .flatten() + .collect(); + validate_objects(&coupon_collection, coupon_ids).await +} + +/// Checks if addresses are registered under the user (MongoDB database populated with events). +/// +/// Used before creating orders. +async fn validate_addresses(db_client: &Database, input: &CreateOrderInput) -> Result<()> { + let user_collection: mongodb::Collection = db_client.collection::("users"); + validate_user_address(&user_collection, input.shipment_address_id, input.user_id).await?; + validate_user_address(&user_collection, input.invoice_address_id, input.user_id).await +} + +/// Creates OrderItems from OrderItemInputs. +/// +/// Used before creating orders. +/// Each order can only contain an order item with a specific product variant once. +async fn create_internal_order_items<'a>( + ctx: &Context<'a>, + input: &CreateOrderInput, + current_timestamp: DateTime, +) -> Result> { + let db_client = ctx.data::()?; + let authorized_header = ctx.data::()?; + let ( + counts_by_product_variant_ids, + order_item_inputs_by_product_variant_ids, + product_variants_by_product_variant_ids, + product_variant_versions_by_product_variant_ids, + tax_rate_versions_by_product_variant_ids, + discounts_by_product_variant_ids, + ) = query_or_obtain_order_item_attributes(authorized_header, input, db_client).await?; + let internal_order_items = zip_to_internal_order_items( + order_item_inputs_by_product_variant_ids, + product_variants_by_product_variant_ids, + product_variant_versions_by_product_variant_ids, + tax_rate_versions_by_product_variant_ids, + counts_by_product_variant_ids, + discounts_by_product_variant_ids, + current_timestamp, + )?; + Ok(internal_order_items) +} + +/// Queries or obtains the attributes necessary for order item construction. +async fn query_or_obtain_order_item_attributes( + authorized_header: &AuthorizedUserHeader, + input: &CreateOrderInput, + db_client: &Database, +) -> Result< + ( + HashMap, + HashMap, + HashMap, + HashMap, + HashMap, + HashMap>, + ), + Error, +> { + let (counts_by_product_variant_ids, order_item_inputs_by_product_variant_ids) = + query_counts_by_product_variant_ids(authorized_header, &input).await?; + let product_variant_ids: Vec = counts_by_product_variant_ids.keys().cloned().collect(); + let product_variants_by_product_variant_ids: HashMap = + query_product_variants_by_product_variant_ids(db_client, &product_variant_ids).await?; + let product_variant_versions_by_product_variant_ids = + query_product_variant_versions_by_product_variant_ids( + &product_variants_by_product_variant_ids, + ) + .await; + check_product_variant_availability(&product_variant_ids, &counts_by_product_variant_ids) + .await?; + let tax_rate_versions_by_product_variant_ids = query_tax_rate_versions_by_product_variant_ids( + db_client, + &product_variant_versions_by_product_variant_ids, + ) + .await?; + let discounts_by_product_variant_ids = query_discounts_by_product_variant_ids( + input.user_id, + &order_item_inputs_by_product_variant_ids, + &product_variant_ids, + &product_variant_versions_by_product_variant_ids, + &counts_by_product_variant_ids, + ) + .await?; + let _shipment_fees = query_shipment_fees( + &order_item_inputs_by_product_variant_ids, + &product_variant_versions_by_product_variant_ids, + &counts_by_product_variant_ids, + ) + .await?; + Ok(( + counts_by_product_variant_ids, + order_item_inputs_by_product_variant_ids, + product_variants_by_product_variant_ids, + product_variant_versions_by_product_variant_ids, + tax_rate_versions_by_product_variant_ids, + discounts_by_product_variant_ids, + )) +} + +/// Zips HashMaps which contain the required attributes for construction to order items. +fn zip_to_internal_order_items( + order_item_inputs_by_product_variant_ids: HashMap, + product_variants_by_product_variant_ids: HashMap, + product_variant_versions_by_product_variant_ids: HashMap, + tax_rate_versions_by_product_variant_ids: HashMap, + counts_by_product_variant_ids: HashMap, + discounts_by_product_variant_ids: HashMap>, + current_timestamp: DateTime, +) -> Result> { + product_variants_by_product_variant_ids + .iter() + .map(|(id, product_variant)| { + let order_item_input_error = + build_hash_map_error(&order_item_inputs_by_product_variant_ids, *id); + let product_variant_version_error = + build_hash_map_error(&product_variant_versions_by_product_variant_ids, *id); + let tax_rate_version_error = + build_hash_map_error(&tax_rate_versions_by_product_variant_ids, *id); + let count_error = build_hash_map_error(&counts_by_product_variant_ids, *id); + let discount_error = build_hash_map_error(&discounts_by_product_variant_ids, *id); + let order_item_input = order_item_inputs_by_product_variant_ids + .get(id) + .ok_or(order_item_input_error)?; + let product_variant_version = product_variant_versions_by_product_variant_ids + .get(id) + .ok_or(product_variant_version_error)?; + let tax_rate_version = tax_rate_versions_by_product_variant_ids + .get(id) + .ok_or(tax_rate_version_error)?; + let count = counts_by_product_variant_ids.get(id).ok_or(count_error)?; + let internal_discounts = discounts_by_product_variant_ids + .get(id) + .ok_or(discount_error)?; + let order_item = OrderItem::new( + order_item_input, + product_variant, + product_variant_version, + tax_rate_version, + *count, + internal_discounts, + current_timestamp, + ); + Ok(order_item) + }) + .collect::>>() +} + +// Defines a custom scalar from GraphQL schema. +type _Any = Representation; + +#[derive(GraphQLQuery)] +#[graphql( + schema_path = "schemas_repo/inventory.graphql", + query_path = "queries/get_unreserved_product_item_counts.graphql", + response_derives = "Debug" +)] +/// GraphQL query generated by client library. +struct GetUnreservedProductItemCounts; +#[derive(Serialize, Debug)] + +/// Input type for a GraphQL entity resolver query. +struct Representation { + __typename: String, + id: String, +} + +/// Checks if product items are available in the inventory service. +async fn check_product_variant_availability( + product_variant_ids: &Vec, + counts_by_product_variant_ids: &HashMap, +) -> Result<()> { + let representations = product_variant_ids + .iter() + .cloned() + .map(|id| Representation { + __typename: "ProductVariant".to_string(), + id: id.to_string(), + }) + .collect(); + let variables = get_unreserved_product_item_counts::Variables { representations }; + + let request_body = GetUnreservedProductItemCounts::build_query(variables); + let client = reqwest::Client::new(); + + let res = client + .post("http://localhost:3500/v1.0/invoke/inventory/method/graphql") + .json(&request_body) + .send() + .await?; + let response_body: Response = + res.json().await?; + let response_data: get_unreserved_product_item_counts::ResponseData = + response_body.data.ok_or(Error::new( + "Response data of `check_product_variant_availability` query is empty.", + ))?; + let stock_counts_by_product_variant_ids = + build_stock_counts_by_product_variant_from_response_data(response_data)?; + calculate_availability_of_product_variant_ids( + &stock_counts_by_product_variant_ids, + &counts_by_product_variant_ids, + ) +} + +/// Remaps the result type of the GraphQL `_entities` query retrieving stock counts for product variants. +fn build_stock_counts_by_product_variant_from_response_data( + response_data: get_unreserved_product_item_counts::ResponseData, +) -> Result> { + response_data + .entities + .into_iter() + .map(|maybe_product_variant_enum| { + let message = format!("Response data of `check_product_variant_availability` query could not be parsed, `{:?}` is `None`", maybe_product_variant_enum); + let product_variant_enum = maybe_product_variant_enum.ok_or(Error::new(message))?; + let stock_counts_by_product_variant: Result<(Uuid, u64)> = match product_variant_enum { + get_unreserved_product_item_counts::GetUnreservedProductItemCountsEntities::ProductVariant(product_variant) => { + let message = format!("Response data of `check_product_variant_availability` query could not be parsed, `{:?}` is `None`", product_variant.product_items); + let product_items = product_variant.product_items.ok_or(Error::new(message))?; + let stock_count = u64::try_from(product_items.total_count)?; + Ok( + ( + product_variant.id, + stock_count + ) + ) + } + get_unreserved_product_item_counts::GetUnreservedProductItemCountsEntities::ProductItem => todo!(), + }; + stock_counts_by_product_variant + }).collect() +} + +/// Calculates the availability based on the actual and expected stock counts based on the product variant ids. +/// +/// The expected amount or more product items need to be in stock for a product variant to be counted as available. +/// All product variants need to be available for this function to pass without an Err. +fn calculate_availability_of_product_variant_ids( + stock_counts_by_product_variant_ids: &HashMap, + expected_stock_counts_by_product_variant_ids: &HashMap, +) -> Result<()> { + let availabilites: Vec = expected_stock_counts_by_product_variant_ids + .iter() + .map(|(id, expected_count)| { + let error = build_hash_map_error(expected_stock_counts_by_product_variant_ids, *id); + let count = stock_counts_by_product_variant_ids.get(id).ok_or(error)?; + Ok(*count >= *expected_count) + }) + .collect::>>()?; + match availabilites.into_iter().all(|b| b == true) { + true => Ok(()), + false => Err(Error::new( + "Not all requested product variants are available.", + )), + } +} + +// Defines a custom scalar from GraphQL schema. +type UUID = Uuid; + +#[derive(GraphQLQuery)] +#[graphql( + schema_path = "schemas_repo/shoppingcart.graphql", + query_path = "queries/get_shopping_cart_product_variant_ids_and_counts.graphql", + response_derives = "Debug" +)] +/// GraphQL query generated by client library. +struct GetShoppingCartProductVariantIdsAndCounts; + +/// Queries product variants from shopping cart item ids from shopping cart service. +async fn query_counts_by_product_variant_ids( + authorized_user_header: &AuthorizedUserHeader, + input: &CreateOrderInput, +) -> Result<(HashMap, HashMap)> { + let representations = vec![Representation { + __typename: "User".to_string(), + id: input.user_id.to_string(), + }]; + let variables = get_shopping_cart_product_variant_ids_and_counts::Variables { representations }; + + let request_body = GetShoppingCartProductVariantIdsAndCounts::build_query(variables); + let client = reqwest::Client::new(); + + let authorized_user_header_string = serde_json::to_string(authorized_user_header)?; + let res = client + .post("http://localhost:3500/v1.0/invoke/shoppingcart/method/") + .json(&request_body) + .header("Authorized-User", authorized_user_header_string) + .send() + .await?; + let response_body: Response = + res.json().await?; + let message = "Response data of `query_counts_by_product_variant_ids` query is empty."; + let mut response_data: get_shopping_cart_product_variant_ids_and_counts::ResponseData = + response_body.data.ok_or(Error::new(message))?; + let shopping_cart_response_data = response_data.entities.remove(0).ok_or(message)?; + + let ids_and_counts_by_shopping_cart_item_ids = + into_ids_and_counts_by_shopping_cart_item_ids(shopping_cart_response_data)?; + let counts_by_product_variant_ids = build_counts_by_product_variant_ids( + &input.order_item_inputs, + &ids_and_counts_by_shopping_cart_item_ids, + )?; + let order_item_inputs_by_product_variant_ids = build_order_item_inputs_by_product_variant_ids( + &input.order_item_inputs, + &ids_and_counts_by_shopping_cart_item_ids, + )?; + Ok(( + counts_by_product_variant_ids, + order_item_inputs_by_product_variant_ids, + )) +} + +// Unwraps Enum and maps the result to a HashMap of shopping cart item ids as keys and (product_variant_id, count) as values. +fn into_ids_and_counts_by_shopping_cart_item_ids( + ids_and_counts_enum: get_shopping_cart_product_variant_ids_and_counts::GetShoppingCartProductVariantIdsAndCountsEntities, +) -> Result> { + let message = format!("`ids_and_counts_enum: get_shopping_cart_product_variant_ids_and_counts::GetShoppingCartProductVariantIdsAndCountsEntities` does not contain a `get_shopping_cart_product_variant_ids_and_counts::GetShoppingCartProductVariantIdsAndCountsEntities::User`, but is another entity: `{:?}`", ids_and_counts_enum); + match ids_and_counts_enum { + get_shopping_cart_product_variant_ids_and_counts::GetShoppingCartProductVariantIdsAndCountsEntities::User(user) => { + let ids_and_counts_by_shopping_cart_item_ids = user.shoppingcart.shoppingcart_items.nodes.iter().map(|shoppingcart_item| + (shoppingcart_item.id, (shoppingcart_item.product_variant.id, shoppingcart_item.count as u64)) + ).collect(); + Ok(ids_and_counts_by_shopping_cart_item_ids) + } + _ => Err(Error::new(message))?, + } +} + +/// Filters shopping cart items: `ids_and_counts` to map to `order_item_inputs`. +/// Builds HashMap which maps product variant ids to counts. +fn build_counts_by_product_variant_ids( + order_item_inputs: &BTreeSet, + ids_and_counts: &HashMap, +) -> Result> { + order_item_inputs + .iter() + .map(|e| { + let id_and_count_ref = ids_and_counts.get(&e.shopping_cart_item_id); + let id_and_count = id_and_count_ref.and_then(|(id, count)| Some((*id, *count))); + let error = build_hash_map_error(ids_and_counts, e.shopping_cart_item_id); + id_and_count.ok_or(error) + }) + .collect() +} + +/// Filters shopping cart items: `ids_and_counts` to map to `order_item_inputs`. +/// Builds HashMap which maps product variant ids to order item inputs. +fn build_order_item_inputs_by_product_variant_ids( + order_item_inputs: &BTreeSet, + ids_and_counts: &HashMap, +) -> Result> { + order_item_inputs + .iter() + .map(|e| { + let id_and_count_ref = ids_and_counts.get(&e.shopping_cart_item_id); + let id_and_count = id_and_count_ref.and_then(|(id, _)| Some((*id, e.clone()))); + let error = build_hash_map_error(ids_and_counts, e.shopping_cart_item_id); + id_and_count.ok_or(error) + }) + .collect() +} + +/// Obtains product variants from product variant ids. +/// +/// Filters product variants which are non-publicly-visible. +async fn query_product_variants_by_product_variant_ids( + db_client: &Database, + product_variant_ids: &Vec, +) -> Result> { + let collection: Collection = + db_client.collection::("product_variants"); + let product_variants_by_product_variant_ids_unfiltered = + query_objects(&collection, product_variant_ids).await?; + let product_variants_by_product_variant_ids = + product_variants_by_product_variant_ids_unfiltered + .into_iter() + .filter(|(_, p)| p.is_publicly_visible) + .collect(); + Ok(product_variants_by_product_variant_ids) +} + +/// Obtains current product variant versions using product variants. +async fn query_product_variant_versions_by_product_variant_ids( + product_variants_by_product_variant_ids: &HashMap, +) -> HashMap { + let product_variant_versions_by_product_variant_ids: HashMap = + product_variants_by_product_variant_ids + .iter() + .map(|(id, p)| (*id, p.current_version)) + .collect(); + product_variant_versions_by_product_variant_ids +} + +/// Obtains current tax rate version for tax rate in product variant versions. +async fn query_tax_rate_versions_by_product_variant_ids( + db_client: &Database, + product_variant_versions_by_product_variant_ids: &HashMap, +) -> Result> { + let collection: Collection = db_client.collection::("tax_rates"); + let tax_rate_ids: Vec = product_variant_versions_by_product_variant_ids + .iter() + .map(|(_, p)| p.tax_rate_id) + .collect(); + let tax_rates = query_objects(&collection, &tax_rate_ids).await?; + let tax_rate_versions_by_product_variant_ids = product_variant_versions_by_product_variant_ids + .iter() + .map(|(id, p)| { + let error = build_hash_map_error(&tax_rates, *id); + let tax_rate = tax_rates.get(&p.tax_rate_id).ok_or(error)?; + Ok((*id, tax_rate.current_version)) + }) + .collect::>>()?; + Ok(tax_rate_versions_by_product_variant_ids) +} + +#[derive(GraphQLQuery)] +#[graphql( + schema_path = "schemas_repo/discount.graphql", + query_path = "queries/get_discounts.graphql", + response_derives = "Debug" +)] +/// GraphQL query generated by client library. +pub struct GetDiscounts; + +/// Queries discounts for coupons from discount service. +async fn query_discounts_by_product_variant_ids( + user_id: Uuid, + order_item_inputs_by_product_variant_ids: &HashMap, + product_variant_ids: &Vec, + product_variant_versions_by_product_variant_ids: &HashMap, + counts_by_product_variant_ids: &HashMap, +) -> Result>> { + let find_applicable_discounts_product_variant_input = + build_find_applicable_discounts_product_variant_input( + order_item_inputs_by_product_variant_ids, + product_variant_ids, + counts_by_product_variant_ids, + )?; + let order_amount = calculate_order_amount(&product_variant_versions_by_product_variant_ids); + let find_applicable_discounts_input = build_find_applicable_discounts_input( + user_id, + find_applicable_discounts_product_variant_input, + order_amount, + ); + let variables = get_discounts::Variables { + find_applicable_discounts_input, + }; + let request_body = GetDiscounts::build_query(variables); + let client = reqwest::Client::new(); + + let res = client + .post("http://localhost:3500/v1.0/invoke/discount/method/graphql") + .json(&request_body) + .send() + .await?; + let response_body: Response = res.json().await?; + let response_data: get_discounts::ResponseData = response_body.data.ok_or(Error::new( + "Response data of `query_discounts` query is empty.", + ))?; + build_discounts_from_response_data(response_data, product_variant_ids) +} + +/// Remaps the result type of the GraphQL `findApplicableDiscounts` query to the the according product variants. +/// Converts the GraphQL client library generated discounts to the internally used discounts, which are GraphQL `SimpleObject`. +fn build_discounts_from_response_data( + response_data: get_discounts::ResponseData, + product_variant_ids: &Vec, +) -> Result>> { + let graphql_client_lib_discounts: HashMap< + Uuid, + get_discounts::GetDiscountsFindApplicableDiscounts, + > = remap_discounts_to_product_variants( + response_data.find_applicable_discounts, + &product_variant_ids, + )?; + let simple_object_discounts = convert_graphql_client_lib_discounts_to_simple_object_discounts( + graphql_client_lib_discounts, + ); + Ok(simple_object_discounts) +} + +/// Builds `get_discounts::FindApplicableDiscountsInput`, which is the following struct: +/// +/// pub struct FindApplicableDiscountsInput { +/// #[serde(rename = "orderAmount")] +/// pub order_amount: Int, +/// #[serde(rename = "productVariants")] +/// pub product_variants: Vec, +/// #[serde(rename = "userId")] +/// pub user_id: UUID, +/// } +/// +/// Describes the order amount, which is the sum of all product variant version prices, a Vec of `get_discounts::FindApplicableDiscountsProductVariantInput` and the user which the discounts are be queried for. +fn build_find_applicable_discounts_input( + user_id: Uuid, + find_applicable_discounts_product_variant_input: Vec< + get_discounts::FindApplicableDiscountsProductVariantInput, + >, + order_amount: i64, +) -> get_discounts::FindApplicableDiscountsInput { + let find_applicable_discounts_input = get_discounts::FindApplicableDiscountsInput { + user_id, + product_variants: find_applicable_discounts_product_variant_input, + order_amount, + }; + find_applicable_discounts_input +} + +/// Builds part of the `get_discounts::FindApplicableDiscountsInput`, which is a Vec of the following struct: +/// +/// pub struct FindApplicableDiscountsProductVariantInput { +/// pub product_variant_id: Uuid, +/// pub count: u64, +/// pub coupon_ids: HashSet, +/// } +/// +/// Describes product variant ids, the count of items planned to order and the coupons, which should be applied. +fn build_find_applicable_discounts_product_variant_input( + order_item_inputs_by_product_variant_ids: &HashMap, + product_variant_ids: &Vec, + counts_by_product_variant_ids: &HashMap, +) -> Result> { + let find_applicable_discounts_product_variant_input: Vec< + get_discounts::FindApplicableDiscountsProductVariantInput, + > = product_variant_ids + .iter() + .map(|id| { + let counts_error = build_hash_map_error(counts_by_product_variant_ids, *id); + let count = counts_by_product_variant_ids.get(id).ok_or(counts_error)?; + let order_item_error = + build_hash_map_error(order_item_inputs_by_product_variant_ids, *id); + let coupon_ids = order_item_inputs_by_product_variant_ids + .get(id) + .ok_or(order_item_error)? + .coupon_ids + .iter() + .cloned() + .collect(); + let find_applicable_discounts_product_variant_input = + get_discounts::FindApplicableDiscountsProductVariantInput { + product_variant_id: *id, + count: i64::try_from(*count)?, + coupon_ids, + }; + Ok::( + find_applicable_discounts_product_variant_input, + ) + }) + .collect::>>()?; + Ok(find_applicable_discounts_product_variant_input) +} + +/// Remaps the result type of the GraphQL `findApplicableDiscounts` query to the the according product variants. +fn remap_discounts_to_product_variants( + discounts_for_product_variants_response_data: Vec< + get_discounts::GetDiscountsFindApplicableDiscounts, + >, + product_variant_ids: &Vec, +) -> Result> { + let mut discounts_for_product_variants: HashMap< + Uuid, + get_discounts::GetDiscountsFindApplicableDiscounts, + > = discounts_for_product_variants_response_data + .into_iter() + .fold( + HashMap::new(), + |mut map: HashMap, + discount_for_product_variant: get_discounts::GetDiscountsFindApplicableDiscounts| { + map.insert( + discount_for_product_variant.product_variant_id, + discount_for_product_variant, + ); + map + }, + ); + product_variant_ids.iter().map(|id| { + let message = format!("Product variant of UUID: `{}` is not contained in the result which `findApplicableDiscounts` provides.", id); + let discounts = discounts_for_product_variants.remove(id).ok_or(Error::new(message))?; + Ok((*id, discounts)) + }).collect() +} + +/// Converts the GraphQL client library generated discounts to the internally used discounts, which are GraphQL `SimpleObject`. +/// +/// This enables the discounts to be retrivable from the GraphQL endpoints of this service. +fn convert_graphql_client_lib_discounts_to_simple_object_discounts( + graphql_client_lib_discounts: HashMap, +) -> HashMap> { + graphql_client_lib_discounts + .into_iter() + .map(|(id, discounts)| { + let discounts = discounts + .discounts + .into_iter() + .map( + |discount: get_discounts::GetDiscountsFindApplicableDiscountsDiscounts| { + Discount::from(discount) + }, + ) + .collect(); + (id, discounts) + }) + .collect() +} + +/// Calculates the total sum of the undiscounted order items. Does not include shipping costs. +/// +/// This defines the semantic of the total amount that is passed to the Discount service, for figuring out which Discounts apply. +/// Do not confuse with `calculate_compensatable_order_amount`, which is the total compensatable amount that the buyer needs to pay. +/// +/// Converts value to an `i64` as this is what the GraphQL client library expects. +fn calculate_order_amount( + pproduct_variant_versions_by_product_variant_ids: &HashMap, +) -> i64 { + let order_amount: u32 = pproduct_variant_versions_by_product_variant_ids + .iter() + .map(|(_, p)| p.price) + .sum(); + i64::from(order_amount) +} + +#[derive(GraphQLQuery)] +#[graphql( + schema_path = "schemas_repo/shipment.graphql", + query_path = "queries/get_shipment_fees.graphql", + response_derives = "Debug" +)] +/// GraphQL query generated by client library. +struct GetShipmentFees; + +/// Queries shipment fees for product variant versions and counts. +async fn query_shipment_fees( + order_item_inputs_by_product_variant_ids: &HashMap, + product_variant_versions_by_product_variant_ids: &HashMap, + counts_by_product_variant_ids: &HashMap, +) -> Result { + let calculate_shipment_fees_input = build_calculate_shipment_fees_input( + product_variant_versions_by_product_variant_ids, + counts_by_product_variant_ids, + order_item_inputs_by_product_variant_ids, + )?; + let variables = get_shipment_fees::Variables { + calculate_shipment_fees_input, + }; + + let request_body = GetShipmentFees::build_query(variables); + let client = reqwest::Client::new(); + + let res = client + .post("http://localhost:3500/v1.0/invoke/shipment/method/graphql") + .json(&request_body) + .send() + .await?; + let response_body: Response = res.json().await?; + let message = "Response data of `query_shipment_fees` query is empty."; + let response_data: get_shipment_fees::ResponseData = + response_body.data.ok_or(Error::new(message))?; + let shipment_fees = u64::try_from(response_data.calculate_shipment_fees)?; + Ok(shipment_fees) +} + +/// Builds the `get_shipment_fees::CalculateShipmentFeesInput` by using product variant versions, counts and shipment methods. +fn build_calculate_shipment_fees_input( + product_variant_versions_by_product_variant_ids: &HashMap, + counts_by_product_variant_ids: &HashMap, + order_item_inputs_by_product_variant_ids: &HashMap, +) -> Result { + let items = + product_variant_versions_by_product_variant_ids + .iter() + .map(|(id, product_variant_version)| { + let count_error = build_hash_map_error(counts_by_product_variant_ids, *id); + let count = counts_by_product_variant_ids.get(id).ok_or(count_error)?; + let order_item_input_error = + build_hash_map_error(order_item_inputs_by_product_variant_ids, *id); + let shipment_method_id: Uuid = order_item_inputs_by_product_variant_ids + .get(id) + .ok_or(order_item_input_error)? + .shipment_method_id; + let product_variant_version_with_quantity_and_shipment_method_input = + get_shipment_fees::ProductVariantVersionWithQuantityAndShipmentMethodInput { + product_variant_version_id: product_variant_version._id, + quantity: i64::try_from(*count)?, + shipment_method_id, + }; + Ok(product_variant_version_with_quantity_and_shipment_method_input) + }) + .collect::, + >>()?; + let calculate_shipment_fees_input = get_shipment_fees::CalculateShipmentFeesInput { items }; + Ok(calculate_shipment_fees_input) +} + +/// Sends an `order/order/created` created event containing the order context. +async fn send_order_created_event(order: Order) -> Result<()> { + let client = reqwest::Client::new(); + let order_dto = OrderDTO::try_from(order)?; + client + .post("http://localhost:3500/v1.0/publish/pubsub/order/order/created") + .json(&order_dto) + .send() + .await?; + Ok(()) +} + +/// Checks if an address is registered under a specific user (MongoDB database populated with events). +/// +/// Used before creating orders. +async fn validate_user_address( + collection: &Collection, + id: Uuid, + user_id: Uuid, +) -> Result<()> { + match collection.find_one(doc! {"_id": user_id }, None).await { + Ok(maybe_object) => match maybe_object { + Some(_) => Ok(()), + None => { + let message = format!( + "User address with UUID: `{}` of user with UUID: `{}` not found.", + id, user_id + ); + Err(Error::new(message)) + } + }, + Err(_) => { + let message = format!( + "User address with UUID: `{}` of user with UUID: `{}` not found.", + id, user_id + ); + Err(Error::new(message)) + } + } +} + +/// Checks if a single object is in the system (MongoDB database populated with events). +/// +/// Used before creating orders. +pub async fn validate_object Deserialize<'a> + Unpin + Send + Sync>( + collection: &Collection, + id: Uuid, +) -> Result<()> { + query_object(&collection, id).await.map(|_| ()) +} + +/// Checks if all objects are in the system (MongoDB database populated with events). +/// +/// Used before creating orders. +async fn validate_objects Deserialize<'b> + Unpin + Send + Sync + PartialEq + Clone>( + collection: &Collection, + object_ids: Vec, +) -> Result<()> +where + Uuid: From, +{ + match collection + .find(doc! {"_id": { "$in": &object_ids } }, None) + .await + { + Ok(cursor) => { + let objects: Vec = cursor.try_collect().await?; + let ids: Vec = objects.iter().map(|o| Uuid::from(o.clone())).collect(); + object_ids + .iter() + .fold(Ok(()), |o, id| match ids.contains(id) { + true => o.and(Ok(())), + false => { + let message = format!( + "{} with UUID: `{}` is not present in the system.", + type_name::(), + id + ); + Err(Error::new(message)) + } + }) + } + Err(_) => { + let message = format!( + "{} with specified UUIDs are not present in the system.", + type_name::() + ); + Err(Error::new(message)) + } + } +} + +/// Returns an error of a HashMap retrieval. +/// +/// Constructs error message that describes a failed retrieval of an item `V` by product variant id. +fn build_hash_map_error(_hash_map: &HashMap, id: Uuid) -> Error { + let message = format!( + "`{}` for product variant of UUID: `{}` is not present in `{}`. ", + type_name::(), + id, + type_name::>() + ); + Error::new(message) +} diff --git a/src/mutation_input_structs.rs b/src/mutation_input_structs.rs new file mode 100644 index 0000000..5c83947 --- /dev/null +++ b/src/mutation_input_structs.rs @@ -0,0 +1,43 @@ +use async_graphql::{InputObject, SimpleObject}; +use bson::Uuid; +use std::{ + cmp::Ordering, + collections::{BTreeSet, HashSet}, +}; + +#[derive(SimpleObject, InputObject)] +pub struct CreateOrderInput { + /// UUID of user owning the order. + pub user_id: Uuid, + /// OrderItems of order. + pub order_item_inputs: BTreeSet, + /// UUID of address to where the order should be shipped to. + pub shipment_address_id: Uuid, + /// UUID of address of invoice. + pub invoice_address_id: Uuid, + /// UUID of payment information that the order should be processed with. + pub payment_information_id: Uuid, +} + +#[derive(SimpleObject, InputObject, PartialEq, Eq, Clone)] +pub struct OrderItemInput { + /// UUID of shopping cart item associated with order item. + pub shopping_cart_item_id: Uuid, + /// UUID of shipment method to use with order item. + pub shipment_method_id: Uuid, + /// UUIDs of coupons to use with order item. + pub coupon_ids: HashSet, +} + +impl PartialOrd for OrderItemInput { + fn partial_cmp(&self, other: &Self) -> Option { + self.shopping_cart_item_id + .partial_cmp(&other.shopping_cart_item_id) + } +} + +impl Ord for OrderItemInput { + fn cmp(&self, other: &Self) -> Ordering { + self.shopping_cart_item_id.cmp(&other.shopping_cart_item_id) + } +} diff --git a/src/order.rs b/src/order.rs new file mode 100644 index 0000000..fc2bbf3 --- /dev/null +++ b/src/order.rs @@ -0,0 +1,193 @@ +use std::cmp::Ordering; + +use async_graphql::{ComplexObject, Enum, Error, Result, SimpleObject}; +use bson::Uuid; +use bson::{datetime::DateTime, Bson}; +use serde::{Deserialize, Serialize}; + +use crate::foreign_types::UserAddress; +use crate::order_datatypes::OrderDirection; +use crate::order_item::OrderItemDTO; +use crate::{ + order_datatypes::CommonOrderInput, order_item::OrderItem, + order_item_connection::OrderItemConnection, user::User, +}; + +/// The Order of a user. +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, SimpleObject)] +#[graphql(complex)] +pub struct Order { + /// Order UUID. + pub _id: Uuid, + /// User. + pub user: User, + /// Timestamp when Order was created. + pub created_at: DateTime, + /// The status of the Order. + pub order_status: OrderStatus, + /// Timestamp of Order placement. `None` until Order is placed. + pub placed_at: Option, + /// The rejection reason if status of the Order is `OrderStatus::Rejected`. + pub rejection_reason: Option, + /// The internal vector consisting of OrderItems. + #[graphql(skip)] + pub internal_order_items: Vec, + /// Address to where the order should be shipped to. + #[graphql(skip)] + pub shipment_address: UserAddress, + /// Address of invoice. + pub invoice_address: UserAddress, + /// Total compensatable amount of order. + pub compensatable_order_amount: u64, + /// UUID of payment information that the order should be processed with. + pub payment_information_id: Uuid, +} + +#[ComplexObject] +impl Order { + /// Retrieves order items. + async fn order_items( + &self, + #[graphql(desc = "Describes that the `first` N order items should be retrieved.")] + first: Option, + #[graphql(desc = "Describes how many order items should be skipped at the beginning.")] + skip: Option, + #[graphql(desc = "Specifies the order in which order items are retrieved.")] + order_by: Option, + ) -> Result { + let mut order_items: Vec = + self.internal_order_items.clone().into_iter().collect(); + sort_order_items(&mut order_items, order_by); + let total_count = order_items.len(); + let definitely_skip = skip.unwrap_or(0); + let definitely_first = first.unwrap_or(usize::MAX); + let order_items_part: Vec = order_items + .into_iter() + .skip(definitely_skip) + .take(definitely_first) + .collect(); + let has_next_page = total_count > order_items_part.len() + definitely_skip; + Ok(OrderItemConnection { + nodes: order_items_part, + has_next_page, + total_count: total_count as u64, + }) + } +} + +/// Describes if Order is placed, or yet pending. An Order can be rejected during its lifetime. +#[derive(Debug, Enum, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub enum OrderStatus { + /// Order is saved a a template, this status can only last for max. 1 hour. + Pending, + /// Order is placed, which means SAGA for payment, fullfill and other validity checks need to be triggered. + Placed, + /// Something went wrong with the order and it was compensated in all relevant serivces. + Rejected, +} + +impl OrderStatus { + pub fn as_str(&self) -> &'static str { + match self { + OrderStatus::Pending => "PENDING", + OrderStatus::Placed => "PLACED", + OrderStatus::Rejected => "REJECTED", + } + } +} + +impl From for Bson { + fn from(value: OrderStatus) -> Self { + Bson::from(value.as_str()) + } +} + +/// Describes the reason why an Order was rejected, in case of rejection: `OrderStatus::Rejected`. +#[derive(Debug, Enum, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub enum RejectionReason { + /// The order was rejected due to its invalid content. + InvalidOrderData, + /// The inventory service was not able to reserve inventory items according to the order. + InventoryReservationFailed, +} + +impl From for Uuid { + fn from(value: Order) -> Self { + value._id + } +} + +/// Sorts vector of order items according to BaseOrder. +/// +/// * `order_items` - Vector of order items to sort. +/// * `order_by` - Specifies order of sorted result. +fn sort_order_items(order_items: &mut Vec, order_by: Option) { + let comparator: fn(&OrderItem, &OrderItem) -> bool = + match order_by.unwrap_or_default().direction.unwrap_or_default() { + OrderDirection::Asc => |x, y| x < y, + OrderDirection::Desc => |x, y| x > y, + }; + order_items.sort_by(|x, y| match comparator(x, y) { + true => Ordering::Less, + false => Ordering::Greater, + }); +} + +/// DTO of an order of a user. +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct OrderDTO { + /// Order UUID. + pub id: Uuid, + /// UUID of user connected with Order. + pub user_id: Uuid, + /// Timestamp when Order was created. + pub created_at: chrono::DateTime, + /// The status of the Order. + pub order_status: OrderStatus, + /// Timestamp of Order placement. `None` until Order is placed. + pub placed_at: chrono::DateTime, + /// The rejection reason if status of the Order is `OrderStatus::Rejected`. + pub rejection_reason: Option, + /// OrderItems associated with the order. + pub order_items: Vec, + /// UUID of address to where the order should be shipped to. + pub shipment_address_id: Uuid, + /// UUID of address of invoice. + pub invoice_address_id: Uuid, + /// Total compensatable amount of order. + pub compensatable_order_amount: u64, + /// UUID of payment information that the order should be processed with. + pub payment_information_id: Uuid, +} + +impl TryFrom for OrderDTO { + type Error = Error; + + fn try_from(value: Order) -> Result { + let order_item_dtos = value + .internal_order_items + .iter() + .map(|o| OrderItemDTO::from(o.clone())) + .collect(); + let message = + format!("OrderDTO cannot be created, `placed_at` of the given Order is `None`"); + let placed_at = value.placed_at.ok_or(Error::new(message))?.to_chrono(); + let order_dto = Self { + id: value._id, + user_id: value.user._id, + created_at: value.created_at.to_chrono(), + order_status: value.order_status, + placed_at, + rejection_reason: value.rejection_reason, + order_items: order_item_dtos, + shipment_address_id: value.shipment_address._id, + invoice_address_id: value.invoice_address._id, + compensatable_order_amount: value.compensatable_order_amount, + payment_information_id: value.payment_information_id, + }; + Ok(order_dto) + } +} diff --git a/src/order_compensation.rs b/src/order_compensation.rs new file mode 100644 index 0000000..e0a204a --- /dev/null +++ b/src/order_compensation.rs @@ -0,0 +1,125 @@ +use async_graphql::{Error, Result}; +use bson::{doc, DateTime, Uuid}; +use futures::TryStreamExt; +use mongodb::Collection; +use serde::{Deserialize, Serialize}; + +use crate::{ + http_event_service::ShipmentFailedEventData, mutation::validate_object, order::Order, + query::query_object, +}; + +/// Models an order compensation that is sent as an event and logged in MongoDB. +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct OrderCompensation { + /// OrderCompensation UUID. + pub _id: Uuid, + /// UUID of the order. + pub order_id: Uuid, + /// UUIDs of the order items of shipment. + pub order_item_ids: Vec, + /// Timestamp when compensation was triggered. + pub triggered_at: DateTime, + /// Amount of order compensation. + pub amount_to_compensate: u64, +} + +/// DTO that models an order compensation that is sent as an event and logged in MongoDB. +#[derive(Debug, Serialize)] +pub struct OrderCompensationDTO { + /// OrderCompensation UUID. + pub id: Uuid, + /// Amount of order compensation + pub amount_to_compensate: u64, +} + +impl From for OrderCompensationDTO { + fn from(value: OrderCompensation) -> Self { + Self { + id: value._id, + amount_to_compensate: value.amount_to_compensate, + } + } +} + +/// Responsible for compensating a shipment based on a failed shipment event. Saves compensation in MongoDB. +pub async fn compensate_order( + order_collection: &Collection, + order_compensation_collection: &Collection, + data: ShipmentFailedEventData, +) -> Result<()> { + validate_object(&order_collection, data.order_id).await?; + verify_items_uncompensated(&order_compensation_collection, &data.order_item_ids).await?; + let amount_to_compensate = calculate_amount_to_compensate(&order_collection, &data).await?; + let order_compensation = OrderCompensation { + _id: Uuid::new(), + order_id: data.order_id, + order_item_ids: data.order_item_ids, + triggered_at: DateTime::now(), + amount_to_compensate, + }; + insert_order_compensation_in_mongodb(&order_compensation_collection, &order_compensation) + .await?; + send_order_compensation_event(order_compensation).await +} + +/// Calculates the amount that the compensation event should compensate. Based on the failed shipment event. +async fn calculate_amount_to_compensate( + order_collection: &Collection, + data: &ShipmentFailedEventData, +) -> Result { + let order = query_object(&order_collection, data.order_id).await?; + let compensatable_amounts: Vec = order + .internal_order_items + .iter() + .filter(|i| data.order_item_ids.contains(&i._id)) + .map(|i| i.compensatable_amount) + .collect(); + let amount_to_compensate = compensatable_amounts.iter().sum(); + Ok(amount_to_compensate) +} + +/// Verifies that all of the items are uncompensated, otherwise returns an Err. +async fn verify_items_uncompensated( + order_collection: &Collection, + order_item_ids: &Vec, +) -> Result<()> { + let query = doc! {"order_item_ids": {"$not": {"$elemMatch": {"$in": order_item_ids}}}}; + let message = format!( + "Order items of UUIDs: `{:?}` could not be verfied.", + order_item_ids + ); + match order_collection.find(query, None).await { + Ok(cursor) => { + let objects: Vec = cursor.try_collect().await?; + match objects.len() { + 0 => Ok(()), + _ => Err(Error::new(message)), + } + } + Err(_) => Err(Error::new(message)), + } +} + +/// Inserts OrderCompensation in MongoDB. +async fn insert_order_compensation_in_mongodb( + order_collection: &Collection, + order_compensation: &OrderCompensation, +) -> Result<()> { + match order_collection.insert_one(order_compensation, None).await { + Ok(_) => Ok(()), + Err(_) => Err(Error::new("Adding order compensation failed in MongoDB.")), + } +} + +/// Sends an `order/order/compensate` created event containing the amount to compensate. +async fn send_order_compensation_event(order_compensation: OrderCompensation) -> Result<()> { + let client = reqwest::Client::new(); + let order_compensation_dto = OrderCompensationDTO::from(order_compensation); + client + .post("http://localhost:3500/v1.0/publish/pubsub/order/order-compensation/created") + .json(&order_compensation_dto) + .send() + .await?; + Ok(()) +} diff --git a/src/order_connection.rs b/src/order_connection.rs new file mode 100644 index 0000000..47867fd --- /dev/null +++ b/src/order_connection.rs @@ -0,0 +1,28 @@ +use async_graphql::SimpleObject; + +use crate::{base_connection::BaseConnection, order::Order}; + +/// A connection of Orders. +#[derive(SimpleObject)] +#[graphql(shareable)] +pub struct OrderConnection { + /// The resulting entities. + pub nodes: Vec, + /// Whether this connection has a next page. + pub has_next_page: bool, + /// The total amount of items in this connection. + pub total_count: u64, +} + +/// Implementation of conversion from BaseConnection to OrderConnection. +/// +/// Prevents GraphQL naming conflicts. +impl From> for OrderConnection { + fn from(value: BaseConnection) -> Self { + Self { + nodes: value.nodes, + has_next_page: value.has_next_page, + total_count: value.total_count, + } + } +} diff --git a/src/order_datatypes.rs b/src/order_datatypes.rs new file mode 100644 index 0000000..dbe3e11 --- /dev/null +++ b/src/order_datatypes.rs @@ -0,0 +1,118 @@ +use async_graphql::{Enum, InputObject, SimpleObject}; + +/// GraphQL order direction. +#[derive(Enum, Copy, Clone, Eq, PartialEq)] +pub enum OrderDirection { + /// Ascending order direction. + Asc, + /// Descending order direction. + Desc, +} + +impl Default for OrderDirection { + fn default() -> Self { + Self::Asc + } +} + +/// Implements conversion to i32 for MongoDB document sorting. +impl From for i32 { + fn from(value: OrderDirection) -> Self { + match value { + OrderDirection::Asc => 1, + OrderDirection::Desc => -1, + } + } +} + +/// Describes the fields that a order can be ordered by. +#[derive(Enum, Copy, Clone, Eq, PartialEq)] +pub enum OrderOrderField { + /// Orders by "id". + Id, + /// Orders by "user_id". + UserId, + /// Orders by "name". + Name, + /// Orders by "created_at". + CreatedAt, + /// Orders by "last_updated_at". + LastUpdatedAt, +} + +impl OrderOrderField { + pub fn as_str(&self) -> &'static str { + match self { + OrderOrderField::Id => "_id", + OrderOrderField::UserId => "user._id", + OrderOrderField::Name => "name", + OrderOrderField::CreatedAt => "created_at", + OrderOrderField::LastUpdatedAt => "last_updated_at", + } + } +} + +impl Default for OrderOrderField { + fn default() -> Self { + Self::Id + } +} + +/// Specifies the order of orders. +#[derive(SimpleObject, InputObject)] +pub struct OrderOrderInput { + /// Order direction of orders. + pub direction: Option, + /// Field that orders should be ordered by. + pub field: Option, +} + +impl Default for OrderOrderInput { + fn default() -> Self { + Self { + direction: Some(Default::default()), + field: Some(Default::default()), + } + } +} + +/// Describes the fields that a foreign types can be ordered by. +/// +/// Only the Id valid at the moment. +#[derive(Enum, Copy, Clone, Eq, PartialEq)] +pub enum CommonOrderField { + /// Orders by "id". + Id, +} + +impl CommonOrderField { + pub fn as_str(&self) -> &'static str { + match self { + CommonOrderField::Id => "_id", + } + } +} + +impl Default for CommonOrderField { + fn default() -> Self { + Self::Id + } +} + +/// Specifies the order of foreign types. +#[derive(SimpleObject, InputObject)] +pub struct CommonOrderInput { + /// Order direction of orders. + pub direction: Option, + /// Field that orders should be ordered by. + pub field: Option, +} + +impl Default for CommonOrderInput { + fn default() -> Self { + Self { + direction: Some(Default::default()), + field: Some(Default::default()), + } + } +} diff --git a/src/order_item.rs b/src/order_item.rs new file mode 100644 index 0000000..7d38967 --- /dev/null +++ b/src/order_item.rs @@ -0,0 +1,202 @@ +use std::{cmp::Ordering, collections::BTreeSet}; + +use async_graphql::{ComplexObject, Result, SimpleObject}; +use bson::{DateTime, Uuid}; +use serde::{Deserialize, Serialize}; + +use crate::{ + discount_connection::DiscountConnection, + foreign_types::{ + Discount, ProductVariant, ProductVariantVersion, ShipmentMethod, ShoppingCartItem, + TaxRateVersion, + }, + mutation_input_structs::OrderItemInput, + order_datatypes::{CommonOrderInput, OrderDirection}, +}; + +/// Describes an OrderItem of an Order. +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, SimpleObject)] +#[graphql(complex)] +pub struct OrderItem { + /// OrderItem UUID. + pub _id: Uuid, + /// Timestamp when OrderItem was created. + pub created_at: DateTime, + /// Product variant associated with OrderItem. + pub product_variant: ProductVariant, + /// Product variant version associated with OrderItem. + pub product_variant_version: ProductVariantVersion, + /// Tax rate version associated with OrderItem. + pub tax_rate_version: TaxRateVersion, + /// Shopping cart item associated with OrderItem. + pub shopping_cart_item: ShoppingCartItem, + /// Specifies the quantity of the OrderItem. + pub count: u64, + /// Total cost of product item, which can also be refunded. + pub compensatable_amount: u64, + /// Shipment method of order item. + pub shipment_method: ShipmentMethod, + /// The internal vector consisting of Discounts. + #[graphql(skip)] + pub internal_discounts: BTreeSet, +} + +impl OrderItem { + /// Constructor for OrderItems. + /// + /// Queries ProductVariantVersion from MongoDB. + pub fn new( + order_item_input: &OrderItemInput, + product_variant: &ProductVariant, + product_variant_version: &ProductVariantVersion, + tax_rate_version: &TaxRateVersion, + count: u64, + internal_discounts: &BTreeSet, + current_timestamp: DateTime, + ) -> Self { + let compensatable_amount = + calculate_compensatable_amount(product_variant_version, &internal_discounts); + let shopping_cart_item = ShoppingCartItem { + _id: order_item_input.shopping_cart_item_id, + }; + let shipment_method = ShipmentMethod { + _id: order_item_input.shipment_method_id, + }; + Self { + _id: Uuid::new(), + created_at: current_timestamp, + product_variant: product_variant.clone(), + product_variant_version: product_variant_version.clone(), + tax_rate_version: tax_rate_version.clone(), + shopping_cart_item, + count, + compensatable_amount, + shipment_method, + internal_discounts: internal_discounts.clone(), + } + } +} + +#[ComplexObject] +impl OrderItem { + /// Retrieves discounts. + async fn discounts( + &self, + #[graphql(desc = "Describes that the `first` N discounts should be retrieved.")] + first: Option, + #[graphql( + desc = "Describes how many discounts should be skipped at the beginning." + )] + skip: Option, + #[graphql(desc = "Specifies the order in which discounts are retrieved.")] order_by: Option< + CommonOrderInput, + >, + ) -> Result { + let mut discounts: Vec = self.internal_discounts.clone().into_iter().collect(); + sort_discounts(&mut discounts, order_by); + let total_count = discounts.len(); + let definitely_skip = skip.unwrap_or(0); + let definitely_first = first.unwrap_or(usize::MAX); + let discounts_part: Vec = discounts + .into_iter() + .skip(definitely_skip) + .take(definitely_first) + .collect(); + let has_next_page = total_count > discounts_part.len() + definitely_skip; + Ok(DiscountConnection { + nodes: discounts_part, + has_next_page, + total_count: total_count as u64, + }) + } +} + +impl PartialOrd for OrderItem { + fn partial_cmp(&self, other: &Self) -> Option { + self._id.partial_cmp(&other._id) + } +} + +impl Ord for OrderItem { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self._id.cmp(&other._id) + } +} + +/// Sorts vector of discounts according to BaseOrder. +/// +/// * `discounts` - Vector of discounts to sort. +/// * `order_by` - Specifies order of sorted result. +fn sort_discounts(discounts: &mut Vec, order_by: Option) { + let comparator: fn(&Discount, &Discount) -> bool = + match order_by.unwrap_or_default().direction.unwrap_or_default() { + OrderDirection::Asc => |x, y| x < y, + OrderDirection::Desc => |x, y| x > y, + }; + discounts.sort_by(|x, y| match comparator(x, y) { + true => Ordering::Less, + false => Ordering::Greater, + }); +} + +/// Applies fees and discounts to calculate the compensatable amount of an OrderItem. +fn calculate_compensatable_amount( + product_variant_version: &ProductVariantVersion, + internal_discounts: &BTreeSet, +) -> u64 { + let undiscounted_price = product_variant_version.price as f64; + let discounted_price = internal_discounts + .iter() + .fold(undiscounted_price, |prev_price, discount| { + prev_price * discount.discount + }); + let total_price = discounted_price as u64; + total_price +} + +/// Describes DTO of an OrderItem of an Order. +/// +/// `product_item` is set to None as long as `OrderStatus::Pending`. +/// Must contain a ProductItem when `OrderStatus::Placed` or `OrderStatus::Rejected`. +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct OrderItemDTO { + /// OrderItem UUID. + pub id: Uuid, + /// Timestamp when OrderItem was created. + pub created_at: chrono::DateTime, + /// UUID of product variant associated with OrderItem. + pub product_variant_id: Uuid, + /// UUID of product variant version associated with OrderItem. + pub product_variant_version_id: Uuid, + /// UUID of tax rate version associated with OrderItem. + pub tax_rate_version_id: Uuid, + /// UUID of shopping cart item associated with OrderItem. + pub shopping_cart_item_id: Uuid, + /// Specifies the quantity of the OrderItem. + pub count: u64, + /// Total cost of product item, which can also be refunded. + pub compensatable_amount: u64, + /// UUID of shipment method of order item. + pub shipment_method_id: Uuid, + /// UUIDs of discounts applied to order item. + pub discount_ids: Vec, +} + +impl From for OrderItemDTO { + fn from(value: OrderItem) -> Self { + let discount_ids = value.internal_discounts.iter().map(|d| d._id).collect(); + Self { + id: value._id, + created_at: value.created_at.to_chrono(), + product_variant_id: value.product_variant._id, + product_variant_version_id: value.product_variant_version._id, + tax_rate_version_id: value.tax_rate_version._id, + shopping_cart_item_id: value.shopping_cart_item._id, + count: value.count, + compensatable_amount: value.compensatable_amount, + shipment_method_id: value.shipment_method._id, + discount_ids, + } + } +} diff --git a/src/order_item_connection.rs b/src/order_item_connection.rs new file mode 100644 index 0000000..54e70c1 --- /dev/null +++ b/src/order_item_connection.rs @@ -0,0 +1,28 @@ +use async_graphql::SimpleObject; + +use crate::{base_connection::BaseConnection, order_item::OrderItem}; + +/// A connection of OrderItems. +#[derive(SimpleObject)] +#[graphql(shareable)] +pub struct OrderItemConnection { + /// The resulting entities. + pub nodes: Vec, + /// Whether this connection has a next page. + pub has_next_page: bool, + /// The total amount of items in this connection. + pub total_count: u64, +} + +/// Implementation of conversion from BaseConnection to OrderItemConnection. +/// +/// Prevents GraphQL naming conflicts. +impl From> for OrderItemConnection { + fn from(value: BaseConnection) -> Self { + Self { + nodes: value.nodes, + has_next_page: value.has_next_page, + total_count: value.total_count, + } + } +} diff --git a/src/product_variant_version_connection.rs b/src/product_variant_version_connection.rs new file mode 100644 index 0000000..0b56d4b --- /dev/null +++ b/src/product_variant_version_connection.rs @@ -0,0 +1,28 @@ +use async_graphql::SimpleObject; + +use crate::{base_connection::BaseConnection, foreign_types::ProductVariantVersion}; + +/// A connection of ProductVariantVersions. +#[derive(SimpleObject)] +#[graphql(shareable)] +pub struct ProductVariantVersionConnection { + /// The resulting entities. + pub nodes: Vec, + /// Whether this connection has a next page. + pub has_next_page: bool, + /// The total amount of items in this connection. + pub total_count: u64, +} + +/// Implementation of conversion from BaseConnection to ProductVariantVersionConnection. +/// +/// Prevents GraphQL naming conflicts. +impl From> for ProductVariantVersionConnection { + fn from(value: BaseConnection) -> Self { + Self { + nodes: value.nodes, + has_next_page: value.has_next_page, + total_count: value.total_count, + } + } +} diff --git a/src/query.rs b/src/query.rs new file mode 100644 index 0000000..5a46fa0 --- /dev/null +++ b/src/query.rs @@ -0,0 +1,180 @@ +use std::{any::type_name, collections::HashMap}; + +use crate::{authentication::authenticate_user, order_item::OrderItem, user::User, Order}; +use async_graphql::{Context, Error, Object, Result}; + +use bson::Uuid; +use futures::TryStreamExt; +use mongodb::{bson::doc, Collection, Database}; +use serde::Deserialize; + +/// Describes GraphQL order queries. +pub struct Query; + +#[Object] +impl Query { + /// Entity resolver for user of specific id. + #[graphql(entity)] + async fn user_entity_resolver<'a>( + &self, + ctx: &Context<'a>, + #[graphql(desc = "UUID of user to retrieve.")] id: Uuid, + ) -> Result { + let db_client = ctx.data::()?; + let collection: Collection = db_client.collection::("users"); + query_object(&collection, id).await + } + + /// Retrieves order of specific id. + async fn order<'a>( + &self, + ctx: &Context<'a>, + #[graphql(desc = "UUID of order to retrieve.")] id: Uuid, + ) -> Result { + let db_client = ctx.data::()?; + let collection: Collection = db_client.collection::("orders"); + let order = query_object(&collection, id).await?; + authenticate_user(&ctx, order.user._id)?; + Ok(order) + } + + /// Entity resolver for order of specific id. + #[graphql(entity)] + async fn order_entity_resolver<'a>( + &self, + ctx: &Context<'a>, + #[graphql(key, desc = "UUID of order to retrieve.")] id: Uuid, + ) -> Result { + let db_client = ctx.data::()?; + let collection: Collection = db_client.collection::("orders"); + let order = query_object(&collection, id).await?; + Ok(order) + } + + /// Retrieves order_item of specific id. + async fn order_item<'a>( + &self, + ctx: &Context<'a>, + #[graphql(desc = "UUID of order_item to retrieve.")] id: Uuid, + ) -> Result { + let db_client = ctx.data::()?; + let order_collection: Collection = db_client.collection::("orders"); + let order_item_collection: Collection = + db_client.collection::("order_items"); + let order_item = query_object(&order_item_collection, id).await?; + let user = query_user_from_order_item_id(&order_collection, id).await?; + authenticate_user(&ctx, user._id)?; + Ok(order_item) + } + + /// Entity resolver for order_item of specific id. + #[graphql(entity)] + async fn order_item_entity_resolver<'a>( + &self, + ctx: &Context<'a>, + #[graphql(key, desc = "UUID of order_item to retrieve.")] id: Uuid, + ) -> Result { + let db_client = ctx.data::()?; + let collection: Collection = db_client.collection::("order_items"); + let order_item = query_object(&collection, id).await?; + Ok(order_item) + } +} + +/// Shared function to query a order from a MongoDB collection of orders +/// +/// * `connection` - MongoDB database connection. +/// * `id` - UUID of order. +pub async fn query_order(collection: &Collection, id: Uuid) -> Result { + match collection.find_one(doc! {"_id": id }, None).await { + Ok(maybe_order) => match maybe_order { + Some(order) => Ok(order), + None => { + let message = format!("Order with UUID: `{}` not found.", id); + Err(Error::new(message)) + } + }, + Err(_) => { + let message = format!("Order with UUID: `{}` not found.", id); + Err(Error::new(message)) + } + } +} + +async fn query_user_from_order_item_id(collection: &Collection, id: Uuid) -> Result { + match collection + .find_one(doc! {"internal_order_items._id": id }, None) + .await + { + Ok(maybe_order) => match maybe_order { + Some(order) => Ok(order.user), + None => { + let message = format!("OrderItem with UUID: `{}` not found.", id); + Err(Error::new(message)) + } + }, + Err(_) => { + let message = format!("OrderItem with UUID: `{}` not found.", id); + Err(Error::new(message)) + } + } +} + +/// Shared function to query an object: T from a MongoDB collection of object: T. +/// +/// * `connection` - MongoDB database connection. +/// * `id` - UUID of object. +pub async fn query_object Deserialize<'a> + Unpin + Send + Sync>( + collection: &Collection, + id: Uuid, +) -> Result { + match collection.find_one(doc! {"_id": id }, None).await { + Ok(maybe_object) => match maybe_object { + Some(object) => Ok(object), + None => { + let message = format!("{} with UUID: `{}` not found.", type_name::(), id); + Err(Error::new(message)) + } + }, + Err(_) => { + let message = format!("{} with UUID: `{}` not found.", type_name::(), id); + Err(Error::new(message)) + } + } +} + +/// Shared function to query objects: T from a MongoDB collection of object: T. +/// +/// * `connection` - MongoDB database connection. +/// * `ids` - UUIDs of objects. +pub async fn query_objects Deserialize<'a> + Unpin + Send + Sync + Clone>( + collection: &Collection, + object_ids: &Vec, +) -> Result> +where + Uuid: From, +{ + match collection + .find(doc! {"_id": { "$in": &object_ids } }, None) + .await + { + Ok(cursor) => { + let objects: HashMap = cursor + .try_fold(HashMap::new(), |mut map, result| async move { + let id = Uuid::from(result.clone()); + map.insert(id, result); + Ok(map) + }) + .await?; + Ok(objects) + } + Err(_) => { + let message = format!( + "{} with UUIDs: `{:?}` not found.", + type_name::(), + object_ids + ); + Err(Error::new(message)) + } + } +} diff --git a/src/user.rs b/src/user.rs new file mode 100644 index 0000000..33cd5c3 --- /dev/null +++ b/src/user.rs @@ -0,0 +1,75 @@ +use async_graphql::{ComplexObject, Context, Error, Result, SimpleObject}; +use bson::{doc, Document, Uuid}; +use mongodb::{options::FindOptions, Collection, Database}; +use mongodb_cursor_pagination::{error::CursorError, FindResult, PaginatedCursor}; +use serde::{Deserialize, Serialize}; + +use crate::{ + authentication::authenticate_user, + base_connection::{BaseConnection, FindResultWrapper}, + order::Order, + order_connection::OrderConnection, + order_datatypes::OrderOrderInput, +}; + +/// Type of a user owning orders. +#[derive(Debug, Serialize, Deserialize, Hash, Eq, PartialEq, Clone, SimpleObject)] +#[graphql(complex)] +pub struct User { + /// UUID of the user. + pub _id: Uuid, + /// UUIDs of the users addresses. + #[graphql(skip)] + pub user_address_ids: Vec, +} + +#[ComplexObject] +impl User { + /// Retrieves orders of user. + async fn orders<'a>( + &self, + ctx: &Context<'a>, + #[graphql(desc = "Describes that the `first` N orders should be retrieved.")] first: Option< + u32, + >, + #[graphql(desc = "Describes how many orders should be skipped at the beginning.")] + skip: Option, + #[graphql(desc = "Specifies the order in which orders are retrieved.")] order_by: Option< + OrderOrderInput, + >, + ) -> Result { + authenticate_user(&ctx, self._id)?; + let db_client = ctx.data::()?; + let collection: Collection = db_client.collection::("orders"); + let order_order = order_by.unwrap_or_default(); + let sorting_doc = doc! {order_order.field.unwrap_or_default().as_str(): i32::from(order_order.direction.unwrap_or_default())}; + let find_options = FindOptions::builder() + .skip(skip) + .limit(first.map(|v| i64::from(v))) + .sort(sorting_doc) + .build(); + let document_collection = collection.clone_with_type::(); + let filter = doc! {"user._id": self._id}; + let maybe_find_results: Result, CursorError> = + PaginatedCursor::new(Some(find_options.clone()), None, None) + .find(&document_collection, Some(&filter)) + .await; + match maybe_find_results { + Ok(find_results) => { + let find_result_wrapper = FindResultWrapper(find_results); + let connection = Into::>::into(find_result_wrapper); + Ok(Into::::into(connection)) + } + Err(_) => return Err(Error::new("Retrieving orders failed in MongoDB.")), + } + } +} + +impl From for User { + fn from(value: Uuid) -> Self { + User { + _id: value, + user_address_ids: vec![], + } + } +}