diff --git a/.dockerignore b/.dockerignore index 4c49bd78..7350d411 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1 +1,14 @@ +**/*.class +*.jar +*.md +.DS_Store +.cache +.dockerignore .env +.github +.gitignore +.idea +.vscode +default.etcd +target +temp diff --git a/.github/workflows/build-and-push.yml b/.github/workflows/build.yml similarity index 64% rename from .github/workflows/build-and-push.yml rename to .github/workflows/build.yml index 19c7864d..eb5c66cf 100644 --- a/.github/workflows/build-and-push.yml +++ b/.github/workflows/build.yml @@ -1,36 +1,48 @@ -name: Build and Push +name: Build on: + pull_request: + branches: + - "release-[0-9].[0-9]+" + paths-ignore: + - "**.md" push: - branches: [ main ] + branches: + - main + - "release-[0-9].[0-9]+" tags: - - v* - pull_request: + - "v*" + paths-ignore: + - "**.md" + +env: + IMAGE_NAME: "kserve/modelmesh" jobs: test: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - name: Set up JDK 17 + - name: Checkout + uses: actions/checkout@v3 + + - name: Set up Java 17 uses: actions/setup-java@v3.1.1 with: java-version: '17' distribution: 'temurin' + - name: Install etcd run: sudo ./.github/install-etcd.sh - - name: Build with Maven + + - name: Build and Test with Maven run: mvn -B package --file pom.xml build: needs: test runs-on: ubuntu-latest - - env: - IMAGE_NAME: kserve/modelmesh - steps: - - uses: actions/checkout@v2 + - name: Checkout + uses: actions/checkout@v3 - name: Setup QEMU uses: docker/setup-qemu-action@v2 @@ -45,7 +57,7 @@ jobs: username: ${{ secrets.DOCKER_USER }} password: ${{ secrets.DOCKER_ACCESS_TOKEN }} - - name: Export version variables + - name: Export docker build args run: | GIT_COMMIT=$(git rev-parse HEAD) BUILD_ID=$(date '+%Y%m%d')-$(git rev-parse HEAD | cut -c -5) @@ -53,24 +65,30 @@ jobs: # Strip git ref prefix from version VERSION=$(echo "${{ github.ref }}" | sed -e 's,.*/\(.*\),\1,') + # Generate PR tag from github.ref == "refs/pull/123/merge" + [ "$VERSION" == "merge" ] && VERSION=$(echo "${{ github.ref }}" | sed -e 's,refs/pull/\(.*\)/merge,pr-\1,') + # Use Docker `latest` tag convention [ "$VERSION" == "main" ] && VERSION=latest echo "GIT_COMMIT=$GIT_COMMIT" >> $GITHUB_ENV - echo "BUILD_ID=$BUILD_ID" >> $GITHUB_ENV - echo "VERSION=$VERSION" >> $GITHUB_ENV + echo "BUILD_ID=$BUILD_ID" >> $GITHUB_ENV + echo "VERSION=$VERSION" >> $GITHUB_ENV - - name: Build and push - uses: docker/build-push-action@v3 + # print env vars for debugging + cat "$GITHUB_ENV" + + - name: Build and push runtime image + uses: docker/build-push-action@v4 with: # for linux/s390x, maven errors due to missing io.grpc:protoc-gen-grpc-java:exe:linux-s390_64:1.51.1 platforms: linux/amd64,linux/arm64/v8,linux/ppc64le - pull: true - cache-from: type=gha - cache-to: type=gha,mode=max + target: runtime push: ${{ github.event_name == 'push' }} tags: ${{ env.IMAGE_NAME }}:${{ env.VERSION }} build-args: | imageVersion=${{ env.VERSION }} buildId=${{ env.BUILD_ID }} commitSha=${{ env.GIT_COMMIT }} + cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/Dockerfile b/Dockerfile index 638fdfac..1c9cf748 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,8 +14,11 @@ FROM registry.access.redhat.com/ubi8/ubi-minimal:8.6 as build_base -# https://blog.thesparktree.com/docker-multi-arch-github-actions#architecture-specific-dockerfile-instructions -ARG TARGETARCH=amd64 +# https://docs.docker.com/engine/reference/builder/#automatic-platform-args-in-the-global-scope +# don't provide "default" values (e.g. 
'ARG TARGETARCH=amd64') for non-buildx environments, +# see https://github.com/docker/buildx/issues/510 +ARG TARGETOS +ARG TARGETARCH ARG ETCD_VERSION=v3.5.4 @@ -23,31 +26,43 @@ LABEL image="build_base" USER root -RUN true \ - && microdnf --nodocs install java-17-openjdk-devel nss \ +ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk + +RUN --mount=type=cache,target=/root/.cache/microdnf:rw \ + microdnf --setopt=cachedir=/root/.cache/microdnf --nodocs install \ + java-17-openjdk-devel \ + nss \ && microdnf update --nodocs \ - && microdnf clean all \ && sed -i 's:security.provider.12=SunPKCS11:#security.provider.12=SunPKCS11:g' /usr/lib/jvm/java-17-openjdk-*/conf/security/java.security \ && sed -i 's:#security.provider.1=SunPKCS11 ${java.home}/lib/security/nss.cfg:security.provider.12=SunPKCS11 ${java.home}/lib/security/nss.cfg:g' /usr/lib/jvm/java-17-openjdk-*/conf/security/java.security \ + && java -version \ && true -RUN microdnf install wget tar gzip maven - -ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk +RUN --mount=type=cache,target=/root/.cache/microdnf:rw \ + microdnf --setopt=cachedir=/root/.cache/microdnf --nodocs install \ + wget \ + tar \ + gzip \ + maven \ + git \ + && true # Install etcd -- used for CI tests -RUN wget -q https://github.com/etcd-io/etcd/releases/download/${ETCD_VERSION}/etcd-${ETCD_VERSION}-linux-${TARGETARCH}.tar.gz && \ - mkdir -p /usr/lib/etcd && \ - tar xzf etcd-*-linux-${TARGETARCH}.tar.gz -C /usr/lib/etcd --strip-components=1 --no-same-owner && \ - rm -rf etcd*.gz - ENV PATH="/usr/lib/etcd:$PATH" +RUN true \ + && wget -q https://github.com/etcd-io/etcd/releases/download/${ETCD_VERSION}/etcd-${ETCD_VERSION}-${TARGETOS:-linux}-${TARGETARCH:-amd64}.tar.gz \ + && mkdir -p /usr/lib/etcd \ + && tar xzf etcd-*-${TARGETOS:-linux}-${TARGETARCH:-amd64}.tar.gz -C /usr/lib/etcd --strip-components=1 --no-same-owner \ + && rm -rf etcd*.gz \ + && etcd -version \ + && true # Copy in code RUN mkdir /build WORKDIR /build + ############################################################################### FROM build_base AS build @@ -57,58 +72,78 @@ COPY / /build ENV MAVEN_OPTS="-Dfile.encoding=UTF8" -RUN mvn -B package -DskipTests=true --file pom.xml +RUN --mount=type=cache,target=/root/.m2 \ + mvn -B package -DskipTests=true --file pom.xml + +# Assume that source code comes from a Git repository +RUN echo "$(date '+%Y%m%d')-$(git rev-parse HEAD | cut -c -5)" > target/dockerhome/build-version && \ + echo "$(git rev-parse HEAD)" > target/dockerhome/release && \ + echo "$(git branch --show-current|sed 's/^release-//g')-$(git branch --show-current)_$(date '+%Y%m%d')-$(git rev-parse HEAD | cut -c -5)" > target/dockerhome/version + ############################################################################### -FROM registry.access.redhat.com/ubi8/ubi-minimal:8.6 +FROM registry.access.redhat.com/ubi8/ubi-minimal:8.6 AS runtime -ARG imageVersion -ARG buildId -ARG commitSha -ARG USER=2000 +# TODO: FROM registry.access.redhat.com/ubi8/openjdk-17-runtime:1.15 -LABEL name="model-mesh" \ - vendor="KServe" \ - version="${imageVersion}" \ - summary="Core model-mesh sidecar image" \ - description="Model-mesh is a distributed LRU cache for serving runtime models" \ - release="${commitSha}" -LABEL maintainer=nickhill@us.ibm.com +ARG USER=2000 USER root -RUN true \ - && microdnf --nodocs install java-17-openjdk-headless nss \ +ENV JAVA_HOME=/usr/lib/jvm/jre-17-openjdk + +RUN --mount=type=cache,target=/root/.cache/microdnf:rw \ + microdnf --setopt=cachedir=/root/.cache/microdnf --nodocs install \ + 
java-17-openjdk-headless \ + nss \ && microdnf update --nodocs \ - && microdnf clean all \ && sed -i 's:security.provider.12=SunPKCS11:#security.provider.12=SunPKCS11:g' /usr/lib/jvm/java-17-openjdk-*/conf/security/java.security \ && sed -i 's:#security.provider.1=SunPKCS11 ${java.home}/lib/security/nss.cfg:security.provider.12=SunPKCS11 ${java.home}/lib/security/nss.cfg:g' /usr/lib/jvm/java-17-openjdk-*/conf/security/java.security \ + && java -version \ && true -ENV JAVA_HOME=/usr/lib/jvm/jre-17-openjdk - COPY --from=build /build/target/dockerhome/ /opt/kserve/mmesh/ COPY version /etc/modelmesh-version # Make this the current directory when starting the container WORKDIR /opt/kserve/mmesh -RUN microdnf install shadow-utils hostname && \ +RUN --mount=type=cache,target=/root/.cache/microdnf:rw \ + microdnf --setopt=cachedir=/root/.cache/microdnf --nodocs install \ + shadow-utils \ + hostname \ # Create app user - useradd -c "Application User" -U -u ${USER} -m app && \ - chown -R app:0 /home/app && \ + && useradd -c "Application User" -U -u ${USER} -m app \ + && chown -R app:0 /home/app \ # Adjust permissions on /etc/passwd to be writable by group root. # The user app is replaced by the assigned UID on OpenShift. - chmod g+w /etc/passwd && \ + && chmod g+w /etc/passwd \ # In newer Docker there is a --chown option for the COPY command - ln -s /opt/kserve/mmesh /opt/kserve/tas && \ - mkdir -p log && \ - chown -R app:0 . && \ - chmod -R 771 . && chmod 775 *.sh *.py && \ - echo "${buildId}" > /opt/kserve/mmesh/build-version && \ - \ + && ln -s /opt/kserve/mmesh /opt/kserve/tas \ + && mkdir -p log \ + && chown -R app:0 . \ + && chmod -R 771 . \ + && chmod 775 *.sh *.py \ # Disable java FIPS - see https://access.redhat.com/documentation/en-us/openjdk/17/html-single/configuring_openjdk_17_on_rhel_with_fips/index#config-fips-in-openjdk - sed -i 's/security.useSystemPropertiesFile=true/security.useSystemPropertiesFile=false/g' $JAVA_HOME/conf/security/java.security + && sed -i 's/security.useSystemPropertiesFile=true/security.useSystemPropertiesFile=false/g' $JAVA_HOME/conf/security/java.security \ + && true + +# wait to create commit-specific LABEL until end of the build to not unnecessarily +# invalidate the cached image layers +# ARG imageVersion +# ARG buildId +# ARG commitSha + +# Generated at build stage +# RUN echo "${buildId}" > /opt/kserve/mmesh/build-version + +LABEL name="model-mesh" \ + vendor="KServe" \ +# version="${imageVersion}" \ + summary="Core model-mesh sidecar image" \ + description="Model-mesh is a distributed LRU cache for serving runtime models" \ +# release="${commitSha}" \ + maintainer="nickhill@us.ibm.com" EXPOSE 8080 diff --git a/OWNERS b/OWNERS index 09fa332d..592faa9a 100644 --- a/OWNERS +++ b/OWNERS @@ -1,13 +1,18 @@ approvers: + - anishasthana + - danielezonca - heyselbi - israel-hdez - Jooho + - vaibhavjainwiz - VedantMahabaleshwarkar - Xaenalt reviewers: + - anishasthana + - danielezonca - heyselbi - israel-hdez - Jooho + - vaibhavjainwiz - VedantMahabaleshwarkar - Xaenalt - diff --git a/README.md b/README.md index e853ac3f..ef9e2431 100644 --- a/README.md +++ b/README.md @@ -40,10 +40,10 @@ Sample build: ```bash GIT_COMMIT=$(git rev-parse HEAD) BUILD_ID=$(date '+%Y%m%d')-$(git rev-parse HEAD | cut -c -5) -IMAGE_TAG_VERSION=0.0.1 +IMAGE_TAG_VERSION="dev" IMAGE_TAG=${IMAGE_TAG_VERSION}-$(git branch --show-current)_${BUILD_ID} -docker build -t model-mesh:${IMAGE_TAG} \ +docker build -t modelmesh:${IMAGE_TAG} \ --build-arg imageVersion=${IMAGE_TAG} \ --build-arg 
buildId=${BUILD_ID} \ --build-arg commitSha=${GIT_COMMIT} . diff --git a/pom.xml b/pom.xml index 8322979f..489fc9d6 100644 --- a/pom.xml +++ b/pom.xml @@ -57,17 +57,17 @@ ${env.BUILD_TAG} - 1.53.0 - 4.1.89.Final + 1.57.2 + 4.1.96.Final 1.7.2 0.5.1 0.0.22 - 3.22.0 - 9.0.72 - 31.1-jre - 2.14.2 + 3.23.0 + 9.0.75 + 32.1.2-jre + 2.15.2 2.10.1 - 0.18.0 + 0.18.1 11.1.0 2.20.0 1.7.36 @@ -75,7 +75,7 @@ since we have some custom optimized extensions to this --> 0.9.0 1.70 - 5.9.2 + 5.9.3 ${project.build.directory}/dockerhome false diff --git a/src/main/java/com/ibm/watson/modelmesh/Metrics.java b/src/main/java/com/ibm/watson/modelmesh/Metrics.java index b246a5c3..7be788fe 100644 --- a/src/main/java/com/ibm/watson/modelmesh/Metrics.java +++ b/src/main/java/com/ibm/watson/modelmesh/Metrics.java @@ -16,6 +16,7 @@ package com.ibm.watson.modelmesh; +import com.google.common.base.Strings; import com.ibm.watson.prometheus.Counter; import com.ibm.watson.prometheus.Gauge; import com.ibm.watson.prometheus.Histogram; @@ -36,34 +37,39 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.lang.reflect.Array; import java.net.SocketAddress; import java.nio.channels.DatagramChannel; -import java.util.*; +import java.util.Collections; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import static com.ibm.watson.modelmesh.Metric.*; +import static com.ibm.watson.modelmesh.Metric.MetricType.*; import static com.ibm.watson.modelmesh.ModelMesh.M; import static com.ibm.watson.modelmesh.ModelMeshEnvVars.MMESH_CUSTOM_ENV_VAR; -import static com.ibm.watson.modelmesh.ModelMeshEnvVars.MMESH_METRICS_ENV_VAR; import static java.util.concurrent.TimeUnit.*; /** * */ interface Metrics extends AutoCloseable { + boolean isPerModelMetricsEnabled(); boolean isEnabled(); void logTimingMetricSince(Metric metric, long prevTime, boolean isNano); - void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano); + void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId); - void logSizeEventMetric(Metric metric, long value); + void logSizeEventMetric(Metric metric, long value, String modelId); void logGaugeMetric(Metric metric, long value); @@ -101,7 +107,7 @@ default void logInstanceStats(final InstanceRecord ir) { * @param respPayloadSize response payload size in bytes (or -1 if not applicable) */ void logRequestMetrics(boolean external, String name, long elapsedNanos, Code code, - int reqPayloadSize, int respPayloadSize); + int reqPayloadSize, int respPayloadSize, String modelId, String vModelId); default void registerGlobals() {} @@ -111,6 +117,11 @@ default void unregisterGlobals() {} default void close() {} Metrics NO_OP_METRICS = new Metrics() { + @Override + public boolean isPerModelMetricsEnabled() { + return false; + } + @Override public boolean isEnabled() { return false; @@ -120,10 +131,10 @@ public boolean isEnabled() { public void logTimingMetricSince(Metric metric, long prevTime, boolean isNano) {} @Override - public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano) {} + public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId){} @Override - public void logSizeEventMetric(Metric metric, long value) {} + public void 
logSizeEventMetric(Metric metric, long value, String modelId){} @Override public void logGaugeMetric(Metric metric, long value) {} @@ -136,7 +147,7 @@ public void logInstanceStats(InstanceRecord ir) {} @Override public void logRequestMetrics(boolean external, String name, long elapsedNanos, Code code, - int reqPayloadSize, int respPayloadSize) {} + int reqPayloadSize, int respPayloadSize, String modelId, String vModelId) {} }; final class PrometheusMetrics implements Metrics { @@ -154,12 +165,14 @@ final class PrometheusMetrics implements Metrics { private final CollectorRegistry registry; private final NettyServer metricServer; private final boolean shortNames; + private final boolean perModelMetricsEnabled; private final EnumMap metricsMap = new EnumMap<>(Metric.class); public PrometheusMetrics(Map params, Map infoMetricParams) throws Exception { int port = 2112; boolean shortNames = true; boolean https = true; + boolean perModelMetricsEnabled = true; String memMetrics = "all"; // default to all for (Entry ent : params.entrySet()) { switch (ent.getKey()) { @@ -170,6 +183,9 @@ public PrometheusMetrics(Map params, Map infoMet throw new Exception("Invalid metrics port: " + ent.getValue()); } break; + case "per_model_metrics": + perModelMetricsEnabled= "true".equalsIgnoreCase(ent.getValue()); + break; case "fq_names": shortNames = !"true".equalsIgnoreCase(ent.getValue()); break; @@ -188,6 +204,7 @@ public PrometheusMetrics(Map params, Map infoMet throw new Exception("Unrecognized metrics config parameter: " + ent.getKey()); } } + this.perModelMetricsEnabled = perModelMetricsEnabled; registry = new CollectorRegistry(); for (Metric m : Metric.values()) { @@ -220,10 +237,15 @@ public PrometheusMetrics(Map params, Map infoMet } if (m == API_REQUEST_TIME || m == API_REQUEST_COUNT || m == INVOKE_MODEL_TIME - || m == INVOKE_MODEL_COUNT || m == REQUEST_PAYLOAD_SIZE || m == RESPONSE_PAYLOAD_SIZE) { - builder.labelNames("method", "code"); + || m == INVOKE_MODEL_COUNT || m == REQUEST_PAYLOAD_SIZE || m == RESPONSE_PAYLOAD_SIZE) { + if (this.perModelMetricsEnabled) { + builder.labelNames("method", "code", "modelId", "vModelId"); + } else { + builder.labelNames("method", "code"); + } + } else if (this.perModelMetricsEnabled && m.type != GAUGE && m.type != COUNTER && m.type != COUNTER_WITH_HISTO) { + builder.labelNames("modelId", "vModelId"); } - Collector collector = builder.name(m.promName).help(m.description).create(); metricsMap.put(m, collector); if (!m.global) { @@ -330,6 +352,11 @@ public void close() { this.metricServer.close(); } + @Override + public boolean isPerModelMetricsEnabled() { + return perModelMetricsEnabled; + } + @Override public boolean isEnabled() { return true; @@ -342,13 +369,23 @@ public void logTimingMetricSince(Metric metric, long prevTime, boolean isNano) { } @Override - public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano) { - ((Histogram) metricsMap.get(metric)).observe(isNano ? elapsed / M : elapsed); + public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId) { + Histogram histogram = (Histogram) metricsMap.get(metric); + if (perModelMetricsEnabled && modelId != null) { + histogram.labels(modelId, "").observe(isNano ? elapsed / M : elapsed); + } else { + histogram.observe(isNano ? 
elapsed / M : elapsed); + } } @Override - public void logSizeEventMetric(Metric metric, long value) { - ((Histogram) metricsMap.get(metric)).observe(value * metric.newMultiplier); + public void logSizeEventMetric(Metric metric, long value, String modelId) { + Histogram histogram = (Histogram) metricsMap.get(metric); + if (perModelMetricsEnabled) { + histogram.labels(modelId, "").observe(value * metric.newMultiplier); + } else { + histogram.observe(value * metric.newMultiplier); + } } @Override @@ -365,23 +402,37 @@ public void logCounterMetric(Metric metric) { @Override public void logRequestMetrics(boolean external, String name, long elapsedNanos, Code code, - int reqPayloadSize, int respPayloadSize) { + int reqPayloadSize, int respPayloadSize, String modelId, String vModelId) { final long elapsedMillis = elapsedNanos / M; final Histogram timingHisto = (Histogram) metricsMap .get(external ? API_REQUEST_TIME : INVOKE_MODEL_TIME); int idx = shortNames ? name.indexOf('/') : -1; - final String methodName = idx == -1 ? name : name.substring(idx + 1); - - timingHisto.labels(methodName, code.name()).observe(elapsedMillis); - + String methodName = idx == -1 ? name : name.substring(idx + 1); + if (perModelMetricsEnabled) { + modelId = Strings.nullToEmpty(modelId); + vModelId = Strings.nullToEmpty(vModelId); + } + if (perModelMetricsEnabled) { + timingHisto.labels(methodName, code.name(), modelId, vModelId).observe(elapsedMillis); + } else { + timingHisto.labels(methodName, code.name()).observe(elapsedMillis); + } if (reqPayloadSize != -1) { - ((Histogram) metricsMap.get(REQUEST_PAYLOAD_SIZE)) - .labels(methodName, code.name()).observe(reqPayloadSize); + Histogram reqPayloadHisto = (Histogram) metricsMap.get(REQUEST_PAYLOAD_SIZE); + if (perModelMetricsEnabled) { + reqPayloadHisto.labels(methodName, code.name(), modelId, vModelId).observe(reqPayloadSize); + } else { + reqPayloadHisto.labels(methodName, code.name()).observe(reqPayloadSize); + } } if (respPayloadSize != -1) { - ((Histogram) metricsMap.get(RESPONSE_PAYLOAD_SIZE)) - .labels(methodName, code.name()).observe(respPayloadSize); + Histogram respPayloadHisto = (Histogram) metricsMap.get(RESPONSE_PAYLOAD_SIZE); + if (perModelMetricsEnabled) { + respPayloadHisto.labels(methodName, code.name(), modelId, vModelId).observe(respPayloadSize); + } else { + respPayloadHisto.labels(methodName, code.name()).observe(respPayloadSize); + } } } @@ -437,6 +488,11 @@ protected StatsDSender createSender(Callable addressLookup, int q + (shortNames ? "short" : "fully-qualified") + " method names"); } + @Override + public boolean isPerModelMetricsEnabled() { + return false; + } + @Override public boolean isEnabled() { return true; @@ -454,12 +510,12 @@ public void logTimingMetricSince(Metric metric, long prevTime, boolean isNano) { } @Override - public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano) { + public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId) { client.recordExecutionTime(name(metric), isNano ? 
elapsed / M : elapsed); } @Override - public void logSizeEventMetric(Metric metric, long value) { + public void logSizeEventMetric(Metric metric, long value, String modelId) { if (!legacy) { value *= metric.newMultiplier; } @@ -497,7 +553,7 @@ static String[] getOkTags(String method, boolean shortName) { @Override public void logRequestMetrics(boolean external, String name, long elapsedNanos, Code code, - int reqPayloadSize, int respPayloadSize) { + int reqPayloadSize, int respPayloadSize, String modelId, String vModelId) { final StatsDClient client = this.client; final long elapsedMillis = elapsedNanos / M; final String countName = name(external ? API_REQUEST_COUNT : INVOKE_MODEL_COUNT); diff --git a/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java b/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java index 9755df49..78c776b4 100644 --- a/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java +++ b/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java @@ -1966,7 +1966,7 @@ final synchronized boolean doRemove(final boolean evicted, // "unload" event if explicit unloading isn't enabled. // Otherwise, this gets recorded in a callback set in the // CacheEntry.unload(int) method - metrics.logTimingMetricDuration(Metric.UNLOAD_MODEL_TIME, 0L, false); + metrics.logTimingMetricDuration(Metric.UNLOAD_MODEL_TIME, 0L, false, modelId); metrics.logCounterMetric(Metric.UNLOAD_MODEL); } } @@ -2037,7 +2037,7 @@ public void onSuccess(Boolean reallyHappened) { //TODO probably only log if took longer than a certain time long tookMillis = msSince(beforeNanos); logger.info("Unload of " + modelId + " completed in " + tookMillis + "ms"); - metrics.logTimingMetricDuration(Metric.UNLOAD_MODEL_TIME, tookMillis, false); + metrics.logTimingMetricDuration(Metric.UNLOAD_MODEL_TIME, tookMillis, false, modelId); metrics.logCounterMetric(Metric.UNLOAD_MODEL); } // else considered trivially succeeded because the corresponding @@ -2158,7 +2158,7 @@ public final void run() { long queueStartTimeNanos = getAndResetLoadingQueueStartTimeNanos(); if (queueStartTimeNanos > 0) { long queueDelayMillis = (nanoTime() - queueStartTimeNanos) / M; - metrics.logSizeEventMetric(Metric.LOAD_MODEL_QUEUE_DELAY, queueDelayMillis); + metrics.logSizeEventMetric(Metric.LOAD_MODEL_QUEUE_DELAY, queueDelayMillis, modelId); // Only log if the priority value is "in the future" which indicates // that there is or were runtime requests waiting for this load. 
// Otherwise we don't care about arbitrary delays here @@ -2228,7 +2228,7 @@ public final void run() { loadingTimeStats(modelType).recordTime(tookMillis); logger.info("Load of model " + modelId + " type=" + modelType + " completed in " + tookMillis + "ms"); - metrics.logTimingMetricDuration(Metric.LOAD_MODEL_TIME, tookMillis, false); + metrics.logTimingMetricDuration(Metric.LOAD_MODEL_TIME, tookMillis, false, modelId); metrics.logCounterMetric(Metric.LOAD_MODEL); } catch (Throwable t) { loadFuture = null; @@ -2388,7 +2388,7 @@ protected final void complete(LoadedRuntime result, Throwable error) { if (size > 0) { long sizeBytes = size * UNIT_SIZE; logger.info("Model " + modelId + " size = " + size + " units" + ", ~" + mb(sizeBytes)); - metrics.logSizeEventMetric(Metric.LOADED_MODEL_SIZE, sizeBytes); + metrics.logSizeEventMetric(Metric.LOADED_MODEL_SIZE, sizeBytes, modelId); } else { try { long before = nanoTime(); @@ -2397,9 +2397,9 @@ protected final void complete(LoadedRuntime result, Throwable error) { long took = msSince(before), sizeBytes = size * UNIT_SIZE; logger.info("Model " + modelId + " size = " + size + " units" + ", ~" + mb(sizeBytes) + " sizing took " + took + "ms"); - metrics.logTimingMetricDuration(Metric.MODEL_SIZING_TIME, took, false); + metrics.logTimingMetricDuration(Metric.MODEL_SIZING_TIME, took, false, modelId); // this is actually a size (bytes), not a "time" - metrics.logSizeEventMetric(Metric.LOADED_MODEL_SIZE, sizeBytes); + metrics.logSizeEventMetric(Metric.LOADED_MODEL_SIZE, sizeBytes, modelId); } } catch (Exception e) { if (!isInterruption(e) && state == SIZING) { @@ -2722,7 +2722,7 @@ protected void beforeInvoke(int requestWeight) //noinspection ThrowFromFinallyBlock throw new ModelNotHereException(instanceId, modelId); } - metrics.logTimingMetricDuration(Metric.QUEUE_DELAY, tookMillis, false); + metrics.logTimingMetricDuration(Metric.QUEUE_DELAY, tookMillis, false, modelId); } } } @@ -2901,7 +2901,7 @@ public void onEviction(String key, CacheEntry ce, long lastUsed) { logger.info("Evicted " + (failed ? "failed model record" : "model") + " " + key + " from local cache, last used " + readableTime(millisSinceLastUsed) + " ago (" + lastUsed + "ms), invoked " + ce.getTotalInvocationCount() + " times"); - metrics.logTimingMetricDuration(Metric.AGE_AT_EVICTION, millisSinceLastUsed, false); + metrics.logTimingMetricDuration(Metric.AGE_AT_EVICTION, millisSinceLastUsed, false, ce.modelId); metrics.logCounterMetric(Metric.EVICT_MODEL); } @@ -3315,6 +3315,7 @@ protected Map getMap(Object[] arr) { static final String KNOWN_SIZE_CXT_KEY = "tas.known_size"; static final String UNBALANCED_KEY = "mmesh.unbalanced"; static final String DEST_INST_ID_KEY = "tas.dest_iid"; + static final String VMODEL_ID = "vmodelid"; // these are the possible values for the tas.internal context parameter // it won't be set on requests from outside of the cluster, and will @@ -3430,6 +3431,7 @@ protected Object invokeModel(final String modelId, final Method method, final Me } final String tasInternal = contextMap.get(TAS_INTERNAL_CXT_KEY); + final String vModelId = contextMap.getOrDefault(VMODEL_ID, ""); // Set the external request flag if it's not a tasInternal call or if // tasInternal == INTERNAL_REQ. The latter is a new ensureLoaded // invocation originating from within the cluster. 
@@ -3502,7 +3504,7 @@ protected Object invokeModel(final String modelId, final Method method, final Me throw new ModelNotHereException(instanceId, modelId); } try { - return invokeLocalModel(ce, method, args, modelId); + return invokeLocalModel(ce, method, args, vModelId); } catch (ModelLoadException mle) { mr = registry.get(modelId); if (mr == null || !mr.loadFailedInInstance(instanceId)) { @@ -3716,7 +3718,7 @@ protected Object invokeModel(final String modelId, final Method method, final Me localInvokesInFlight.incrementAndGet(); } try { - Object result = invokeLocalModel(cacheEntry, method, args, modelId); + Object result = invokeLocalModel(cacheEntry, method, args, vModelId); return method == null && externalReq ? updateWithModelCopyInfo(result, mr) : result; } finally { if (!favourSelfForHits) { @@ -3936,7 +3938,7 @@ else if (mr.getInstanceIds().containsKey(instanceId)) { // invoke model try { - Object result = invokeLocalModel(cacheEntry, method, args, modelId); + Object result = invokeLocalModel(cacheEntry, method, args, vModelId); return method == null && externalReq ? updateWithModelCopyInfo(result, mr) : result; } catch (ModelNotHereException e) { if (loadTargetFilter != null) loadTargetFilter.remove(instanceId); @@ -3991,7 +3993,7 @@ else if (mr.getInstanceIds().containsKey(instanceId)) { if (methodStartNanos > 0L && metrics.isEnabled()) { // only logged here in non-grpc (legacy) mode metrics.logRequestMetrics(true, getRequestMethodName(method, args), - nanoTime() - methodStartNanos, metricStatusCode, -1, -1); + nanoTime() - methodStartNanos, metricStatusCode, -1, -1, modelId, vModelId); } curThread.setName(threadNameBefore); } @@ -4403,17 +4405,17 @@ protected Object invokeRemoteModel(BaseModelMeshService.Iface client, Method met return remoteMeth.invoke(client, ObjectArrays.concat(modelId, args)); } - protected Object invokeLocalModel(CacheEntry ce, Method method, Object[] args, String modelId) + protected Object invokeLocalModel(CacheEntry ce, Method method, Object[] args, String vModelId) throws InterruptedException, TException { - Object result = invokeLocalModel(ce, method, args); + final Object result = _invokeLocalModel(ce, method, args, vModelId); // if this is an ensure-loaded request, check-for and trigger a "chained" load if necessary if (method == null) { - triggerChainedLoadIfNecessary(modelId, result, args, ce.getWeight(), null); + triggerChainedLoadIfNecessary(ce.modelId, result, args, ce.getWeight(), null); } return result; } - private Object invokeLocalModel(CacheEntry ce, Method method, Object[] args) + private Object _invokeLocalModel(CacheEntry ce, Method method, Object[] args, String vModelId) throws InterruptedException, TException { if (method == null) { @@ -4450,7 +4452,7 @@ private Object invokeLocalModel(CacheEntry ce, Method method, Object[] args) long delayMillis = msSince(beforeNanos); logger.info("Cache miss for model invocation, held up " + delayMillis + "ms"); metrics.logCounterMetric(Metric.CACHE_MISS); - metrics.logTimingMetricDuration(Metric.CACHE_MISS_DELAY, delayMillis, false); + metrics.logTimingMetricDuration(Metric.CACHE_MISS_DELAY, delayMillis, false, ce.modelId); } } } else { @@ -4528,7 +4530,7 @@ private Object invokeLocalModel(CacheEntry ce, Method method, Object[] args) ce.afterInvoke(weight, tookNanos); if (code != null && metrics.isEnabled()) { metrics.logRequestMetrics(false, getRequestMethodName(method, args), - tookNanos, code, -1, -1); + tookNanos, code, -1, -1, ce.modelId, vModelId); } } } diff --git 
a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java index ff143ac6..715c0efe 100644 --- a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java +++ b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java @@ -30,6 +30,7 @@ import com.ibm.watson.litelinks.server.ReleaseAfterResponse; import com.ibm.watson.litelinks.server.ServerRequestThread; import com.ibm.watson.modelmesh.DataplaneApiConfig.RpcConfig; +import com.ibm.watson.modelmesh.GrpcSupport.InterruptingListener; import com.ibm.watson.modelmesh.ModelMesh.ExtendedStatusInfo; import com.ibm.watson.modelmesh.api.DeleteVModelRequest; import com.ibm.watson.modelmesh.api.DeleteVModelResponse; @@ -68,6 +69,7 @@ import io.grpc.ServerInterceptors; import io.grpc.ServerMethodDefinition; import io.grpc.ServerServiceDefinition; +import io.grpc.Status.Code; import io.grpc.StatusException; import io.grpc.StatusRuntimeException; import io.grpc.netty.GrpcSslContexts; @@ -85,6 +87,7 @@ import io.netty.handler.ssl.ClientAuth; import io.netty.handler.ssl.SslContext; import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.concurrent.FastThreadLocalThread; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -344,6 +347,10 @@ protected static void setUnbalancedLitelinksContextParam() { ThreadContext.addContextEntry(ModelMesh.UNBALANCED_KEY, "true"); // unbalanced } + protected static void setVModelIdLiteLinksContextParam(String vModelId) { + ThreadContext.addContextEntry(ModelMesh.VMODEL_ID, vModelId); + } + // ----------------- concrete model management methods @Override @@ -427,6 +434,9 @@ public void ensureLoaded(EnsureLoadedRequest request, StreamObserver resolvedModelId = new FastThreadLocal<>(); + // Returned ModelResponse will be released once the request thread exits so // must be retained before transferring. 
// non-private to avoid synthetic method access @@ -441,9 +451,13 @@ ModelResponse callModel(String modelId, boolean isVModel, String methodName, Str } String vModelId = modelId; modelId = null; + if (delegate.metrics.isPerModelMetricsEnabled()) { + setVModelIdLiteLinksContextParam(vModelId); + } boolean first = true; while (true) { modelId = vmm().resolveVModelId(vModelId, modelId); + resolvedModelId.set(modelId); if (unbalanced) { setUnbalancedLitelinksContextParam(); } @@ -542,7 +556,7 @@ protected static void respondAndComplete(StreamObserver response, } protected static io.grpc.Status toStatus(Exception e) { - io.grpc.Status s = null; + io.grpc.Status s; String msg = e.getMessage(); if (e instanceof ModelNotFoundException) { return MODEL_NOT_FOUND_STATUS; @@ -655,7 +669,7 @@ public Listener startCall(ServerCall call, Metadata h call.request(2); // request 2 to force failure if streaming method - return new Listener() { + return new Listener<>() { ByteBuf reqMessage; boolean canInvoke = true; Iterable modelIds = mids.modelIds; @@ -707,7 +721,8 @@ public void onHalfClose() { int respReaderIndex = 0; io.grpc.Status status = INTERNAL; - String modelId = null; + String resolvedModelId = null; + String vModelId = null; String requestId = null; ModelResponse response = null; try (InterruptingListener cancelListener = newInterruptingListener()) { @@ -721,16 +736,28 @@ public void onHalfClose() { String balancedMetaVal = headers.get(BALANCED_META_KEY); Iterator midIt = modelIds.iterator(); // guaranteed at least one - modelId = validateModelId(midIt.next(), isVModel); + String modelOrVModelId = validateModelId(midIt.next(), isVModel); if (!midIt.hasNext()) { // single model case (most common) - response = callModel(modelId, isVModel, methodName, - balancedMetaVal, headers, reqMessage).retain(); + if (isVModel) { + ModelMeshApi.resolvedModelId.set(null); + } + try { + response = callModel(modelOrVModelId, isVModel, methodName, + balancedMetaVal, headers, reqMessage).retain(); + } finally { + if (isVModel) { + vModelId = modelOrVModelId; + resolvedModelId = ModelMeshApi.resolvedModelId.getIfExists(); + } else { + resolvedModelId = modelOrVModelId; + } + } } else { // multi-model case (specialized) boolean allRequired = "all".equalsIgnoreCase(headers.get(REQUIRED_KEY)); List idList = new ArrayList<>(); - idList.add(modelId); + idList.add(modelOrVModelId); while (midIt.hasNext()) { idList.add(validateModelId(midIt.next(), isVModel)); } @@ -740,7 +767,7 @@ public void onHalfClose() { } finally { if (payloadProcessor != null) { processPayload(reqMessage.readerIndex(reqReaderIndex), - requestId, modelId, methodName, headers, null, true); + requestId, resolvedModelId, methodName, headers, null, true); } else { releaseReqMessage(); } @@ -768,6 +795,7 @@ public void onHalfClose() { evictMethodDescriptor(methodName); } } finally { + final boolean releaseResponse = status != OK; if (payloadProcessor != null) { ByteBuf data = null; Metadata metadata = null; @@ -775,7 +803,9 @@ public void onHalfClose() { data = response.data.readerIndex(respReaderIndex); metadata = response.metadata; } - processPayload(data, requestId, modelId, methodName, metadata, status, false); + processPayload(data, requestId, resolvedModelId, methodName, metadata, status, releaseResponse); + } else if (releaseResponse && response != null) { + response.release(); } ReleaseAfterResponse.releaseAll(); clearThreadLocals(); @@ -784,7 +814,7 @@ public void onHalfClose() { Metrics metrics = delegate.metrics; if (metrics.isEnabled()) { 
metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, - status.getCode(), reqSize, respSize); + status.getCode(), reqSize, respSize, resolvedModelId, vModelId); } } } @@ -805,7 +835,7 @@ private void processPayload(ByteBuf data, String payloadId, String modelId, Stri try { assert payloadProcessor != null; if (!takeOwnership) { - data.retain(); + ReferenceCountUtil.retain(data); } payload = new Payload(payloadId, modelId, methodName, metadata, data, status); if (payloadProcessor.process(payload)) { diff --git a/src/main/java/com/ibm/watson/modelmesh/VModelManager.java b/src/main/java/com/ibm/watson/modelmesh/VModelManager.java index d2706a16..7ad5da8a 100644 --- a/src/main/java/com/ibm/watson/modelmesh/VModelManager.java +++ b/src/main/java/com/ibm/watson/modelmesh/VModelManager.java @@ -27,7 +27,6 @@ import com.ibm.watson.kvutils.KVTable.Helper.TableTxn; import com.ibm.watson.kvutils.KVTable.TableView; import com.ibm.watson.kvutils.factory.KVUtilsFactory; -import com.ibm.watson.litelinks.ThreadContext; import com.ibm.watson.litelinks.ThreadPoolHelper; import com.ibm.watson.modelmesh.GrpcSupport.InterruptingListener; import com.ibm.watson.modelmesh.api.ModelInfo; diff --git a/src/main/java/com/ibm/watson/modelmesh/payload/RemotePayloadProcessor.java b/src/main/java/com/ibm/watson/modelmesh/payload/RemotePayloadProcessor.java index 8004e0a7..401fba2d 100644 --- a/src/main/java/com/ibm/watson/modelmesh/payload/RemotePayloadProcessor.java +++ b/src/main/java/com/ibm/watson/modelmesh/payload/RemotePayloadProcessor.java @@ -21,8 +21,11 @@ import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; import com.fasterxml.jackson.databind.ObjectMapper; +import io.grpc.Metadata; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.base64.Base64; import org.slf4j.Logger; @@ -58,16 +61,32 @@ private static PayloadContent prepareContentBody(Payload payload) { ByteBuf byteBuf = payload.getData(); String data; if (byteBuf != null) { - ByteBuf encoded = Base64.encode(byteBuf, byteBuf.readerIndex(), byteBuf.readableBytes(), false); - //TODO custom jackson serialization for this field to avoid round-tripping to string - data = encoded.toString(StandardCharsets.US_ASCII); + data = encodeBinaryToString(byteBuf); } else { data = ""; } + Metadata metadata = payload.getMetadata(); + Map metadataMap = new HashMap<>(); + if (metadata != null) { + for (String key : metadata.keys()) { + if (key.endsWith("-bin")) { + byte[] bytes = metadata.get(Metadata.Key.of(key, Metadata.BINARY_BYTE_MARSHALLER)); + metadataMap.put(key, java.util.Base64.getEncoder().encodeToString(bytes)); + } else { + String value = metadata.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)); + metadataMap.put(key, value); + } + } + } String status = payload.getStatus() != null ? 
payload.getStatus().getCode().toString() : ""; - return new PayloadContent(id, modelId, data, kind, status); + return new PayloadContent(id, modelId, data, kind, status, metadataMap); } + private static String encodeBinaryToString(ByteBuf byteBuf) { + ByteBuf encodedBinary = Base64.encode(byteBuf, byteBuf.readerIndex(), byteBuf.readableBytes(), false); + //TODO custom jackson serialization for this field to avoid round-tripping to string + return encodedBinary.toString(StandardCharsets.US_ASCII); + } private boolean sendPayload(Payload payload) { try { @@ -94,18 +113,22 @@ public String getName() { } private static class PayloadContent { + private final String id; private final String modelid; private final String data; private final String kind; private final String status; + private final Map metadata; - private PayloadContent(String id, String modelid, String data, String kind, String status) { + private PayloadContent(String id, String modelid, String data, String kind, String status, + Map metadata) { this.id = id; this.modelid = modelid; this.data = data; this.kind = kind; this.status = status; + this.metadata = metadata; } public String getId() { @@ -128,6 +151,10 @@ public String getStatus() { return status; } + public Map getMetadata() { + return metadata; + } + @Override public String toString() { return "PayloadContent{" + @@ -136,6 +163,7 @@ public String toString() { ", data='" + data + '\'' + ", kind='" + kind + '\'' + ", status='" + status + '\'' + + ", metadata='" + metadata + '\'' + '}'; } } diff --git a/src/main/java/com/ibm/watson/prometheus/SimpleCollector.java b/src/main/java/com/ibm/watson/prometheus/SimpleCollector.java index ffca070b..c7b25c1f 100644 --- a/src/main/java/com/ibm/watson/prometheus/SimpleCollector.java +++ b/src/main/java/com/ibm/watson/prometheus/SimpleCollector.java @@ -161,7 +161,7 @@ private static int nextIdx(int i, int len) { private void validateCount(int count) { if (count != labelCount) { - throw new IllegalArgumentException("Incorrect number of labels."); + throw new IllegalArgumentException("Incorrect number of labels. 
Expected: " + labelCount + ", got: " + count); } } diff --git a/src/test/java/com/ibm/watson/modelmesh/ModelMeshMetricsTest.java b/src/test/java/com/ibm/watson/modelmesh/ModelMeshMetricsTest.java index dc6ee35e..a78cef1c 100644 --- a/src/test/java/com/ibm/watson/modelmesh/ModelMeshMetricsTest.java +++ b/src/test/java/com/ibm/watson/modelmesh/ModelMeshMetricsTest.java @@ -32,6 +32,7 @@ import io.grpc.ManagedChannel; import io.grpc.netty.NettyChannelBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import javax.net.ssl.SSLContext; @@ -76,10 +77,11 @@ protected int requestCount() { @Override protected Map extraEnvVars() { - return ImmutableMap.of("MM_METRICS", "prometheus:port=" + METRICS_PORT + ";scheme=" + SCHEME); + return ImmutableMap.of("MM_METRICS", "prometheus:port=" + METRICS_PORT + ";scheme=" + SCHEME + + ";per_model_metrics=true"); } - @Test + @BeforeAll public void metricsTest() throws Exception { ManagedChannel channel = NettyChannelBuilder.forAddress("localhost", 9000).usePlaintext().build(); @@ -151,11 +153,11 @@ public void metricsTest() throws Exception { } } - public void verifyMetrics() throws Exception { + protected Map prepareMetrics() throws Exception { // Insecure trust manager - skip TLS verification SSLContext sslContext = SSLContext.getInstance("TLS"); sslContext.init(null, InsecureTrustManagerFactory.INSTANCE.getTrustManagers(), null); - + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); HttpRequest metricsRequest = HttpRequest.newBuilder() .uri(URI.create(SCHEME + "://localhost:" + METRICS_PORT + "/metrics")).build(); @@ -172,29 +174,35 @@ public void verifyMetrics() throws Exception { .filter(Matcher::matches) .collect(Collectors.toMap(m -> m.group(1), m -> Double.parseDouble(m.group(2)))); + return metrics; + } + + @Test + public void verifyMetrics() throws Exception { + // Insecure trust manager - skip TLS verification + Map metrics = prepareMetrics(); + System.out.println(metrics.size() + " metrics scraped"); // Spot check some expected metrics and values // External response time should all be < 2000ms (includes cache hit loading time) - assertEquals(40.0, metrics.get("modelmesh_api_request_milliseconds_bucket{method=\"predict\",code=\"OK\",le=\"2000.0\",}")); + assertEquals(40.0, metrics.get("modelmesh_api_request_milliseconds_bucket{method=\"predict\",code=\"OK\",modelId=\"myModel\",vModelId=\"\",le=\"2000.0\",}")); // External response time should all be < 200ms (includes cache hit loading time) - assertEquals(40.0, metrics.get("modelmesh_invoke_model_milliseconds_bucket{method=\"predict\",code=\"OK\",le=\"200.0\",}")); + assertEquals(40.0, + metrics.get("modelmesh_invoke_model_milliseconds_bucket{method=\"predict\",code=\"OK\",modelId=\"myModel\",vModelId=\"\",le=\"120000.0\",}")); // Simulated model sizing time is < 200ms - assertEquals(1.0, metrics.get("modelmesh_model_sizing_milliseconds_bucket{le=\"200.0\",}")); + assertEquals(1.0, metrics.get("modelmesh_model_sizing_milliseconds_bucket{modelId=\"myModel\",vModelId=\"\",le=\"60000.0\",}")); // Simulated model sizing time is > 50ms - assertEquals(0.0, metrics.get("modelmesh_model_sizing_milliseconds_bucket{le=\"50.0\",}")); + assertEquals(0.0, metrics.get("modelmesh_model_sizing_milliseconds_bucket{modelId=\"myModel\",vModelId=\"\",le=\"50.0\",}")); // Simulated model size is between 64MiB and 256MiB - assertEquals(0.0, 
metrics.get("modelmesh_loaded_model_size_bytes_bucket{le=\"6.7108864E7\",}")); - assertEquals(1.0, metrics.get("modelmesh_loaded_model_size_bytes_bucket{le=\"2.68435456E8\",}")); + assertEquals(0.0, metrics.get("modelmesh_loaded_model_size_bytes_bucket{modelId=\"myModel\",vModelId=\"\",le=\"6.7108864E7\",}")); + assertEquals(1.0, metrics.get("modelmesh_loaded_model_size_bytes_bucket{modelId=\"myModel\",vModelId=\"\",le=\"2.68435456E8\",}")); // One model is loaded - assertEquals(1.0, metrics.get("modelmesh_models_loaded_total")); assertEquals(1.0, metrics.get("modelmesh_instance_models_total")); // Histogram counts should reflect the two payload sizes (30 small, 10 large) - assertEquals(30.0, metrics.get("modelmesh_request_size_bytes_bucket{method=\"predict\",code=\"OK\",le=\"128.0\",}")); - assertEquals(40.0, metrics.get("modelmesh_request_size_bytes_bucket{method=\"predict\",code=\"OK\",le=\"2097152.0\",}")); - assertEquals(30.0, metrics.get("modelmesh_response_size_bytes_bucket{method=\"predict\",code=\"OK\",le=\"128.0\",}")); - assertEquals(40.0, metrics.get("modelmesh_response_size_bytes_bucket{method=\"predict\",code=\"OK\",le=\"2097152.0\",}")); + assertEquals(30.0, metrics.get("modelmesh_request_size_bytes_bucket{method=\"predict\",code=\"OK\",modelId=\"myModel\",vModelId=\"\",le=\"128.0\",}")); + assertEquals(40.0, metrics.get("modelmesh_request_size_bytes_bucket{method=\"predict\",code=\"OK\",modelId=\"myModel\",vModelId=\"\",le=\"2097152.0\",}")); // Memory metrics assertTrue(metrics.containsKey("netty_pool_mem_allocated_bytes{area=\"direct\",}")); diff --git a/src/test/java/com/ibm/watson/modelmesh/SidecarModelMeshPayloadProcessingTest.java b/src/test/java/com/ibm/watson/modelmesh/SidecarModelMeshPayloadProcessingTest.java index a74ef778..56d728a0 100644 --- a/src/test/java/com/ibm/watson/modelmesh/SidecarModelMeshPayloadProcessingTest.java +++ b/src/test/java/com/ibm/watson/modelmesh/SidecarModelMeshPayloadProcessingTest.java @@ -42,7 +42,7 @@ public class SidecarModelMeshPayloadProcessingTest extends SingleInstanceModelMe @BeforeEach public void initialize() throws Exception { - System.setProperty(ModelMeshEnvVars.MM_PAYLOAD_PROCESSORS, "logger://*"); + System.setProperty(ModelMeshEnvVars.MM_PAYLOAD_PROCESSORS, "http://localhost:8080/consumer/kserve/v2"); super.initialize(); } diff --git a/src/test/java/com/ibm/watson/modelmesh/payload/RemotePayloadProcessorTest.java b/src/test/java/com/ibm/watson/modelmesh/payload/RemotePayloadProcessorTest.java index 7a75a60c..ec08ea0a 100644 --- a/src/test/java/com/ibm/watson/modelmesh/payload/RemotePayloadProcessorTest.java +++ b/src/test/java/com/ibm/watson/modelmesh/payload/RemotePayloadProcessorTest.java @@ -36,6 +36,8 @@ void testDestinationUnreachable() { String method = "predict"; Status kind = Status.INVALID_ARGUMENT; Metadata metadata = new Metadata(); + metadata.put(Metadata.Key.of("foo", Metadata.ASCII_STRING_MARSHALLER), "bar"); + metadata.put(Metadata.Key.of("binary-bin", Metadata.BINARY_BYTE_MARSHALLER), "string".getBytes()); ByteBuf data = Unpooled.buffer(4); Payload payload = new Payload(id, modelId, method, metadata, data, kind); assertFalse(remotePayloadProcessor.process(payload)); diff --git a/version b/version index c79b65e4..13d4c7d4 100644 --- a/version +++ b/version @@ -1,5 +1,4 @@ # Version information -upstream kserve modelmesh version: v0.11.0-alpha -upstream modelmesh version: v0.11.0-alpha -opendatahub version: 1.4.2 -opendatahub modelmesh branch: release-v0.11.0-alpha +upstream kserve modelmesh version: v0.11.0 
+opendatahub version: latest
+opendatahub modelmesh branch: release-0.11.0
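
---

The Metrics.java changes above add `modelId`/`vModelId` label names to the request-scoped histograms whenever `per_model_metrics=true` appears in the `MM_METRICS` config string (as exercised in ModelMeshMetricsTest). A minimal sketch of that labeling scheme follows, using the standard `io.prometheus.client` simpleclient builder rather than the repo's `com.ibm.watson.prometheus` fork; the `PerModelMetricsSketch` class name and the literal observation values are made up for illustration.

```java
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Histogram;

// Sketch of the per-model labeling scheme, assuming the standard Prometheus
// simpleclient API (the repo uses its own com.ibm.watson.prometheus fork).
public class PerModelMetricsSketch {
    public static void main(String[] args) {
        boolean perModelMetricsEnabled = true; // parsed from "per_model_metrics" in MM_METRICS

        CollectorRegistry registry = new CollectorRegistry();
        Histogram.Builder builder = Histogram.build()
                .name("modelmesh_api_request_milliseconds")
                .help("External API request time");

        // Request metrics only get the extra labels when enabled, since each
        // distinct (modelId, vModelId) pair creates a new time series.
        if (perModelMetricsEnabled) {
            builder.labelNames("method", "code", "modelId", "vModelId");
        } else {
            builder.labelNames("method", "code");
        }
        Histogram requestTime = builder.register(registry);

        // vModelId is left blank when the request addressed a concrete model directly.
        if (perModelMetricsEnabled) {
            requestTime.labels("predict", "OK", "myModel", "").observe(42.0);
        } else {
            requestTime.labels("predict", "OK").observe(42.0);
        }
    }
}
```

The default is enabled in this diff, so deployments with many models should weigh the extra series cardinality before scraping these metrics unfiltered.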
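The ModelMeshApi.java changes park the concrete model id resolved from a vmodel in a Netty `FastThreadLocal` so the outer `onHalfClose` handler can attach both ids to the request metrics. A rough, self-contained sketch of that handoff, with stand-in names (`ResolvedModelIdSketch`, `resolveVModelId`, the placeholder lookup) for the real registry and dispatch code:

```java
import io.netty.util.concurrent.FastThreadLocal;

// Sketch of the resolved-model-id handoff used for per-model request metrics:
// vmodel resolution happens deeper in the call stack, so the concrete model id
// is stashed in a FastThreadLocal and read back by the caller that logs metrics.
public class ResolvedModelIdSketch {
    static final FastThreadLocal<String> resolvedModelId = new FastThreadLocal<>();

    static String resolveVModelId(String vModelId) {
        return vModelId + "-concrete"; // placeholder for the real vmodel registry lookup
    }

    static void callModel(String vModelId) {
        String modelId = resolveVModelId(vModelId);
        resolvedModelId.set(modelId);
        // ... dispatch the request to the resolved model ...
    }

    public static void main(String[] args) {
        String vModelId = "myVModel";
        resolvedModelId.set(null); // reset before the call
        try {
            callModel(vModelId);
        } finally {
            String modelId = resolvedModelId.getIfExists();
            System.out.println("metrics labels: modelId=" + modelId + ", vModelId=" + vModelId);
        }
    }
}
```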
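RemotePayloadProcessor now copies gRPC metadata into the serialized payload, base64-encoding `-bin` keys and passing other keys through as ASCII strings. The sketch below distills that conversion into a runnable form; the `MetadataSketch` class and `toMap` helper are hypothetical, while the `Metadata` and Jackson calls mirror the ones added in the diff.

```java
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.grpc.Metadata;

// Sketch of flattening gRPC metadata into a JSON-friendly map, mirroring the
// RemotePayloadProcessor change: binary "-bin" keys are base64-encoded,
// everything else is carried as a plain ASCII string.
public class MetadataSketch {
    static Map<String, String> toMap(Metadata metadata) {
        Map<String, String> out = new HashMap<>();
        for (String key : metadata.keys()) {
            if (key.endsWith("-bin")) {
                byte[] bytes = metadata.get(Metadata.Key.of(key, Metadata.BINARY_BYTE_MARSHALLER));
                out.put(key, bytes == null ? null : Base64.getEncoder().encodeToString(bytes));
            } else {
                out.put(key, metadata.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)));
            }
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        Metadata md = new Metadata();
        md.put(Metadata.Key.of("foo", Metadata.ASCII_STRING_MARSHALLER), "bar");
        md.put(Metadata.Key.of("binary-bin", Metadata.BINARY_BYTE_MARSHALLER), "string".getBytes());
        System.out.println(new ObjectMapper().writeValueAsString(toMap(md)));
    }
}
```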