diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index eb5c66cf..a8861610 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -77,12 +77,10 @@ jobs: # print env vars for debugging cat "$GITHUB_ENV" - - name: Build and push runtime image uses: docker/build-push-action@v4 with: - # for linux/s390x, maven errors due to missing io.grpc:protoc-gen-grpc-java:exe:linux-s390_64:1.51.1 - platforms: linux/amd64,linux/arm64/v8,linux/ppc64le + platforms: linux/amd64,linux/arm64/v8,linux/ppc64le,linux/s390x target: runtime push: ${{ github.event_name == 'push' }} tags: ${{ env.IMAGE_NAME }}:${{ env.VERSION }} diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml new file mode 100644 index 00000000..80ac6c11 --- /dev/null +++ b/.github/workflows/codeql.yml @@ -0,0 +1,87 @@ +# For most projects, this workflow file will not need changing; you simply need +# to commit it to your repository. +# +# You may wish to alter this file to override the set of languages analyzed, +# or to provide custom queries or build logic. +# +# ******** NOTE ******** +# We have attempted to detect the languages in your repository. Please check +# the `language` matrix defined below to confirm you have the correct set of +# supported CodeQL languages. +# +name: "CodeQL" + +on: + push: + branches: ["main"] + pull_request: + # The branches below must be a subset of the branches above + branches: ["main"] + schedule: + - cron: '45 8 * * *' + +jobs: + analyze: + name: Analyze + # Runner size impacts CodeQL analysis time. To learn more, please see: + # - https://gh.io/recommended-hardware-resources-for-running-codeql + # - https://gh.io/supported-runners-and-hardware-resources + # - https://gh.io/using-larger-runners + # Consider using larger runners for possible analysis time improvements. + runs-on: ${{ (matrix.language == 'swift' && 'macos-latest') || 'ubuntu-latest' }} + timeout-minutes: ${{ (matrix.language == 'swift' && 120) || 360 }} + permissions: + actions: read + contents: read + security-events: write + + strategy: + fail-fast: false + matrix: + language: ["java-kotlin", "python"] + # CodeQL supports [ 'c-cpp', 'csharp', 'go', 'java-kotlin', 'javascript-typescript', 'python', 'ruby', 'swift' ] + # Use only 'java-kotlin' to analyze code written in Java, Kotlin or both + # Use only 'javascript-typescript' to analyze code written in JavaScript, TypeScript or both + # Learn more about CodeQL language support at https://aka.ms/codeql-docs/language-support + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Set up Java 17 + uses: actions/setup-java@v3 + with: + java-version: '17' + distribution: 'temurin' + + # Initializes the CodeQL tools for scanning. + - name: Initialize CodeQL + uses: github/codeql-action/init@v2 + with: + languages: ${{ matrix.language }} + # If you wish to specify custom queries, you can do so here or in a config file. + # By default, queries listed here will override any specified in a config file. + # Prefix the list here with "+" to use these queries and those in the config file. + + # For more details on CodeQL's query packs, refer to: https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs + # queries: security-extended,security-and-quality + + # Autobuild attempts to build any compiled languages (C/C++, C#, Go, Java, or Swift). + # If this step fails, then you should remove it and run the build manually (see below) + - name: Autobuild + uses: github/codeql-action/autobuild@v2 + + # ℹī¸ Command-line programs to run using the OS shell. + # 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun + + # If the Autobuild fails above, remove it and uncomment the following three lines. + # modify them (or add more) to build your code if your project, please refer to the EXAMPLE below for guidance. + + # - run: | + # echo "Run, Build Application using script" + # ./location_of_script_within_repo/buildscript.sh + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v2 + with: + category: "/language:${{matrix.language}}" diff --git a/.github/workflows/create-release-tag.yml b/.github/workflows/create-release-tag.yml new file mode 100644 index 00000000..b7dbfb6e --- /dev/null +++ b/.github/workflows/create-release-tag.yml @@ -0,0 +1,82 @@ +name: Create Tag and Release changelog superrelease noparam + +on: + workflow_dispatch: + inputs: + tag_name: + description: 'Tag name for the new release' + required: true + +permissions: + contents: write + packages: write + pull-requests: write + +jobs: + fetch-tag: + runs-on: ubuntu-latest + outputs: + old_tag: ${{ steps.get_tag.outputs.old_tag_name }} + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + ref: ${{ github.ref }} + fetch-depth: 0 + + - name: Get latest tag + id: get_tag + run: | + echo "old_tag_name=$(git ls-remote --tags origin | awk -F'/' '{print $3}' | grep -v '{}' | sort -V | tail -n1)" >> $GITHUB_OUTPUT + + - name: print tag + id: print_tag + run: | + echo "Old Tag=${{ steps.get_tag.outputs.old_tag_name }}" + echo "NEW_TAG=${{ github.event.inputs.tag_name }}" >> $GITHUB_ENV + echo "$(basename ${{ github.ref }})" + + - name: Check if tag exists + id: check_tag + run: | + import sys + import subprocess + tag_name = "${{ github.event.inputs.tag_name }}" + command = ['git', 'tag', '-l', tag_name] + output = subprocess.check_output(command, stderr=subprocess.STDOUT) + if output.decode() != "": + print(f"Error: Tag '{tag_name}' already exists.", file=sys.stderr) + sys.exit(1) + else: + print(f"Tag '{tag_name}' does not exists.") + shell: python + continue-on-error: false + + - name: Create Tag + id: create_tag + run: | + git config --global user.email "github-actions@github.com" + git config --global user.name "GitHub Actions" + git tag -a ${{ github.event.inputs.tag_name }} -m "Prepare for ODH release ${{ github.event.inputs.tag_name }}" + git push origin ${{ github.event.inputs.tag_name }} + + changelog: + name: Changelog + needs: fetch-tag + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + ref: ${{ github.ref }} + + - name: Create Release + uses: softprops/action-gh-release@v2 + with: + token: ${{ github.token }} + tag_name: ${{ github.event.inputs.tag_name }} + prerelease: false + draft: false + files: bin/* + generate_release_notes: true + name: ${{ github.event.inputs.tag_name }} \ No newline at end of file diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 12bb69ce..f76f2e86 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -3,6 +3,10 @@ We'd love to accept your patches and contributions to this project. There are just a few small guidelines you need to follow. +## Developer guide + +Check out the [developer guide](developer-guide.md) to learn about development practices for the project. + ## Code reviews All submissions, including submissions by project members, require review. We diff --git a/README.md b/README.md index ef9e2431..10f31a55 100644 --- a/README.md +++ b/README.md @@ -1,50 +1,17 @@ +[![Build](https://github.com/kserve/modelmesh/actions/workflows/build.yml/badge.svg?branch=main)](https://github.com/kserve/modelmesh/actions/workflows/build.yml) + # ModelMesh The ModelMesh framework is a mature, general-purpose model serving management/routing layer designed for high-scale, high-density and frequently-changing model use cases. It works with existing or custom-built model servers and acts as a distributed LRU cache for serving runtime models. -See these [these charts](https://github.com/kserve/modelmesh/files/8854091/modelmesh-jun2022.pdf) for more information on supported features and design details. - For full Kubernetes-based deployment and management of ModelMesh clusters and models, see the [ModelMesh Serving](https://github.com/kserve/modelmesh-serving) repo. This includes a separate controller and provides K8s custom resource based management of ServingRuntimes and InferenceServices along with common, abstracted handling of model repository storage and ready-to-use integrations with some existing OSS model servers. -### Quick-Start - -1. Wrap your model-loading and invocation logic in this [model-runtime.proto](./src/main/proto/current/model-runtime.proto) gRPC service interface - - `runtimeStatus()` - called only during startup to obtain some basic configuration parameters from the runtime, such as version, capacity, model-loading timeout - - `loadModel()` - load the specified model into memory from backing storage, returning when complete - - `modelSize()` - determine size (mem usage) of previously-loaded model. If very fast, can be omitted and provided instead in the response from `loadModel` - - `unloadModel()` - unload previously loaded model, returning when complete - - Use a separate, arbitrary gRPC service interface for model inferencing requests. It can have any number of methods and they are assumed to be idempotent. See [predictor.proto](src/test/proto/predictor.proto) for a very simple example. - - The methods of your custom applier interface will be called only for already fully-loaded models. -2. Build a grpc server docker container which exposes these interfaces on localhost port 8085 or via a mounted unix domain socket -3. Extend the [Kustomize-based Kubernetes manifests](config) to use your docker image, and with appropriate mem and cpu resource allocations for your container -4. Deploy to a Kubernetes cluster as a regular Service, which will expose [this grpc service interface](./src/main/proto/current/model-mesh.proto) via kube-dns (you do not implement this yourself), consume using grpc client of your choice from your upstream service components - - `registerModel()` and `unregisterModel()` for registering/removing models managed by the cluster - - Any custom inferencing interface methods to make a runtime invocation of previously-registered model, making sure to set a `mm-model-id` or `mm-vmodel-id` metadata header (or `-bin` suffix equivalents for UTF-8 ids) - -### Deployment and Upgrades - -Prerequisites: - -- An etcd cluster (shared or otherwise) -- A Kubernetes namespace with the etcd cluster connection details configured as a secret key in [this json format](https://github.com/IBM/etcd-java/blob/master/etcd-json-schema.md) - - Note that if provided, the `root_prefix` attribute _is_ used as a key prefix for all of the framework's use of etcd - -From an operational standpoint, ModelMesh behaves just like any other homogeneous clustered microservice. This means it can be deployed, scaled, migrated and upgraded as a regular Kubernetes deployment without any special coordination needed, and without any impact to live service usage. - -In particular the procedure for live upgrading either the framework container or service runtime container is the same: change the image version in the deployment config yaml and then update it `kubectl apply -f model-mesh-deploy.yaml` +For more information on supported features and design details, see [these charts](https://github.com/kserve/modelmesh/files/8854091/modelmesh-jun2022.pdf). -### Build +## Get Started -Sample build: +To learn more about and get started with the ModelMesh framework, check out [the documentation](/docs). -```bash -GIT_COMMIT=$(git rev-parse HEAD) -BUILD_ID=$(date '+%Y%m%d')-$(git rev-parse HEAD | cut -c -5) -IMAGE_TAG_VERSION="dev" -IMAGE_TAG=${IMAGE_TAG_VERSION}-$(git branch --show-current)_${BUILD_ID} +## Developer guide -docker build -t modelmesh:${IMAGE_TAG} \ - --build-arg imageVersion=${IMAGE_TAG} \ - --build-arg buildId=${BUILD_ID} \ - --build-arg commitSha=${GIT_COMMIT} . -``` +Use the [developer guide](developer-guide.md) to learn about development practices for the project. diff --git a/developer-guide.md b/developer-guide.md new file mode 100644 index 00000000..f37ef8d7 --- /dev/null +++ b/developer-guide.md @@ -0,0 +1,220 @@ +# Developer Guide + +## Prerequisites + +You need [Java](https://openjdk.org/) and [Maven](https://maven.apache.org/guides/getting-started/maven-in-five-minutes.html#running-maven-tools) +to build ModelMesh from source and [`etcd`](https://etcd.io/) to run the unit tests. +To build your custom `modelmesh` container image and deploy it to a ModelMesh Serving installation on a Kubernetes cluster, +you need the [`docker`](https://docs.docker.com/engine/reference/commandline/cli/) and +[`kubectl`](https://kubectl.docs.kubernetes.io/references/kubectl/) CLIs. +On `macOS` you can install the required CLIs with [Homebrew](https://brew.sh/): + +- Java: `brew install java` +- Maven: `brew install maven` +- Etcd: `brew install etcd` +- Docker: `brew install docker` +- Kubectl: `brew install kubectl` + +## Generating sources + +The gRPC stubs like the `ModelMeshGrpc` class have to be generated by the gRPC proto compiler from +the `.proto` source files under `src/main/proto`. +The generated sources should be created in the target directory `target/generated-sources/protobuf/grpc-java`. + +To generate the sources run either of the following commands: + +```shell +mvn package -DskipTests +mvn install -DskipTests +``` + +## Project setup using an IDE + +If you are using an IDE like [IntelliJ IDEA](https://www.jetbrains.com/idea/) or [Eclipse](https://eclipseide.org/) +to help with your code development you should set up source and target folders so that the IDE's compiler can find all +the source code including the generated sources (after running `mvn install -DskipTests`). + +For IntelliJ this can be done by going to **File > Project Structure ... > Modules**: + +- **Source Folders** + - src/main/java + - src/main/proto + - target/generated-sources/protobuf/grpc-java (generated) + - target/generated-sources/protobuf/java (generated) +- **Test Source Folders** + - src/test/java + - target/generated-test-sources/protobuf/grpc-java (generated) + - target/generated-test-sources/protobuf/java (generated) +- **Resource Folders** + - src/main/resources +- **Test Resource Folders** + - src/test/resources +- **Excluded Folders** + - target + +You may also want to increase your Java Heap size to at least 1.5 GB. + +## Testing code changes + +**Note**, before running the test cases, make sure you have `etcd` installed (see #prerequisites): + +```Bash +$ etcd --version + +etcd Version: 3.5.5 +Git SHA: 19002cfc6 +Go Version: go1.19.1 +Go OS/Arch: darwin/amd64 +``` + +You can either run all test suites at once. You can use the `-q` flag to reduce noise: + +```Bash +mvn test -q +``` + +Or you can run individual test cases: + +```Bash +mvn test -Dtest=ModelMeshErrorPropagationTest +mvn test -Dtest=SidecarModelMeshTest,ModelMeshFailureExpiryTest +``` + +It can be handy to use `grep` to reduce output noise: + +```Bash +mvn test -Dtest=SidecarModelMeshTest,ModelMeshFailureExpiryTest | \ + grep -E " Running |\[ERROR\]|Failures|SUCCESS|Skipp|Total time|Finished" + +[INFO] Running com.ibm.watson.modelmesh.ModelMeshFailureExpiryTest +[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 10.257 s - in com.ibm.watson.modelmesh.ModelMeshFailureExpiryTest +[INFO] Running com.ibm.watson.modelmesh.SidecarModelMeshTest +[INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 17.302 s - in com.ibm.watson.modelmesh.SidecarModelMeshTest +[INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0 +[INFO] BUILD SUCCESS +[INFO] Total time: 39.916 s +[INFO] Finished at: 2022-11-01T14:33:33-07:00 +``` + +## Building the container image + +After testing your code changes locally, it's time to build a new `modelmesh` container image. Replace the value of the +`DOCKER_USER` environment variable to your DockerHub user ID and change the `IMAGE_TAG` to something meaningful. + +```bash +export DOCKER_USER="" +export IMAGE_NAME="${DOCKER_USER}/modelmesh" +export IMAGE_TAG="dev" +export GIT_COMMIT=$(git rev-parse HEAD) +export BUILD_ID=$(date '+%Y%m%d')-$(git rev-parse HEAD | cut -c -5) + +docker build -t ${IMAGE_NAME}:${IMAGE_TAG} \ + --build-arg imageVersion=${IMAGE_TAG} \ + --build-arg buildId=${BUILD_ID} \ + --build-arg commitSha=${GIT_COMMIT} . + +docker push ${IMAGE_NAME}:${IMAGE_TAG} +``` + +## Updating the ModelMesh Serving deployment + +In order to test the code changes in an existing [ModelMesh Serving](https://github.com/kserve/modelmesh-serving) deployment, +the newly built container image needs to be added to the `model-serving-config` ConfigMap. + +First, check if your ModelMesh Serving deployment already has an existing `model-serving-config` ConfigMap: + +```Shell +kubectl get configmap + +NAME DATA AGE +kube-root-ca.crt 1 4d2h +model-serving-config 1 4m14s +model-serving-config-defaults 1 4d2h +tc-config 2 4d2h +``` + +If the ConfigMap list contains `model-serving-config`, save the contents of your existing configuration +in a local temp file: + +```Bash +mkdir -p temp +kubectl get configmap model-serving-config -o yaml > temp/model-serving-config.yaml +``` + +And add the `modelMeshImage` property to the `config.yaml` string property: +```YAML + modelMeshImage: + name: /modelmesh + tag: dev +``` + +Replace the `` placeholder with your Docker username/login. + +The complete ConfigMap YAML file might look like this: + +```YAML +apiVersion: v1 +kind: ConfigMap +metadata: + name: model-serving-config + namespace: modelmesh-serving +data: + config.yaml: | + podsPerRuntime: 1 + restProxy: + enabled: true + scaleToZero: + enabled: false + gracePeriodSeconds: 5 + modelMeshImage: + name: /modelmesh + tag: dev +``` + +Apply the ConfigMap to your cluster: + +```Bash +kubectl apply -f temp/model-serving-config.yaml +``` + +If you are comfortable using vi, you can forgo creating a temp file and edit the ConfigMap directly in the terminal: + +```Shell +kubectl edit configmap model-serving-config +``` + +If you did not already have a `model-serving-config` ConfigMap on your cluster, you can create one like this: + +```shell +# export DOCKER_USER="" +# export IMAGE_NAME="${DOCKER_USER}/modelmesh" +# export IMAGE_TAG="dev" + +kubectl apply -f - < methodInfos` (optional) +- `bool allowAnyMethod` - applicable only if one or more `methodInfos` are provided. +- `bool limitModelConcurrency` - (experimental) + +It's expected that all model runtime instances in the same cluster (with same Kubernetes deployment config including image version) will report the same values for these, although it's not strictly necessary. + +## TLS (SSL) Configuration + +This can be configured via environment variables on the ModelMesh container, refer to [the documentation](/docs/configuration/tls.md). + +## Model Auto-Scaling + +Nothing needs to be configured to enable this, it is on by default. There is a single configuration parameter which can optionally be used to tune the sensitivity of the scaling, based on rate of requests per model. Note that this applies to scaling copies of models within existing pods, not scaling of the pods themselves. + +The scale-up RPM threshold specifies a target request rate per model **copy** measured in requests per minute. Model-mesh balances requests between loaded copies of a given model evenly, and if one copy's share of requests increases above this threshold more copies will be added if possible in instances (replicas) that do not currently have the model loaded. + +The default for this parameter is 2000 RPM. It can be overridden by setting either the `MM_SCALEUP_RPM_THRESHOLD` environment variable or `scaleup_rpm_threshold` etcd/zookeeper dynamic config parameter, with the latter taking precedence. + +Other points to note: + +- Scale up can happen by more than one additional copy at a time if the request rate breaches the configured threshold by a sufficient amount. +- The number of replicas in the deployment dictates the maximum number of copies that a given model can be scaled to (one in each Pod). +- Models will scale to two copies if they have been used recently regardless of the load - the autoscaling behaviour applies between 2 and N>2 copies. +- Scale-down will occur slowly once the per-copy load remains below the configured threshold for long enough. +- Note that if the runtime is in latency-based auto-scaling mode (when the runtime returns non-default `limitModelConcurrency = true` in the `RuntimeStatusResponse`), scaling is triggered based on measured latencies/queuing rather than request rates, and the RPM threshold parameter will have no effect. + +## Request Header Logging + +To have particular gRPC request metadata headers included in any request-scoped log messages, set the `MM_LOG_REQUEST_HEADERS` environment variable to a json string->string map (object) whose keys are the header names to log and values are the names of corresponding entries to insert into the logger thread context map (MDC). + +Values can be either raw ascii or base64-encoded utf8; in the latter case the corresponding header name must end with `-bin`. For example: +``` +{ + "transaction_id": "txid", + "user_id-bin": "user_id" +} +``` +**Note**: this does not generate new log messages and successful requests aren't logged by default. To log a message for every request, additionally set the `MM_LOG_EACH_INVOKE` environment variable to true. + +## Other Optional Parameters + +Set via environment variables on the ModelMesh container: + +- `MM_SVC_GRPC_PORT` - external grpc port, default 8033 +- `INTERNAL_GRPC_SOCKET_PATH` - unix domain socket, which should be a file location on a persistent volume mounted in both the model-mesh and model runtime containers, defaults to /tmp/mmesh/grpc.sock +- `INTERNAL_SERVING_GRPC_SOCKET_PATH` - unix domain socket to use for inferencing requests, defaults to be same as primary domain socket +- `INTERNAL_GRPC_PORT` - pod-internal grpc port (model runtime localhost), default 8056 +- `INTERNAL_SERVING_GRPC_PORT` - pod-internal grpc port to use for inferencing requests, defaults to be same as primary pod-internal grpc port +- `MM_SVC_GRPC_MAX_MSG_SIZE` - max message size in bytes, default 16MiB +- `MM_SVC_GRPC_MAX_HEADERS_SIZE` - max headers size in bytes, defaults to gRPC default +- `MM_METRICS` - metrics configuration, see Metrics wiki page +- `MM_MULTI_PARALLELISM` - max multi-model request parallelism, default 4 +- `KV_READ_ONLY` (advanced) - run in "read only" mode where new (v)models cannot be registered or unregistered +- `MM_LOG_EACH_INVOKE` - log an INFO level message for every request; default is false, set to true to enable +- `MM_SCALEUP_RPM_THRESHOLD` - see Model auto-scaling above + +**Note**: only one of `INTERNAL_GRPC_SOCKET_PATH` and `INTERNAL_GRPC_PORT` can be set. The same goes for `INTERNAL_SERVING_GRPC_SOCKET_PATH` and `INTERNAL_SERVING_GRPC_PORT`. + +Set dynamically in kv-store (etcd or zookeeper): +- log_each_invocation - dynamic override of `MM_LOG_EACH_INVOKE` env var +- logger_level - TODO +- scaleup_rpm_threshold - dynamic override of `MM_SCALEUP_RPM_THRESHOLD` env var, see [auto-scaling](#model-auto-scaling) above. diff --git a/docs/configuration/payloads.md b/docs/configuration/payloads.md new file mode 100644 index 00000000..e1eb66d5 --- /dev/null +++ b/docs/configuration/payloads.md @@ -0,0 +1,26 @@ +## Payload Processing Overview +ModelMesh exchanges `Payloads` with models deployed within runtimes. In ModelMesh, a `Payload` consists of information regarding the id of the model and the method of the model being called, together with some data (actual binary requests or responses) and metadata (e.g., headers). + +A `PayloadProcessor` is responsible for processing such `Payloads` for models served by ModelMesh. Examples would include loggers of prediction requests, data sinks for data visualization, model quality assessment, or monitoring tooling. + +They can be configured to only look at payloads that are consumed and produced by certain models, or payloads containing certain headers, etc. This configuration is performed at the ModelMesh instance level. Multiple `PayloadProcessors` can be configured per each ModelMesh instance, and they can be set to care about specific portions of the payload (e.g., model inputs, model outputs, metadata, specific headers, etc.). + +As an example, a `PayloadProcessor` can see input data as below: + +```text +[mmesh.ExamplePredictor/predict, Metadata(content-type=application/grpc,user-agent=grpc-java-netty/1.51.1,mm-model-id=myModel,another-custom-header=custom-value,grpc-accept-encoding=gzip,grpc-timeout=1999774u), CompositeByteBuf(ridx: 0, widx: 2000004, cap: 2000004, components=147) +``` + +and/or output data as `ByteBuf`: +```text +java.nio.HeapByteBuffer[pos=0 lim=65 cap=65] +``` + +A `PayloadProcessor` can be configured by means of a whitespace separated `String` of URIs. For example, in a URI like `logger:///*?pytorch1234#predict`: +- the scheme represents the type of processor, e.g., `logger` +- the query represents the model id to observe, e.g., `pytorch1234` +- the fragment represents the method to observe, e.g., `predict` + +## Featured `PayloadProcessors`: +- `logger` : logs requests/responses payloads to `model-mesh` logs (_INFO_ level), e.g., use `logger://*` to log every `Payload` +- `http` : sends requests/responses payloads to a remote service (via _HTTP POST_), e.g., use `http://10.10.10.1:8080/consumer/kserve/v2` to send every `Payload` to the specified HTTP endpoint \ No newline at end of file diff --git a/docs/configuration/scaling.md b/docs/configuration/scaling.md new file mode 100644 index 00000000..dd96bacb --- /dev/null +++ b/docs/configuration/scaling.md @@ -0,0 +1,31 @@ +ModelMesh relies on [Kubernetes for rolling updates](https://kubernetes.io/docs/tutorials/kubernetes-basics/update/update-intro/). For the sake of simplicity and elasticity, ModelMesh does not keep track of update states or so internally. + +## Scaling Up/Down + +ModelMesh follows the process below, skipping the termination/migration steps in the context of scaling up (adding new pods). + +1. A new Pod with updates starts. +2. Kubernetes awaits the new Pod to report `Ready` state. +3. If ready, it triggers termination of the old Pod. +4. Once the old Pod receives a termination signal from Kubernetes, it will begin to migrate its models to other instances. + +Asynchronously, ModelMesh will try to rebalance model distribution among all the pods with `Ready` state. + +## Fail Fast with Readiness Probes + +When an update triggers a cluster-wise failure, resulting in the failure to load existing models on new pods, fail fast protection will prevent old cluster from shutting down completely by using [Readiness Probes](https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes). + +ModelMesh achieves fail fast by collecting statistics about loading failures during the startup period. Specifically: + +1. Critical Failure - if this model loaded successfully on other pods, but cannot be loaded on this pod. +2. General Failure - if a new model cannot be loaded on this pod. + +However, this statistics are only collected during the startup period. The length of this period can be controlled by the environment variable `BOOTSTRAP_CLEARANCE_PERIOD_MS`. Once failure statistics exceed the threshold on certain pods, these pods will start to report a `NOT READY` state. This will prevent the old pods from terminating. + +The default `BOOTSTRAP_CLEARANCE_PERIOD_MS` is 3 minutes (180,000 ms). + +**Note**: you may also want to tweak the readiness probes' parameters as well. For example, increasing `initialDelaySeconds` may help slow down the shutdown old pods too early. + +## Rolling Update Configuration + +Specify `maxUnavailable` and `maxSurge` [as described here](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#rolling-update-deployment) to control the rolling update process. \ No newline at end of file diff --git a/docs/configuration/tls.md b/docs/configuration/tls.md new file mode 100644 index 00000000..3a445a0d --- /dev/null +++ b/docs/configuration/tls.md @@ -0,0 +1,68 @@ +## Enable TLS/SSL + +TLS between the ModelMesh container and the model runtime container isn't currently required or supported, since the communication happens with a single pod over localhost. + +However, TLS must be enabled in production deployments for the external gRPC service interfaces exposed by ModelMesh itself (which include your proxied custom gRPC interface). + +To do this, you must provide both private key and corresponding cert files in pem format, volume-mounting them into the ModelMesh container from a kubernetes secret. TLS is then enabled by setting the values of the following env vars on the ModelMesh container to the paths of those mounted files as demonstrated [here](https://github.com/kserve/modelmesh/blob/main/config/base/patches/tls.yaml#L39-L42). + +The same certificate pair will then also be used for "internal" communication between the model-mesh pods, which is unencrypted otherwise (in prior versions the internal traffic was encrypted unconditionally, but using "hardcoded" certs baked into the image which have now been removed). + +## Client Authentication + +To additionally enable TLS Client Auth (aka Mutual Auth, mTLS): + +- Set the `MM_TLS_CLIENT_AUTH` env var to either `REQUIRE` or `OPTIONAL` (case-insensitive) +- Mount pem-format cert(s) to use for trust verification into the container, and set the `MM_TLS_TRUST_CERT_PATH` to a comma-separated list of the mounted paths to these files + +## Certificate Format + +A `PKCS8` format key is required due to netty [only supporting PKCS8 keys](https://github.com/netty/netty/wiki/SslContextBuilder-and-Private-Key). + +For a key cert pair, `server.crt` and `server.key`, you can convert an unencrypted `PKCS1` key to `PKCS8`. + +``` +$ openssl pkcs8 -topk8 -nocrypt -in server.key -out mmesh.key +``` + +If only one hash is displayed, they match. You can also use the above command to verify the original key cert pair `server.crt` and `server.key`. + +### cert-manager +If you are using [cert-manager](https://github.com/cert-manager/cert-manager) on Kubernetes/OpenShift to generate certificates, just ensure that the `.spec.privateKey.encoding` field of your Certificate CR is set to `PKCS8` (it defaults to `PKCS1`). + +## Updating and Rotating Private Keys + +Because the provided certificates are also used for intra-cluster communication, care must be taken when updating to a new private key to avoid potential temporary impact to the service. All pods inter-communicate during rolling upgrade transitions, so the new pods must be able to connect to the old pods and vice versa. If new trust certs are required for the new private key, an update must be performed first to ensure both old and new trust certs are used, and these must both remain present for the subsequent key update. Note that these additional steps are not required if a common and unchanged CA certificate is used for trust purposes. + +There is a dedicated env var `MM_INTERNAL_TRUST_CERTS` which can be used to specify additional trust (public) certificates for inter-cluster communication only. It can be set to one or more comma-separated paths which point to either individual pem-formatted cert files or directories containing certs with `.pem` and/or `.crt` extensions. These paths would correspond to Kube-mounted secrets. Here is an example of the three distinct updates required: + +1. Add `MM_INTERNAL_TRUST_CERTS` pointing to the new cert: +``` +- name: MM_TLS_KEY_CERT_PATH + value: /path/to/existing-keycert.pem +- name: MM_TLS_PRIVATE_KEY_PATH + value: /path/to/existing-key.pem +- name: MM_INTERNAL_TRUST_CERTS + value: /path/to/new-cacert.pem +``` +2. Switch to the new private key pair, with `MM_INTERNAL_TRUST_CERTS` now pointing to the old cert: +``` +- name: MM_TLS_KEY_CERT_PATH + value: /path/to/new-keycert.pem +- name: MM_TLS_PRIVATE_KEY_PATH + value: /path/to/new-key.pem +- name: MM_INTERNAL_TRUST_CERTS + value: /path/to/existing-keycert.pem +``` +3. Optionally remove `MM_TRUST_CERTS`: +``` +- name: MM_TLS_KEY_CERT_PATH + value: /path/to/new-keycert.pem +- name: MM_TLS_PRIVATE_KEY_PATH + value: /path/to/new-key.pem +``` + +**Note**: these additional steps shouldn't be required if either: + +- The same CA is used for both the old and new public certs (so they are not self-signed) +- Some temporary service disruption is acceptable - this will likely manifest as some longer response times during the upgrade, possibly with some timeouts and failures. It should not persist beyond the rolling update process and the exact magnitude of the impact depends on various factors such as cluster size, loading time, request volume and patterns, etc. \ No newline at end of file diff --git a/docs/images/vmodels.png b/docs/images/vmodels.png new file mode 100644 index 00000000..47943abf Binary files /dev/null and b/docs/images/vmodels.png differ diff --git a/docs/metrics.md b/docs/metrics.md new file mode 100644 index 00000000..fc312d89 --- /dev/null +++ b/docs/metrics.md @@ -0,0 +1,66 @@ +ModelMesh publishes a variety of metrics related to model request rates and timings, model loading/unloading rates, times and sizes, internal queuing delays, capacity/usage, cache state/LRU, and more, which can be used in addition to the Kubernetes-level resource metrics. + +### Configuring metrics + +By default, metrics are pushed to Sysdig via the StatsD protocol on UDP port `8126` but Prometheus-based metrics publishing (pull, instead of push) is also supported and recommended over StatsD. It is not currently the default since there are some annotations which also need to be added to the ModelMesh pod spec before Sysdig will capture the metrics (see [below](#enabling-sysdig-capture-of-prometheus-metrics)). + +The `MM_METRICS` env variable can be used to configure or disable how metrics are published: + +- To disable metrics, set it to `disabled`. +- Otherwise, set to `[:param1=val1;param2=val2;...;paramN=valN]` where `` can be either `statsd` or `prometheus`, and `paramI=valI` are optional associated parameters from the table below: + +| | Purpose | Applies to | Default | +|:------------:|:---------------------------------------------------------------------------------------------------------------------------------------------------------------:|:--------------------------------------------------:|:----------------------------------:| +| `port` | Port on which to send or serve metrics | statsd (UDP push), prometheus (HTTP/HTTPS serve) | `8126` (statsd), `2112` (prometheus)| +| `fq_names` | Whether to use fully-qualified method names in request metrics | statsd, prometheus | `false` | +| `legacy` | Whether to publish legacy flavour (non-Sysdig) statsd metrics. Note that the legacy metrics are equivalent but have different names to those in the table below | statsd | `false` | +| `scheme` | Protocol scheme to use for Prometheus metrics, can be `http` or `https` | prometheus | `https` | +|`per_model_metrics`|Whether to include the `modelId` and `vModelId` labels in applicable published metrics|prometheus|`false` (*)| + +(*) In versions of ModelMesh between Sep 5 2023 and Nov 16 2023, the default value of `per_model_metrics` was (unintentionally) `true`. + +### Capturing Prometheus metrics + +Sysdig will only capture Prometheus metrics from pods with the appropriate annotations set. In addition to configuring the `MM_METRICS` env var, the following annotations must be configured on the ModelMesh deployment's Pod spec: + +``` +prometheus.io/path: /metrics +prometheus.io/port: "2112" +prometheus.io/scheme: https +prometheus.io/scrape: "true" +``` + +### List of Exposed Metrics + +| Name | Type | Scope | Description | +|:--------------------------------------------:|:--------:|:---------------:|:--------------------------------------------------------------------------:| +| modelmesh_invoke_model | Count | (statsd only) | Count of internal model server inference requests | +| modelmesh_invoke_model_milliseconds | Timing | | Internal model server inference request time | +| modelmesh_api_request | Count | (statsd only) | Count of external inference requests | +| modelmesh_api_request_milliseconds | Timing | | External inference request time | +| modelmesh_request_size_bytes | Size | | Inference request payload size | +| modelmesh_response_size_bytes | Size | | Inference response payload size | +| modelmesh_cache_miss | Count | (statsd only) | Count of inference request cache misses | +| modelmesh_cache_miss_milliseconds | Timing | | Cache miss delay | +| modelmesh_loadmodel | Count | (statsd only) | Count of model loads | +| modelmesh_loadmodel_milliseconds | Timing | | Time taken to load model | +| modelmesh_loadmodel_failure | Count | | Model load failures | +| modelmesh_unloadmodel | Count | (statsd only) | Count of model unloads | +| modelmesh_unloadmodel_milliseconds | Timing | | Time taken to unload model | +| modelmesh_unloadmodel_failure | Count | | Unload model failures (not counting multiple attempts for same copy) | +| modelmesh_unloadmodel_attempt_failure | Count | | Unload model attempt failures | +| modelmesh_req_queue_delay_milliseconds | Timing | | Time spent in inference request queue | +| modelmesh_loading_queue_delay_milliseconds | Timing | | Time spent in model loading queue | +| modelmesh_model_sizing_milliseconds | Timing | | Time taken to perform model sizing | +| modelmesh_model_evicted | Count | (statsd only) | Count of model copy evictions | +| modelmesh_age_at_eviction_milliseconds | Age | | Time since model was last used when evicted | +| modelmesh_loaded_model_size_bytes | Size | | Reported size of loaded model | +| modelmesh_models_loaded_total | Gauge | Deployment | Total number of models with at least one loaded copy | +| modelmesh_models_with_failure_total | Gauge | Deployment | Total number of models with one or more recent load failures | +| modelmesh_models_managed_total | Gauge | Deployment | Total number of models managed | +| modelmesh_instance_lru_seconds | Gauge | Pod | Last used time of least recently used model in pod (in secs since epoch) | +| modelmesh_instance_lru_age_seconds | Gauge | Pod | Last used age of least recently used model in pod (secs ago) | +| modelmesh_instance_capacity_bytes | Gauge | Pod | Effective model capacity of pod excluding unload buffer | +| modelmesh_instance_used_bytes | Gauge | Pod | Amount of capacity currently in use by loaded models | +| modelmesh_instance_used_bps | Gauge | Pod | Amount of capacity used in basis points (100ths of percent) | +| modelmesh_instance_models_total | Gauge | Pod | Number of model copies loaded in pod | diff --git a/docs/vmodels.md b/docs/vmodels.md new file mode 100644 index 00000000..b7357aac --- /dev/null +++ b/docs/vmodels.md @@ -0,0 +1,56 @@ +# VModels Reference + +Regular models in model-mesh are assumed/required to be immutable. VModels add a layer of indirection in front of the immutable models. A vmodel is an alias mapping to a specific concrete model, which can be changed via the model management APIs. Runtime clients (making inferencing requests) can then specify a vmodel id instead of a model id, and these will automatically be served by the concrete model associated with that vmodel at the time the request is made. + +![vmodels.png](images/vmodels.png) + +The primary use case is for evolving a particular application model over time, so consumers of it automatically benefit from improved versions. For example: + +1. You train and add a spanish model with id `es_1`, and add a vmodel with id es pointing to model `es_1` +2. Later, you train a new spanish model naming it `es_2`, and then set the target of the es model to `es_2` +3. "Users" would just send requests to es and automatically be served latest version + +By default, when updating the target of a vmodel to a new concrete model, model-mesh will manage the transition to prevent any service degredation to end users. Specifically, the existing active model will continue to be served until the target model is "ready" - meaning it has successfully completed loading and is scaled out to the same degree as the existing active model. + +No changes are required to model runtime implementations to start using vmodels. They continue to work with concrete "immutable" models in the same way and do not need to be aware of vmodels or the indirection taking place. + +The keyspaces for models and vmodels are independent, and so for example while not recommended, there is nothing stopping you creating a model and vmodel with the same id. + +## Usage + +There are two main approaches for working with vmodels from a management point of view: + +1. Add and delete (register/unregister) underlying models independently of vmodels, using the existing model management APIs + - When creating or updating a vmodel, the target model is already assumed to exist + - When a concrete model is no longer required/used by a vmodel, it remains registered and usable (when addressed directly), and must be deleted explicitly if/when desired + - Attempting to delete a concrete model which is still referenced by one or more vmodels will fail + - When calling `setVModel`, do not provide `modelInfo` and do not set `autoDeleteTargetModel` to `true` +2. Add/delete concrete models in concert with vmodels - this is the simpler and recommended approach. + - To add and move to a new (concrete) version of a vmodel, use the `setVModel` API, providing `modelInfo` (which would otherwise have been passed to `registerModel`) and setting `autoDeleteTargetModel` to true + - Concrete models added in this manner will be automatically deleted once no longer referenced by the vmodel (i.e. once it has completed transition to a newer concrete model or been deleted itself) + - The `getModelStatus` rpc can be used to determine when concrete models have been deleted from model-mesh (must return `NOT_FOUND` status), i.e. to know when it is safe to remove the backing data + +To send an inference request to a vmodel rather than a concrete model from runtime clients, just set the `mm-vmodel-id` gRPC metadata header instead of `mm-model-id`. + +## VModel Management API + +See additions to the ModelMesh protobuf service interface [here](https://github.com/kserve/modelmesh/blob/main/src/main/proto/current/model-mesh.proto#L42). + +`setVModel()` - add a vmodel or set the target model of an existing vmodel +- `SetVModelRequest` + - `string vModelId` + - `string targetModelId` - id of the target "concrete" model + - `boolean updateOnly` - whether to require that the vmodel already exists + - `ModelInfo modelInfo` - if provided, the target model will be added (see registerModel rpc), otherwise it must already exist + - `boolean autoDeleteTargetModel` - if true, the created target model will be automatically deleted when no longer referenced. Applies only if modelInfo is provided + - `boolean loadNow` - whether the new target model should be loaded immediately, even if the current active model isn't loaded (otherwise the target model will be loaded to the same scale as the current active model before it becomes the active model) + - `boolean force` - if true, the active model will be updated immediately, regardless of the relative states of the target and currently-active models + - `boolean sync` - whether this rpc invocation should block until the transition completes. If the vmodel didn't already exist and and loadNow is set to true, this will cause the method to block until the target of the newly created vmodel has completed loading + - returns `VModelStatusInfo` +- `getVModelStatus()` - check the existence and status of a vmodel + - `GetVModelStatus` + - `string vModelId` + - returns `VModelStatusInfo` +- `deleteVModel()` - to delete a vmodel + - `DeleteVModelRequest` + - `string vModelId` \ No newline at end of file diff --git a/pom.xml b/pom.xml index 1fbcc81b..87638615 100644 --- a/pom.xml +++ b/pom.xml @@ -57,25 +57,29 @@ ${env.BUILD_TAG} - 1.57.2 - 4.1.100.Final + 1.60.2 + 4.1.108.Final 1.7.2 0.5.1 - 0.0.22 - 3.23.0 - 9.0.75 - 32.1.2-jre - 2.15.2 + 0.0.24 + 3.25.3 + 9.0.87 + 33.1.0-jre + 2.16.2 2.10.1 - 0.18.1 + 0.20.0 11.1.0 - 2.20.0 + 2.23.1 1.7.36 0.9.0 + 1.78 - 5.9.3 + 5.10.2 + + 3.8.4 + 5.3.0 3.7.2 5.3.0 @@ -532,6 +536,12 @@ org.apache.zookeeper zookeeper ${zookeeper-version} + + + ch.qos.logback + logback-classic + + org.apache.curator diff --git a/src/main/java/com/ibm/watson/modelmesh/Metrics.java b/src/main/java/com/ibm/watson/modelmesh/Metrics.java index 7be788fe..981c4cb1 100644 --- a/src/main/java/com/ibm/watson/modelmesh/Metrics.java +++ b/src/main/java/com/ibm/watson/modelmesh/Metrics.java @@ -172,7 +172,7 @@ public PrometheusMetrics(Map params, Map infoMet int port = 2112; boolean shortNames = true; boolean https = true; - boolean perModelMetricsEnabled = true; + boolean perModelMetricsEnabled = false; String memMetrics = "all"; // default to all for (Entry ent : params.entrySet()) { switch (ent.getKey()) { diff --git a/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java b/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java index 78c776b4..f8779651 100644 --- a/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java +++ b/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java @@ -927,7 +927,7 @@ protected final TProcessor initialize() throws Exception { // "type" or "type:p1=v1;p2=v2;...;pn=vn" private static final Pattern METRICS_CONFIG_PATT = Pattern.compile("([a-z;]+)(:\\w+=[^;]+(?:;\\w+=[^;]+)*)?"); // "metric_name" or "metric:name;l1=v1,l2=v2,...,ln=vn," - private static final Pattern CUSTOM_METRIC_CONFIG_PATT = Pattern.compile("([a-z_:]+);(\\w+=[^;]+(?:;\\w+=[^,]+)*)?"); + private static final Pattern CUSTOM_METRIC_CONFIG_PATT = Pattern.compile("([a-z_:]+);(\\w+=[^;]+(?:;\\w+=[^,]++)*)?"); private static Metrics setUpMetrics() throws Exception { if (System.getenv("MM_METRICS_STATSD_PORT") != null || System.getenv("MM_METRICS_PROMETHEUS_PORT") != null) { @@ -1309,6 +1309,7 @@ boolean isLeader() { @Override protected boolean isReady() { if (abortStartup) { + logger.info("Returning NOT READY to readiness probe due to unexpected model loading failures"); return false; } // called only post-initialization diff --git a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java index 715c0efe..14073fd0 100644 --- a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java +++ b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java @@ -725,6 +725,7 @@ public void onHalfClose() { String vModelId = null; String requestId = null; ModelResponse response = null; + ByteBuf responsePayload = null; try (InterruptingListener cancelListener = newInterruptingListener()) { if (logHeaders != null) { logHeaders.addToMDC(headers); // MDC cleared in finally block @@ -767,18 +768,20 @@ public void onHalfClose() { } finally { if (payloadProcessor != null) { processPayload(reqMessage.readerIndex(reqReaderIndex), - requestId, resolvedModelId, methodName, headers, null, true); + requestId, resolvedModelId, vModelId, methodName, headers, null); } else { releaseReqMessage(); } reqMessage = null; // ownership released or transferred } - respReaderIndex = response.data.readerIndex(); respSize = response.data.readableBytes(); call.sendHeaders(response.metadata); + if (payloadProcessor != null) { + responsePayload = response.data.retainedSlice(); + } call.sendMessage(response.data); - // response is released via ReleaseAfterResponse.releaseAll() + // final response refcount is released via ReleaseAfterResponse.releaseAll() status = OK; } catch (Exception e) { status = toStatus(e); @@ -795,17 +798,13 @@ public void onHalfClose() { evictMethodDescriptor(methodName); } } finally { - final boolean releaseResponse = status != OK; if (payloadProcessor != null) { - ByteBuf data = null; - Metadata metadata = null; - if (response != null) { - data = response.data.readerIndex(respReaderIndex); - metadata = response.metadata; - } - processPayload(data, requestId, resolvedModelId, methodName, metadata, status, releaseResponse); - } else if (releaseResponse && response != null) { - response.release(); + Metadata metadata = response != null ? response.metadata : null; + processPayload(responsePayload, requestId, resolvedModelId, vModelId, methodName, metadata, status); + } + if (status != OK && response != null) { + // An additional release is required if we call.sendMessage() wasn't sucessful + response.data.release(); } ReleaseAfterResponse.releaseAll(); clearThreadLocals(); @@ -820,23 +819,22 @@ public void onHalfClose() { } /** - * Invoke PayloadProcessor on the request/response data + * Invoke PayloadProcessor on the request/response data. This method takes ownership + * of the passed-in {@code ByteBuf}. + * * @param data the binary data * @param payloadId the id of the request * @param modelId the id of the model + * @param vModelId the id of the vModel * @param methodName the name of the invoked method * @param metadata the method name metadata * @param status null for requests, non-null for responses - * @param takeOwnership whether the processor should take ownership */ - private void processPayload(ByteBuf data, String payloadId, String modelId, String methodName, - Metadata metadata, io.grpc.Status status, boolean takeOwnership) { + private void processPayload(ByteBuf data, String payloadId, String modelId, String vModelId, String methodName, + Metadata metadata, io.grpc.Status status) { Payload payload = null; try { assert payloadProcessor != null; - if (!takeOwnership) { - ReferenceCountUtil.retain(data); - } payload = new Payload(payloadId, modelId, methodName, metadata, data, status); if (payloadProcessor.process(payload)) { data = null; // ownership transferred @@ -1200,6 +1198,7 @@ public void getVModelStatus(GetVModelStatusRequest request, StreamObserver 0) { + if (!modelId.isEmpty()) { modelId = modelId.replaceFirst("/", ""); - if (modelId.length() == 0 || modelId.equals("*")) { + if (modelId.isEmpty() || modelId.equals("*")) { modelId = null; } } else { modelId = null; } } - if (method != null) { - if (method.length() == 0 || method.equals("*")) { - method = null; + if (vModelId != null) { + if (!vModelId.isEmpty()) { + vModelId = vModelId.replaceFirst("/", ""); + if (vModelId.isEmpty() || vModelId.equals("*")) { + vModelId = null; + } + } else { + vModelId = null; } } - return new MatchingPayloadProcessor(processor, method, modelId); + if (method != null && (method.isEmpty() || method.equals("*"))) { + method = null; + } + return new MatchingPayloadProcessor(processor, method, modelId, vModelId); } @Override diff --git a/src/main/java/com/ibm/watson/modelmesh/payload/Payload.java b/src/main/java/com/ibm/watson/modelmesh/payload/Payload.java index 9eed4367..6dcafd17 100644 --- a/src/main/java/com/ibm/watson/modelmesh/payload/Payload.java +++ b/src/main/java/com/ibm/watson/modelmesh/payload/Payload.java @@ -39,6 +39,8 @@ public enum Kind { private final String modelId; + private final String vModelId; + private final String method; private final Metadata metadata; @@ -48,10 +50,17 @@ public enum Kind { // null for requests, non-null for responses private final Status status; + public Payload(@Nonnull String id, @Nonnull String modelId, @Nullable String method, @Nullable Metadata metadata, @Nullable ByteBuf data, @Nullable Status status) { + this(id, modelId, null, method, metadata, data, status); + } + + public Payload(@Nonnull String id, @Nonnull String modelId, @Nullable String vModelId, @Nullable String method, + @Nullable Metadata metadata, @Nullable ByteBuf data, @Nullable Status status) { this.id = id; this.modelId = modelId; + this.vModelId = vModelId; this.method = method; this.metadata = metadata; this.data = data; @@ -68,6 +77,16 @@ public String getModelId() { return modelId; } + @CheckForNull + public String getVModelId() { + return vModelId; + } + + @Nonnull + public String getVModelIdOrModelId() { + return vModelId != null ? vModelId : modelId; + } + @CheckForNull public String getMethod() { return method; @@ -101,6 +120,7 @@ public void release() { public String toString() { return "Payload{" + "id='" + id + '\'' + + ", vModelId=" + (vModelId != null ? ('\'' + vModelId + '\'') : "null") + ", modelId='" + modelId + '\'' + ", method='" + method + '\'' + ", status=" + (status == null ? "request" : String.valueOf(status)) + diff --git a/src/main/java/com/ibm/watson/modelmesh/payload/README.md b/src/main/java/com/ibm/watson/modelmesh/payload/README.md deleted file mode 100644 index 1b4e2464..00000000 --- a/src/main/java/com/ibm/watson/modelmesh/payload/README.md +++ /dev/null @@ -1,34 +0,0 @@ -Processing model-mesh payloads -============================= - -`Model-mesh` exchange `Payloads` with the models deployed within runtimes. -In `model-mesh` a `Payload` consists of information regarding the id of the model and the _method_ of the model being called, together with some data (actual binary requests or responses) and metadata (e.g., headers). -A `PayloadProcessor` is responsible for processing such `Payloads` for models served by `model-mesh`. - -Reasonable examples of `PayloadProcessors` include loggers of prediction requests, data sinks for data visualization, model quality assessment or monitoring tooling. - -A `PayloadProcessor` can be configured to only look at payloads that are consumed and produced by certain models, or payloads containing certain headers, etc. -This configuration is performed at `ModelMesh` instance level. -Multiple `PayloadProcessors` can be configured per each `ModelMesh` instance. - -Implementations of `PayloadProcessors` can care about only specific portions of the payload (e.g., model inputs, model outputs, metadata, specific headers, etc.). - -A `PayloadProcessor` can see input data like the one in this example: -```text -[mmesh.ExamplePredictor/predict, Metadata(content-type=application/grpc,user-agent=grpc-java-netty/1.51.1,mm-model-id=myModel,another-custom-header=custom-value,grpc-accept-encoding=gzip,grpc-timeout=1999774u), CompositeByteBuf(ridx: 0, widx: 2000004, cap: 2000004, components=147) -``` - -A `PayloadProcessor` can see output data as `ByteBuf` like the one in this example: -```text -java.nio.HeapByteBuffer[pos=0 lim=65 cap=65] -``` - -A `PayloadProcessor` can be configured by means of a whitespace separated `String` of URIs. -In a URI like `logger:///*?pytorch1234#predict`: -* the scheme represents the type of processor, e.g., `logger` -* the query represents the model id to observe, e.g., `pytorch1234` -* the fragment represents the method to observe, e.g., `predict` - -Featured `PayloadProcessors`: -* `logger` : logs requests/responses payloads to `model-mesh` logs (_INFO_ level), e.g., use `logger://*` to log every `Payload` -* `http` : sends requests/responses payloads to a remote service (via _HTTP POST_), e.g., use `http://10.10.10.1:8080/consumer/kserve/v2` to send every `Payload` to the specified HTTP endpoint \ No newline at end of file diff --git a/src/main/java/com/ibm/watson/modelmesh/payload/RemotePayloadProcessor.java b/src/main/java/com/ibm/watson/modelmesh/payload/RemotePayloadProcessor.java index 401fba2d..23c2fba1 100644 --- a/src/main/java/com/ibm/watson/modelmesh/payload/RemotePayloadProcessor.java +++ b/src/main/java/com/ibm/watson/modelmesh/payload/RemotePayloadProcessor.java @@ -57,14 +57,10 @@ public boolean process(Payload payload) { private static PayloadContent prepareContentBody(Payload payload) { String id = payload.getId(); String modelId = payload.getModelId(); + String vModelId = payload.getVModelId(); String kind = payload.getKind().toString().toLowerCase(); ByteBuf byteBuf = payload.getData(); - String data; - if (byteBuf != null) { - data = encodeBinaryToString(byteBuf); - } else { - data = ""; - } + String data = byteBuf != null ? encodeBinaryToString(byteBuf) : ""; Metadata metadata = payload.getMetadata(); Map metadataMap = new HashMap<>(); if (metadata != null) { @@ -79,7 +75,7 @@ private static PayloadContent prepareContentBody(Payload payload) { } } String status = payload.getStatus() != null ? payload.getStatus().getCode().toString() : ""; - return new PayloadContent(id, modelId, data, kind, status, metadataMap); + return new PayloadContent(id, modelId, vModelId, data, kind, status, metadataMap); } private static String encodeBinaryToString(ByteBuf byteBuf) { @@ -116,15 +112,17 @@ private static class PayloadContent { private final String id; private final String modelid; + private final String vModelId; private final String data; private final String kind; private final String status; private final Map metadata; - private PayloadContent(String id, String modelid, String data, String kind, String status, - Map metadata) { + private PayloadContent(String id, String modelid, String vModelId, String data, String kind, + String status, Map metadata) { this.id = id; this.modelid = modelid; + this.vModelId = vModelId; this.data = data; this.kind = kind; this.status = status; @@ -143,6 +141,10 @@ public String getModelid() { return modelid; } + public String getvModelId() { + return vModelId; + } + public String getData() { return data; } @@ -160,6 +162,7 @@ public String toString() { return "PayloadContent{" + "id='" + id + '\'' + ", modelid='" + modelid + '\'' + + ", vModelId=" + (vModelId != null ? ('\'' + vModelId + '\'') : "null") + ", data='" + data + '\'' + ", kind='" + kind + '\'' + ", status='" + status + '\'' + diff --git a/src/main/scripts/start.sh b/src/main/scripts/start.sh index c5fe44eb..dbd5b3e0 100644 --- a/src/main/scripts/start.sh +++ b/src/main/scripts/start.sh @@ -367,9 +367,6 @@ echo "SHUTDOWN_TIMEOUT_MS=$SHUTDOWN_TIMEOUT_MS" LITELINKS_ARGS="-Dlitelinks.cancel_on_client_close=true -Dlitelinks.threadcontexts=log_mdc -Dlitelinks.shutdown_timeout_ms=${SHUTDOWN_TIMEOUT_MS} -Dlitelinks.produce_pooled_bytebufs=true" -# have litelinks use OpenSSL instead of JDK TLS implementation (faster) -LL_OPENSSL_ARG="-Dlitelinks.ssl.use_jdk=false" - # These two args are needed to use netty's off-the-books direct buffer allocation NETTY_DIRECTBUF_ARGS="-Dio.netty.tryReflectionSetAccessible=true --add-opens=java.base/java.nio=ALL-UNNAMED" # this defaults to equal max heap, which can result in container OOMKilled @@ -414,7 +411,6 @@ exec $JAVA_HOME/bin/java -cp "$LL_JAR:lib/*" -XX:+UnlockExperimentalVMOptions -X ${JAVA_MAXDIRECT_ARG} ${NETTY_MAXDIRECT_ARG} ${NETTY_DISABLE_CHECK_ARGS} \ ${GRPC_USE_SHARED_ALLOC_ARG} \ ${SSL_PK_ARG} ${TRUSTSTORE_ARG} ${LITELINKS_ARGS} ${CUSTOM_JVM_ARGS} \ - $LL_OPENSSL_ARG \ $PRIVATE_ENDPOINT_ARG \ $LOG_CONFIG_ARG $LOG_PERF_ARGS \ com.ibm.watson.litelinks.server.LitelinksService \ diff --git a/src/test/java/com/ibm/watson/modelmesh/AbstractModelMeshTest.java b/src/test/java/com/ibm/watson/modelmesh/AbstractModelMeshTest.java index d320927b..c27ea6e0 100644 --- a/src/test/java/com/ibm/watson/modelmesh/AbstractModelMeshTest.java +++ b/src/test/java/com/ibm/watson/modelmesh/AbstractModelMeshTest.java @@ -67,7 +67,7 @@ public static > T forModel(T stub, String... modelIds) headers.put(MODEL_ID_META_KEY, modelId); } headers.put(CUST_HEADER_KEY, "custom-value"); - return MetadataUtils.attachHeaders(stub, headers); + return stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers)); } public static > T forVModel(T stub, String... vmodelIds) { @@ -75,7 +75,7 @@ public static > T forVModel(T stub, String... vmodelId for (String modelId : vmodelIds) { headers.put(VMODEL_ID_META_KEY, modelId); } - return MetadataUtils.attachHeaders(stub, headers); + return stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers)); } // default KV store to test is etcd diff --git a/src/test/java/com/ibm/watson/modelmesh/LegacyAddRemoveProtoTest.java b/src/test/java/com/ibm/watson/modelmesh/LegacyAddRemoveProtoTest.java index ac74d01a..a9a87405 100644 --- a/src/test/java/com/ibm/watson/modelmesh/LegacyAddRemoveProtoTest.java +++ b/src/test/java/com/ibm/watson/modelmesh/LegacyAddRemoveProtoTest.java @@ -221,7 +221,7 @@ public static > T forModel(T stub, String... modelIds) headers.put(MODEL_ID_META_KEY, modelId); } headers.put(CUST_HEADER_KEY, "custom-value"); - return MetadataUtils.attachHeaders(stub, headers); + return stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers)); } @BeforeEach diff --git a/src/test/java/com/ibm/watson/modelmesh/LegacyTasProtoTest.java b/src/test/java/com/ibm/watson/modelmesh/LegacyTasProtoTest.java index 0af0e157..87daefcf 100644 --- a/src/test/java/com/ibm/watson/modelmesh/LegacyTasProtoTest.java +++ b/src/test/java/com/ibm/watson/modelmesh/LegacyTasProtoTest.java @@ -221,7 +221,7 @@ public static > T forModel(T stub, String... modelIds) headers.put(MODEL_ID_META_KEY, modelId); } headers.put(CUST_HEADER_KEY, "custom-value"); - return MetadataUtils.attachHeaders(stub, headers); + return stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers)); } @BeforeEach diff --git a/src/test/java/com/ibm/watson/modelmesh/ModelMeshHeaderLoggingTest.java b/src/test/java/com/ibm/watson/modelmesh/ModelMeshHeaderLoggingTest.java index 3da5a452..0844c53a 100644 --- a/src/test/java/com/ibm/watson/modelmesh/ModelMeshHeaderLoggingTest.java +++ b/src/test/java/com/ibm/watson/modelmesh/ModelMeshHeaderLoggingTest.java @@ -102,7 +102,8 @@ public void headerLoggingTest() throws Exception { // We'll make sure this one *isn't* logged since it wasn't included in the config headers.put(unloggedHeader, "my-unlogged-value"); - PredictResponse response = MetadataUtils.attachHeaders(useModels, headers).predict(req); + PredictResponse response = useModels.withInterceptors( + MetadataUtils.newAttachHeadersInterceptor(headers)).predict(req); assertEquals("classification for predict me! by model myModel", response.getResults(0).getCategory()); diff --git a/src/test/java/com/ibm/watson/modelmesh/ModelMeshTearDownTest.java b/src/test/java/com/ibm/watson/modelmesh/ModelMeshTearDownTest.java index ef802161..d50aa72c 100644 --- a/src/test/java/com/ibm/watson/modelmesh/ModelMeshTearDownTest.java +++ b/src/test/java/com/ibm/watson/modelmesh/ModelMeshTearDownTest.java @@ -315,7 +315,7 @@ public static int getPID(Process process) throws Exception { } public static int killProcess(Process process) throws Exception { - int pid = getPID(process); - return Runtime.getRuntime().exec("kill -9 " + pid).waitFor(); + String pid = Integer.toString(getPID(process)); + return Runtime.getRuntime().exec(new String[] {"kill", "-9", pid}).waitFor(); } } diff --git a/src/test/java/com/ibm/watson/modelmesh/VModelsTest.java b/src/test/java/com/ibm/watson/modelmesh/VModelsTest.java index a5e43fa0..f56a0bca 100644 --- a/src/test/java/com/ibm/watson/modelmesh/VModelsTest.java +++ b/src/test/java/com/ibm/watson/modelmesh/VModelsTest.java @@ -322,6 +322,6 @@ private static VModelStatusInfo clearTargetCopyInfo(VModelStatusInfo vmsi) { public static > T forVModel(T stub, String modelId) { Metadata headers = new Metadata(); headers.put(VMODEL_ID_META_KEY, modelId); - return MetadataUtils.attachHeaders(stub, headers); + return stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers)); } } diff --git a/src/test/java/com/ibm/watson/modelmesh/example/ExampleModelClient.java b/src/test/java/com/ibm/watson/modelmesh/example/ExampleModelClient.java index 7d4ed8c0..1c8454ac 100644 --- a/src/test/java/com/ibm/watson/modelmesh/example/ExampleModelClient.java +++ b/src/test/java/com/ibm/watson/modelmesh/example/ExampleModelClient.java @@ -80,7 +80,7 @@ public static void main(String[] args) { public static > T forModel(T stub, String modelId) { Metadata headers = new Metadata(); headers.put(MODEL_ID_META_KEY, modelId); - return MetadataUtils.attachHeaders(stub, headers); + return stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers)); } }