From 381a683ebe4742fb26cfd70a7906b125e9a1cffe Mon Sep 17 00:00:00 2001 From: "jian.h" Date: Sat, 19 Oct 2019 22:10:03 -0700 Subject: [PATCH 1/9] Support Samza on Kubernetes. Contributed by Weiqing Yang &Jian He --- build.gradle | 21 ++ gradle/dependency-versions.gradle | 1 + gradlew.bat | 84 +++++ .../ContainerProcessManager.java | 32 +- .../samza/coordinator/JobModelManager.scala | 30 +- .../samza/coordinator/server/HttpServer.scala | 10 + .../org/apache/samza/util/HttpUtil.scala | 1 + .../src/docker/dockerfiles/Dockerfile | 34 ++ .../org/apache/samza/config/KubeConfig.java | 57 ++++ .../job/kubernetes/KubeClientFactory.java | 40 +++ .../KubeClusterResourceManager.java | 323 ++++++++++++++++++ .../apache/samza/job/kubernetes/KubeJob.java | 242 +++++++++++++ .../samza/job/kubernetes/KubeJobFactory.java | 32 ++ .../job/kubernetes/KubePodStatusWatcher.java | 172 ++++++++++ .../KubeResourceManagerFactory.java | 40 +++ .../samza/job/kubernetes/KubeUtils.java | 56 +++ settings.gradle | 1 + 17 files changed, 1147 insertions(+), 29 deletions(-) create mode 100644 gradlew.bat create mode 100644 samza-kubernetes/src/docker/dockerfiles/Dockerfile create mode 100644 samza-kubernetes/src/main/java/org/apache/samza/config/KubeConfig.java create mode 100644 samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClientFactory.java create mode 100644 samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java create mode 100644 samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java create mode 100644 samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJobFactory.java create mode 100644 samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubePodStatusWatcher.java create mode 100644 samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeResourceManagerFactory.java create mode 100644 samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeUtils.java diff --git 
a/build.gradle b/build.gradle index 845b3160d0..00b40f4065 100644 --- a/build.gradle +++ b/build.gradle @@ -568,6 +568,27 @@ project(":samza-yarn_$scalaSuffix") { jar.dependsOn("lesscss") } +project(":samza-kubernetes_$scalaSuffix") { + apply plugin: 'java' + + dependencies { + compile project(':samza-api') + compile project(":samza-core_$scalaSuffix") + compile "org.codehaus.jackson:jackson-core-asl:1.9.7" + compile group: 'io.fabric8', name: 'kubernetes-client', version: kubernetesJavaClientVersion + testCompile "junit:junit:$junitVersion" + testCompile "org.mockito:mockito-core:$mockitoVersion" + } + + tasks.create(name: "releaseKubeTarGz", dependsOn: configurations.archives.artifacts, type: Tar) { + into "samza-kubernetes-${version}" + compression = Compression.GZIP + from(configurations.runtime) { into("lib/") } + from(configurations.archives.artifacts.files) { into("lib/") } + duplicatesStrategy 'exclude' + } +} + project(":samza-shell") { apply plugin: 'java' diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index f70e879d37..c3b857d980 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -53,4 +53,5 @@ jnaVersion = "4.5.1" couchbaseClientVersion = "2.7.2" couchbaseMockVersion = "1.5.22" + kubernetesJavaClientVersion = "4.1.3" } diff --git a/gradlew.bat b/gradlew.bat new file mode 100644 index 0000000000..e95643d6a2 --- /dev/null +++ b/gradlew.bat @@ -0,0 +1,84 @@ +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Add default JVM options here. 
You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS= + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto init + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto init + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:init +@rem Get command-line arguments, handling Windows variants + +if not "%OS%" == "Windows_NT" goto win9xME_args + +:win9xME_args +@rem Slurp the command line arguments. +set CMD_LINE_ARGS= +set _SKIP=2 + +:win9xME_args_slurp +if "x%~1" == "x" goto execute + +set CMD_LINE_ARGS=%* + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! 
+if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java index cb0e5374cd..edf0f04c20 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java @@ -93,7 +93,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback * The Allocator matches requests to resources and executes processes. */ private final ContainerAllocator containerAllocator; - private final Thread allocatorThread; + private Thread allocatorThread = null; // The StandbyContainerManager manages standby-aware allocation and failover of containers private final Optional standbyContainerManager; @@ -166,8 +166,10 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri } this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.standbyContainerManager); - this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread"); - LOG.info("Finished container process manager initialization."); + if (shouldStartAllocateThread()) { + this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread"); + } + LOG.info("finished initialization of samza task manager"); } @VisibleForTesting @@ -184,14 +186,23 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri this.clusterResourceManager = resourceManager; this.standbyContainerManager = Optional.empty(); + this.diagnosticsManager = Option.empty(); this.containerAllocator = allocator.orElseGet( () -> new ContainerAllocator(this.clusterResourceManager, clusterManagerConfig, state, hostAffinityEnabled, 
this.standbyContainerManager)); - this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread"); + if (shouldStartAllocateThread()) { + this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread"); + } LOG.info("Finished container process manager initialization"); } + // In Kubernetes, the pod will be started by kubelet automatically once it is allocated, it does not need a + // separate thread to keep polling the allocated resources to start the container. + public boolean shouldStartAllocateThread() { + return !clusterResourceManager.getClass().getSimpleName().equals("KubeClusterResourceManager"); + } + public boolean shouldShutdown() { LOG.debug("ContainerProcessManager state: Completed containers: {}, Configured containers: {}, Are there too many failed containers: {}, Is allocator thread alive: {}", state.completedProcessors.get(), state.processorCount, jobFailureCriteriaMet ? "yes" : "no", allocatorThread.isAlive() ? "yes" : "no"); @@ -237,7 +248,9 @@ public void start() { // Start container allocator thread LOG.info("Starting the container allocator thread"); - allocatorThread.start(); + if (allocatorThread != null) { + allocatorThread.start(); + } LOG.info("Starting the container process manager"); } @@ -246,12 +259,15 @@ public void stop() { // Shutdown allocator thread containerAllocator.stop(); - try { + if (allocatorThread != null) { + + try { allocatorThread.join(); LOG.info("Stopped container allocator"); } catch (InterruptedException ie) { - LOG.error("Allocator thread join threw an interrupted exception", ie); - Thread.currentThread().interrupt(); + LOG.error("Allocator thread join threw an interrupted exception", ie); + Thread.currentThread().interrupt(); + } } if (diagnosticsManager.isDefined()) { diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index 2afc36bd86..2f6da12213 
100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -23,34 +23,22 @@ import java.util import java.util.concurrent.atomic.AtomicReference import org.apache.samza.{Partition, SamzaException} -import org.apache.samza.config._ -import org.apache.samza.config.Config -import org.apache.samza.container.grouper.stream.SSPGrouperProxy -import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory +import org.apache.samza.config.{Config, _} +import org.apache.samza.container.grouper.stream.{SSPGrouperProxy, SystemStreamPartitionGrouperFactory} import org.apache.samza.container.grouper.task._ -import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore -import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping -import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping -import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping -import org.apache.samza.container.LocalityManager -import org.apache.samza.container.TaskName -import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore -import org.apache.samza.coordinator.server.HttpServer -import org.apache.samza.coordinator.server.JobServlet -import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping -import org.apache.samza.job.model.ContainerModel -import org.apache.samza.job.model.JobModel -import org.apache.samza.job.model.TaskMode -import org.apache.samza.job.model.TaskModel -import org.apache.samza.metrics.MetricsRegistry -import org.apache.samza.metrics.MetricsRegistryMap +import org.apache.samza.container.{LocalityManager, TaskName} +import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore} +import org.apache.samza.coordinator.server.{HttpServer, JobServlet} +import 
org.apache.samza.coordinator.stream.messages.{SetContainerHostMapping, SetTaskContainerMapping, SetTaskModeMapping, SetTaskPartitionMapping} +import org.apache.samza.job.model.{ContainerModel, JobModel, TaskMode, TaskModel} +import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap} import org.apache.samza.runtime.LocationId import org.apache.samza.system._ import org.apache.samza.util.ScalaJavaUtil.JavaOptionals import org.apache.samza.util.{Logging, ReflectionUtil, Util} -import scala.collection.JavaConverters import scala.collection.JavaConversions._ +import scala.collection.JavaConverters import scala.collection.JavaConverters._ /** diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala index 384972262e..055fae0928 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala @@ -136,4 +136,14 @@ class HttpServer( throw new SamzaException("HttpServer is not currently running, so URLs are not available for it.") } } + + def getIpUrl = { + if (running) { + val runningPort = server.getConnectors()(0).asInstanceOf[NetworkConnector].getLocalPort() + + new URL("http://" + Util.getLocalHost.getHostAddress + ":" + runningPort + rootPath) + } else { + throw new SamzaException("HttpServer is not currently running, so URLs are not available for it.") + } + } } diff --git a/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala index ea5eb5a8cc..752322f9e6 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala @@ -55,6 +55,7 @@ object HttpUtil { (exception, loop) => { exception match { case ioe: IOException => { + error(ioe) warn("Error getting response from Job coordinator 
server. received IOException: %s. Retrying..." format ioe.getClass) httpConn = getHttpConnection(url, timeout) } diff --git a/samza-kubernetes/src/docker/dockerfiles/Dockerfile b/samza-kubernetes/src/docker/dockerfiles/Dockerfile new file mode 100644 index 0000000000..98b1229a51 --- /dev/null +++ b/samza-kubernetes/src/docker/dockerfiles/Dockerfile @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# samzaJarsFolder must contain all the Samza jars (make sure every Samza jar is there). +# You can build the Samza image with: +# docker build -t dockerHubAccount/samza:versionNumber . +# Samza users can then use the Samza image as the base image for their application image. 
+# + +FROM ubuntu:latest + +RUN apt-get update -y && apt-get upgrade -y && apt-get install -y openjdk-8-jdk + +ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64 +ENV PATH $PATH:$JAVA_HOME/bin + +RUN mkdir -p /opt/samza +WORKDIR /opt/samza/ +COPY samzaJarsFolder/ /opt/samza/ diff --git a/samza-kubernetes/src/main/java/org/apache/samza/config/KubeConfig.java b/samza-kubernetes/src/main/java/org/apache/samza/config/KubeConfig.java new file mode 100644 index 0000000000..c339577021 --- /dev/null +++ b/samza-kubernetes/src/main/java/org/apache/samza/config/KubeConfig.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.config; + +public class KubeConfig { + + // Config key naming the Samza container image, followed by the image used when that key is not set + public static final String APP_IMAGE = "kube.app.image"; + public static final String DEFAULT_IMAGE = "weiqingyang/samza:v0"; + + // Config key for the directory path inside which the log will be stored.
+ public static final String SAMZA_MOUNT_DIR = "kube.app.pod.mnt.path"; + public static final String K8S_API_NAMESPACE = "kube.app.namespace"; + public static final String STREAM_PROCESSOR_CONTAINER_NAME_PREFIX = "sp"; + public static final String JC_CONTAINER_NAME_PREFIX = "jc"; + public static final String POD_RESTART_POLICY = "OnFailure"; + public static final String JC_POD_NAME_FORMAT = "%s-%s-%s"; // jc-appName-appId + public static final String TASK_POD_NAME_FORMAT = "%s-%s-%s-%s"; // sp-appName-appId-containerId + + // Name of the environment variable holding the job coordinator pod name (the kube.app.volume.* constants below are config keys) + public static final String COORDINATOR_POD_NAME = "COORDINATOR_POD_NAME"; + public static final String AZURE_REMOTE_VOLUME_ENABLED = "kube.app.volume.azure.file-share.enabled"; + public static final String AZURE_SECRET = "kube.app.volume.azure-secret"; + public static final String AZURE_FILESHARE = "kube.app.volume.azure.file-share"; + + private Config config; + public KubeConfig(Config config) { + this.config = config; + } + + public static KubeConfig validate(Config config) throws ConfigException { + KubeConfig kc = new KubeConfig(config); + kc.validate(); + return kc; + } + + private void validate() throws ConfigException { + // TODO: no validation is performed yet, so validate(Config) currently accepts any config + } +} diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClientFactory.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClientFactory.java new file mode 100644 index 0000000000..4405a0969d --- /dev/null +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClientFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.job.kubernetes; + +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.ConfigBuilder; +import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Factory for the fabric8 {@link KubernetesClient} used by the Samza Kubernetes integration. */ +public class KubeClientFactory { + private static final Logger LOG = LoggerFactory.getLogger(KubeClientFactory.class); + /** Builds a client from an otherwise unmodified {@link ConfigBuilder}; a new client is created on every call. */ + public static KubernetesClient create() { + ConfigBuilder builder = new ConfigBuilder(); + Config config = builder.build(); + KubernetesClient client = new DefaultKubernetesClient(config); + LOG.info("Kubernetes client created. "); + return client; + } +} diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java new file mode 100644 index 0000000000..d341eceda0 --- /dev/null +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.job.kubernetes; + +import io.fabric8.kubernetes.api.model.*; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.Watcher; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.*; +import org.apache.samza.SamzaException; +import org.apache.samza.clustermanager.*; +import org.apache.samza.config.ClusterManagerConfig; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.TaskConfig; +import org.apache.samza.coordinator.JobModelManager; +import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; +import org.apache.samza.job.CommandBuilder; +import org.apache.samza.job.ShellCommandBuilder; +import org.apache.samza.util.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.samza.config.ApplicationConfig.*; +import static org.apache.samza.config.KubeConfig.*; +/** {@link ClusterResourceManager} that runs every stream-processor container as a Kubernetes pod owned by the job coordinator pod, and watches those pods so that failed pods are deleted and deleted pods are requested again. */ +public class KubeClusterResourceManager extends ClusterResourceManager { + private static final Logger LOG = LoggerFactory.getLogger(KubeClusterResourceManager.class); + private final Map<String, String> podLabels = new HashMap<>(); + private KubernetesClient client; + private String appId; + private String appName; + private String image; + private String namespace; + private OwnerReference ownerReference; + private JobModelManager jobModelManager; + private boolean hostAffinityEnabled; + private Config config; + private String
jcPodName; + + KubeClusterResourceManager(Config config, JobModelManager jobModelManager, ClusterResourceManager.Callback callback) { + super(callback); + this.config = config; + this.client = KubeClientFactory.create(); + this.jobModelManager = jobModelManager; + this.image = config.get(APP_IMAGE, DEFAULT_IMAGE); + this.namespace = config.get(K8S_API_NAMESPACE, "default"); + this.appId = config.get(APP_ID, "001"); + this.appName = config.get(APP_NAME, "samza"); + ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config); + this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled(); + createOwnerReferences(); + } + + @Override + public void start() { + LOG.info("Kubernetes Cluster ResourceManager started, starting watcher"); + startPodWatcher(); + jobModelManager.start(); + } + + // Builds the owner reference to this job coordinator pod (named by the COORDINATOR_POD_NAME env variable) and records the "jc-pod-name" label put on, and used to watch, the task pods + private void createOwnerReferences() { + this.jcPodName = java.util.Objects.requireNonNull(System.getenv(COORDINATOR_POD_NAME), "Environment variable " + COORDINATOR_POD_NAME + " is not set"); + LOG.info("job coordinator pod name is: {}, namespace is: {}", jcPodName, namespace); + Pod pod = java.util.Objects.requireNonNull(client.pods().inNamespace(namespace).withName(jcPodName).get(), "Job coordinator pod " + jcPodName + " not found in namespace " + namespace); + ownerReference = new OwnerReferenceBuilder() + .withName(pod.getMetadata().getName()) + .withApiVersion(pod.getApiVersion()) + .withUid(pod.getMetadata().getUid()) + .withKind(pod.getKind()) + .withController(true).build(); + podLabels.put("jc-pod-name", jcPodName); + } + /** Registers a watcher on this job's pods: failed pods are deleted, and a deleted pod triggers a new resource request for the same Samza container. */ + public void startPodWatcher() { + Watcher<Pod> watcher = new Watcher<Pod>() { + @Override + public void eventReceived(Action action, Pod pod) { + if (!pod.getMetadata().getLabels().get("jc-pod-name").equals(jcPodName)) { + LOG.warn("This JC pod name is " + jcPodName + ", received pods for a different JC " + + pod.getMetadata().getLabels().get("jc-pod-name")); + return; + } + LOG.info("Pod watcher received action " + action + " for pod " + pod.getMetadata().getName()); + switch (action) { + case ADDED: + LOG.info("Pod " + pod.getMetadata().getName() + " is added."); + break; +
case MODIFIED: + LOG.info("Pod " + pod.getMetadata().getName() + " is modified."); + if (isPodFailed(pod)) { + deletePod(pod); + } + break; + case ERROR: + LOG.info("Pod " + pod.getMetadata().getName() + " received error."); + if (isPodFailed(pod)) { + deletePod(pod); + } + break; + case DELETED: + LOG.info("Pod " + pod.getMetadata().getName() + " is deleted."); + createNewStreamProcessor(pod); + break; + } + } + @Override + public void onClose(KubernetesClientException e) { + LOG.error("Pod watcher closed", e); + } + }; + + // Watch only the pods carrying this job coordinator's "jc-pod-name" label, in the namespace the pods are created in + client.pods().inNamespace(namespace).withLabels(podLabels).watch(watcher); + } + + private boolean isPodFailed(Pod pod) { + return pod.getStatus() != null && "Failed".equals(pod.getStatus().getPhase()); + } + + private void deletePod(Pod pod) { + boolean deleted = client.pods().delete(pod); + if (deleted) { + LOG.info("Deleted pod " + pod.getMetadata().getName()); + } else { + LOG.warn("Failed to delete pod " + pod.getMetadata().getName()); + } + } + private void createNewStreamProcessor(Pod pod) { + int memory = Integer.parseInt(pod.getSpec().getContainers().get(0).getResources().getRequests().get("memory").getAmount()); + int cpu = Integer.parseInt(pod.getSpec().getContainers().get(0).getResources().getRequests().get("cpu").getAmount()); + + String containerId = KubeUtils.getSamzaContainerNameFromPodName(pod.getMetadata().getName()); + + // Find out previously running container location + String lastSeenOn = jobModelManager.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY); + if (!hostAffinityEnabled || lastSeenOn == null) { + lastSeenOn = ResourceRequestState.ANY_HOST; + } + SamzaResourceRequest request = new SamzaResourceRequest(cpu, memory, lastSeenOn, containerId); + requestResources(request); + } + /** Creates one pod with a single stream-processor container for the request, owned by and labelled with the job coordinator pod; a preferred host other than ANY_HOST becomes a preferred node affinity on kubernetes.io/hostname. */ + @Override + public void requestResources(SamzaResourceRequest resourceRequest) { + String samzaContainerId = resourceRequest.getContainerID(); +
LOG.info("Requesting resources on " + resourceRequest.getPreferredHost() + " for container " + samzaContainerId); + CommandBuilder builder = getCommandBuilder(samzaContainerId); + String command = buildCmd(builder); + LOG.info("Container ID {} using command {}", samzaContainerId, command); + Container container = KubeUtils.createContainer(STREAM_PROCESSOR_CONTAINER_NAME_PREFIX, image, resourceRequest, command); + container.setEnv(getEnvs(builder)); + String podName = String.format(TASK_POD_NAME_FORMAT, STREAM_PROCESSOR_CONTAINER_NAME_PREFIX, appName, appId, samzaContainerId); + + PodBuilder podBuilder; + if (config.getBoolean(AZURE_REMOTE_VOLUME_ENABLED, false)) { + AzureFileVolumeSource azureFileVolumeSource = + new AzureFileVolumeSource(false, config.get(AZURE_SECRET, "azure-secret"), config.get(AZURE_FILESHARE, "aksshare")); + Volume volume = new Volume(); + volume.setAzureFile(azureFileVolumeSource); + volume.setName("azure"); + VolumeMount volumeMount = new VolumeMount(); + volumeMount.setMountPath(config.get(SAMZA_MOUNT_DIR, "/tmp/mnt")); + volumeMount.setName("azure"); + volumeMount.setSubPath(podName); + LOG.info("Set subpath to " + podName + ", mountpath to " + config.get(SAMZA_MOUNT_DIR, "/tmp/mnt")); + container.setVolumeMounts(Collections.singletonList(volumeMount)); + podBuilder = new PodBuilder().editOrNewMetadata() + .withName(podName) + .withOwnerReferences(ownerReference) + .addToLabels(podLabels).endMetadata() + .editOrNewSpec() + .withRestartPolicy(POD_RESTART_POLICY) + .withVolumes(volume).addToContainers(container).endSpec(); + } else { + podBuilder = new PodBuilder().editOrNewMetadata() + .withName(podName) + .withOwnerReferences(ownerReference) + .addToLabels(podLabels).endMetadata() + .editOrNewSpec() + .withRestartPolicy(POD_RESTART_POLICY) + .addToContainers(container).endSpec(); + } + + String preferredHost = resourceRequest.getPreferredHost(); + Pod pod; + if (ResourceRequestState.ANY_HOST.equals(preferredHost)) { + // Create a pod with only one container in
anywhere + pod = podBuilder.build(); + } else { + LOG.info("Making a preferred host request on " + preferredHost); + pod = podBuilder.editOrNewSpec().editOrNewAffinity().editOrNewNodeAffinity() + .addNewPreferredDuringSchedulingIgnoredDuringExecution().withWeight(100).withNewPreference() + .addNewMatchExpression() + .withKey("kubernetes.io/hostname") + .withOperator("In") // node selector operators are In/NotIn/Exists/DoesNotExist/Gt/Lt; "Equal" is rejected by the API server + .withValues(preferredHost).endMatchExpression() + .endPreference().endPreferredDuringSchedulingIgnoredDuringExecution().endNodeAffinity().endAffinity().endSpec().build(); + } + client.pods().inNamespace(namespace).create(pod); + LOG.info("Created a pod " + pod.getMetadata().getName() + " on " + preferredHost); + } + + @Override + public void cancelResourceRequest(SamzaResourceRequest request) { + // no need to implement + } + + @Override + public void releaseResources(SamzaResource resource) { + // no need to implement + } + + @Override + public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) { + // no need to implement + } + + @Override + public void stopStreamProcessor(SamzaResource resource) { + client.pods().inNamespace(namespace).withName(resource.getResourceID()).delete(); + } + + @Override + public void stop(SamzaApplicationState.SamzaAppStatus status) { + LOG.info("Kubernetes Cluster ResourceManager stopped"); + jobModelManager.stop(); + // TODO: need to check + } + + private String buildCmd(CommandBuilder cmdBuilder) { + // TODO: check if we have framework path specified. If yes - use it, if not use default /opt/hello-samza/ + String jobLib = ""; // in case of separate framework, this directory will point at the job's libraries + String cmdPath = "/opt/samza/"; // TODO + + String fwkPath = JobConfig.getFwkPath(config); + if(fwkPath != null && (!
fwkPath.isEmpty())) { + cmdPath = fwkPath; + jobLib = "export JOB_LIB_DIR=/opt/samza/lib"; + } + LOG.info("In runContainer in util: fwkPath= " + fwkPath + ";cmdPath=" + cmdPath + ";jobLib=" + jobLib); + + cmdBuilder.setCommandPath(cmdPath); + + return cmdBuilder.buildCommand(); + } + + // TODO: Need to check it again later!! Check AbstractContainerAllocator.getCommandBuilder(samzaContainerId) + private CommandBuilder getCommandBuilder(String containerId) { + TaskConfig taskConfig = new TaskConfig(config); + String cmdBuilderClassName = taskConfig.getCommandClass(ShellCommandBuilder.class.getName()); + CommandBuilder cmdBuilder = Util.getObj(cmdBuilderClassName, CommandBuilder.class); + if (jobModelManager.server() == null) { + throw new SamzaException("HttpServer is null; cannot build the job coordinator URL for container " + containerId); + } + URL url = jobModelManager.server().getIpUrl(); + LOG.info("HttpServer URL: " + url); + cmdBuilder.setConfig(config).setId(containerId).setUrl(url); + + return cmdBuilder; + } + + // Construct the envs for the task container pod + private List<EnvVar> getEnvs(CommandBuilder cmdBuilder) { + // for logging + StringBuilder sb = new StringBuilder(); + + List<EnvVar> envList = new ArrayList<>(); + for (Map.Entry<String, String> entry : cmdBuilder.buildEnvironment().entrySet()) { + envList.add(new EnvVar(entry.getKey(), entry.getValue(), null)); + sb.append(String.format("\n%s=%s", entry.getKey(), entry.getValue())); //logging + } + + // TODO: The ID assigned to the container by the execution environment: K8s container Id. ??
Seems there is no id + // envList.add(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), container.getId().toString()); + // sb.append(String.format("\n%s=%s", ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), container.getId().toString())); + + envList.add(new EnvVar("LOGGED_STORE_BASE_DIR", config.get(SAMZA_MOUNT_DIR), null)); + envList.add(new EnvVar("EXECUTION_PLAN_DIR", config.get(SAMZA_MOUNT_DIR), null)); + envList.add(new EnvVar("SAMZA_LOG_DIR", config.get(SAMZA_MOUNT_DIR), null)); + + LOG.info("Using environment variables: {}", sb); + + return envList; + } + + private URL formatUrl(URL url) { + int port = url.getPort(); + String host = url.getHost(); + LOG.info("Original host: {}, port: {}, url: {}", host, port, url); + + String formattedHost = host + "."+ namespace + ".svc.cluster.local"; + LOG.info("Formatted host: {}, port: {}", formattedHost, port); + URL newUrl; + try { + newUrl = new URL("http://" + formattedHost + ":" + url.getPort()); + LOG.info("Formatted URL: {}", newUrl); + } catch (MalformedURLException ex) { + throw new SamzaException(ex); + } + return newUrl; + } +} diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java new file mode 100644 index 0000000000..9ede574248 --- /dev/null +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.job.kubernetes; + +import io.fabric8.kubernetes.api.model.*; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.samza.SamzaException; +import org.apache.samza.clustermanager.ResourceRequestState; +import org.apache.samza.clustermanager.SamzaResourceRequest; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.job.ApplicationStatus; +import org.apache.samza.job.StreamJob; +import org.apache.samza.util.CoordinatorStreamUtil; +import org.apache.samza.util.Util; +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.mutable.StringBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.samza.config.ApplicationConfig.APP_ID; +import static org.apache.samza.config.ApplicationConfig.APP_NAME; +import static org.apache.samza.config.KubeConfig.*; +import static org.apache.samza.job.ApplicationStatus.*; +import static org.apache.samza.serializers.model.SamzaObjectMapper.getObjectMapper; + +/** + * The client to start a Kubernetes job coordinator + */ +public class KubeJob implements StreamJob { + private static final Logger LOG = LoggerFactory.getLogger(KubeJob.class); + private Config config; + private KubernetesClient kubernetesClient; + private String podName; + private ApplicationStatus currentStatus; + private String nameSpace; + private 
KubePodStatusWatcher watcher; + private String image; + + public KubeJob(Config config) { + this.kubernetesClient = KubeClientFactory.create(); + this.config = config; + this.podName = String.format(JC_POD_NAME_FORMAT, JC_CONTAINER_NAME_PREFIX, + config.get(APP_NAME, "samza"), config.get(APP_ID, "1")); + this.currentStatus = ApplicationStatus.New; + this.watcher = new KubePodStatusWatcher(podName); + this.nameSpace = config.get(K8S_API_NAMESPACE, "default"); + this.image = config.get(APP_IMAGE, DEFAULT_IMAGE); + } + + /** + * submit the kubernetes job coordinator + */ + public KubeJob submit() { + // create SamzaResourceRequest + int memoryMB = config.getInt("cluster-manager.container.memory.mb", 1024); // TODO + int numCores = config.getInt("cluster-manager.container.cpu.cores", 1); // TODO + String preferredHost = ResourceRequestState.ANY_HOST; + SamzaResourceRequest request = new SamzaResourceRequest(numCores, memoryMB, preferredHost, podName); + + // create Container + String fwkPath = config.get("samza.fwk.path", ""); + String fwkVersion = config.get("samza.fwk.version"); + String cmd = buildJobCoordinatorCmd(fwkPath, fwkVersion); + LOG.info(String.format("samza.fwk.path: %s. samza.fwk.version: %s. 
Command: %s", fwkPath, fwkVersion, cmd)); + Container container = KubeUtils.createContainer(JC_CONTAINER_NAME_PREFIX, image, request, cmd); + container.setEnv(getEnvs()); + + PodBuilder podBuilder; + if (config.getBoolean(AZURE_REMOTE_VOLUME_ENABLED)) { + AzureFileVolumeSource azureFileVolumeSource = new AzureFileVolumeSource(false, + config.get(AZURE_SECRET, "azure-secret"), config.get(AZURE_FILESHARE, "aksshare")); + Volume volume = new Volume(); + volume.setAzureFile(azureFileVolumeSource); + volume.setName("azure"); + VolumeMount volumeMount = new VolumeMount(); + volumeMount.setMountPath(config.get(SAMZA_MOUNT_DIR, "/tmp/mnt")); + volumeMount.setName("azure"); + volumeMount.setSubPath(podName); + container.setVolumeMounts(Collections.singletonList(volumeMount)); + podBuilder = new PodBuilder() + .editOrNewMetadata() + .withNamespace(nameSpace) + .withName(podName) + .endMetadata() + .editOrNewSpec() + .withRestartPolicy(POD_RESTART_POLICY) + .withContainers(container) + .withVolumes(volume) + .endSpec(); + } else { + + // create Pod + podBuilder = new PodBuilder() + .editOrNewMetadata() + .withNamespace(nameSpace) + .withName(podName) + .endMetadata() + .editOrNewSpec() + .withRestartPolicy(POD_RESTART_POLICY) + .withContainers(container) + .endSpec(); + } + + Pod pod = podBuilder.build(); + kubernetesClient.pods().create(pod); + // TODO: adding watcher here makes Client waiting .. Need to fix. 
+ kubernetesClient.pods().withName(podName).watch(watcher); + return this; + } + + /** + * Kill the job coordinator pod + */ + public KubeJob kill() { + LOG.info("Killing application: {}, operator pod: {}, namespace: {}", config.get(APP_NAME), podName, nameSpace); + System.out.println("Killing application: " + config.get(APP_NAME) + + "; Operator pod: " + podName + "; namespace: " + nameSpace); + kubernetesClient.pods().inNamespace(nameSpace).withName(podName).delete(); + return this; + } + + /** + * Wait for the job to finish, up to the given timeout + */ + public ApplicationStatus waitForFinish(long timeoutMs) { + watcher.waitForCompleted(timeoutMs, TimeUnit.MILLISECONDS); + return getStatus(); + } + + /** + * Wait for the application to reach a status + */ + public ApplicationStatus waitForStatus(ApplicationStatus status, long timeoutMs) { + switch (status.getStatusCode()) { + case New: + watcher.waitForPending(timeoutMs, TimeUnit.MILLISECONDS); + return New; + case Running: + watcher.waitForRunning(timeoutMs, TimeUnit.MILLISECONDS); + return Running; + case SuccessfulFinish: + watcher.waitForSucceeded(timeoutMs, TimeUnit.MILLISECONDS); + return SuccessfulFinish; + case UnsuccessfulFinish: + watcher.waitForFailed(timeoutMs, TimeUnit.MILLISECONDS); + return UnsuccessfulFinish; + default: + throw new SamzaException("Unsupported application status type: " + status); + } + } + + /** + * Get the Status of the job coordinator pod + */ + public ApplicationStatus getStatus() { + Pod operatorPod = kubernetesClient.pods().inNamespace(nameSpace).withName(podName).get(); + PodStatus podStatus = operatorPod.getStatus(); + // TODO + switch (podStatus.getPhase()) { + case "Pending": + currentStatus = ApplicationStatus.New; + break; + case "Running": + currentStatus = Running; + break; + case "Completed": + case "Succeeded": + currentStatus = ApplicationStatus.SuccessfulFinish; + break; + case "Failed": + String err = new StringBuilder().append("Reason: ").append(podStatus.getReason()) +
.append("Conditions: ").append(podStatus.getConditions().toString()).toString(); + currentStatus = ApplicationStatus.unsuccessfulFinish(new SamzaException(err)); + break; + case "CrashLoopBackOff": + case "Unknown": + default: + currentStatus = ApplicationStatus.New; + } + return currentStatus; + } + + // Build the job coordinator command + private String buildJobCoordinatorCmd(String fwkPath, String fwkVersion) { + // figure out if the framework is deployed into a separate location + if (fwkVersion == null || fwkVersion.isEmpty()) { + fwkVersion = "STABLE"; + } + LOG.info(String.format("KubeJob: fwk_path is %s, ver is %s use it directly ", fwkPath, fwkVersion)); + + // default location + String cmdExec = "/opt/hello-samza/bin/run-jc.sh"; + if (!fwkPath.isEmpty()) { + // if we have framework installed as a separate package - use it + cmdExec = fwkPath + "/" + fwkVersion + "/bin/run-jc.sh"; + } + return cmdExec; + } + + // Construct the envs for the job coordinator pod + private List getEnvs() { + MapConfig coordinatorSystemConfig = CoordinatorStreamUtil.buildCoordinatorStreamConfig(config); + List envList = new ArrayList<>(); + ObjectMapper objectMapper = getObjectMapper(); + String coordinatorSysConfig; + try { + coordinatorSysConfig = objectMapper.writeValueAsString(coordinatorSystemConfig); + } catch (IOException ex) { + LOG.warn("No coordinator system configs!", ex); + coordinatorSysConfig = ""; + } + envList.add(new EnvVar("SAMZA_COORDINATOR_SYSTEM_CONFIG", Util.envVarEscape(coordinatorSysConfig), null)); + envList.add(new EnvVar("SAMZA_LOG_DIR", config.get(SAMZA_MOUNT_DIR), null)); + envList.add(new EnvVar(COORDINATOR_POD_NAME, podName, null)); + envList.add(new EnvVar("JAVA_OPTS", "", null)); + return envList; + } +} diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJobFactory.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJobFactory.java new file mode 100644 index 0000000000..4ec39cf887 ---
/dev/null +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJobFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.job.kubernetes; + +import org.apache.samza.config.Config; +import org.apache.samza.job.StreamJobFactory; + + +public class KubeJobFactory implements StreamJobFactory { + + @Override + public KubeJob getJob(Config config) { + return new KubeJob(config); + } +} diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubePodStatusWatcher.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubePodStatusWatcher.java new file mode 100644 index 0000000000..0b73664803 --- /dev/null +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubePodStatusWatcher.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.job.kubernetes; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.Watcher; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class KubePodStatusWatcher implements Watcher { + private static final Logger LOG = LoggerFactory.getLogger(KubeJob.class); + private Optional pod = Optional.empty(); + private String phase = "unknown"; + private String appId; + private CountDownLatch podRunningLatch = new CountDownLatch(1); + private CountDownLatch podPendingLatch = new CountDownLatch(1); + private CountDownLatch podSucceededLatch = new CountDownLatch(1); + private CountDownLatch podFailedLatch = new CountDownLatch(1); + private CountDownLatch podCompletedLatch = new CountDownLatch(1); + + public KubePodStatusWatcher(String appId) { + this.appId = appId; + } + + @Override + public void eventReceived(Action action, Pod pod) { + this.pod = Optional.of(pod); + switch (action) { + case DELETED: + case ERROR : + closeAllWatch(); + break; + default: + if (isFailed()) { + closeWatchWhenFailed(); + } else if(isSucceeded()) { + closeWatchWhenSucceed(); + } else if (isRunning()) { + closeWatchWhenRunning(); + } else if (isPending()) { + closeWatchWhenPending(); + } + } + } + + @Override + public void onClose(KubernetesClientException e) { + LOG.info("Stopping watching application {} with last-observed phase 
{}", appId, phase); + closeAllWatch(); + } + + public void waitForCompleted(long timeout, TimeUnit unit) { + try { + podCompletedLatch.await(timeout, unit); + } catch (InterruptedException e) { + LOG.error("waitForCompleted() was interrupted by exception: ", e); + } + } + + public void waitForSucceeded(long timeout, TimeUnit unit) { + try { + podSucceededLatch.await(timeout, unit); + } catch (InterruptedException e) { + LOG.error("waitForCompleted() was interrupted by exception: ", e); + } + } + + public void waitForFailed(long timeout, TimeUnit unit) { + try { + podFailedLatch.await(timeout, unit); + } catch (InterruptedException e) { + LOG.error("waitForCompleted() was interrupted by exception: ", e); + } + } + + public void waitForRunning(long timeout, TimeUnit unit) { + try { + podRunningLatch.await(timeout, unit); + } catch (InterruptedException e) { + LOG.error("waitForRunning() was interrupted by exception: ", e); + } + } + + public void waitForPending(long timeout, TimeUnit unit) { + try { + podPendingLatch.await(timeout, unit); + } catch (InterruptedException e) { + LOG.error("waitForPending() was interrupted by exception: ", e); + } + } + + private boolean isSucceeded() { + if (pod.isPresent()) { + phase = pod.get().getStatus().getPhase(); + } + return phase == "Succeeded"; + } + + private boolean isFailed() { + if (pod.isPresent()) { + phase = pod.get().getStatus().getPhase(); + } + return phase == "Failed"; + } + + private boolean isRunning() { + if (pod.isPresent()) { + phase = pod.get().getStatus().getPhase(); + } + return phase == "Running"; + } + + private boolean isPending() { + if (pod.isPresent()) { + phase = pod.get().getStatus().getPhase(); + } + return phase == "Pending"; + } + + private void closeWatchWhenRunning() { + podRunningLatch.countDown(); + // TODO: may add a logging thread + } + + private void closeWatchWhenPending() { + podPendingLatch.countDown(); + // TODO: may add a logging thread + } + + + private void closeWatchWhenFailed() { 
+ podFailedLatch.countDown(); + // TODO: may add a logging thread + } + + private void closeWatchWhenSucceed() { + podSucceededLatch.countDown(); + // TODO: may add a logging thread + } + + private void closeAllWatch() { + closeWatchWhenFailed(); + closeWatchWhenSucceed(); + closeWatchWhenPending(); + closeWatchWhenRunning(); + closeWatchWhenFailed(); + closeWatchWhenSucceed(); + // TODO: may add a logging thread + } +} diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeResourceManagerFactory.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeResourceManagerFactory.java new file mode 100644 index 0000000000..68f64bf807 --- /dev/null +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeResourceManagerFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.job.kubernetes; + +import org.apache.samza.clustermanager.ClusterResourceManager; +import org.apache.samza.clustermanager.ResourceManagerFactory; +import org.apache.samza.clustermanager.SamzaApplicationState; +import org.apache.samza.config.Config; +import org.apache.samza.coordinator.JobModelManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KubeResourceManagerFactory implements ResourceManagerFactory { + private static Logger log = LoggerFactory.getLogger(KubeResourceManagerFactory.class); + + @Override + public ClusterResourceManager getClusterResourceManager(ClusterResourceManager.Callback callback, SamzaApplicationState state) { + JobModelManager jobModelManager = state.jobModelManager; + Config config = jobModelManager.jobModel().getConfig(); + KubeClusterResourceManager manager = new KubeClusterResourceManager(config, jobModelManager, callback); + log.info("KubeClusterResourceManager created"); + return manager; + } +} diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeUtils.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeUtils.java new file mode 100644 index 0000000000..5af5258246 --- /dev/null +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeUtils.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.job.kubernetes; + +import io.fabric8.kubernetes.api.model.*; +import org.apache.samza.clustermanager.SamzaResourceRequest; + +public class KubeUtils { + + public static String getSamzaContainerNameFromPodName(String podName) { + // stream-processor-appName-appId-containerId + String[] splits = podName.split("-"); + return splits[splits.length - 1]; + } + + public static Pod createPod(String name, OwnerReference ownerReference, String restartPolicy, Container container) { + return new PodBuilder().editOrNewMetadata().withName(name).withOwnerReferences(ownerReference).endMetadata() + .editOrNewSpec().withRestartPolicy(restartPolicy).addToContainers(container).endSpec().build(); + } + + public static Pod createPod(String name, String restartPolicy, Container container, String namespace) { + return new PodBuilder().editOrNewMetadata().withNamespace(namespace).withName(name).endMetadata() + .editOrNewSpec().withRestartPolicy(restartPolicy).addToContainers(container).endSpec().build(); + } + + // for Samza operator + public static Container createContainer(String containerId, String image, SamzaResourceRequest resourceRequest, + String cmd) { + Quantity memQuantity = new QuantityBuilder(false) + .withAmount(String.valueOf(resourceRequest.getMemoryMB())).withFormat("Mi").build(); + Quantity cpuQuantity = new QuantityBuilder(false) + .withAmount(String.valueOf(resourceRequest.getNumCores())).build(); + return new 
ContainerBuilder().withName(containerId).withImage(image).withImagePullPolicy("Always").withCommand(cmd).editOrNewResources() + .addToRequests("memory", memQuantity).addToRequests("cpu", cpuQuantity).endResources().build(); + } + + // TODO: will add util methods describing details about Pod status and container status. Refer to Spark'KubernetesUtils. + // Then we can use them in logs and exception messages. +} diff --git a/settings.gradle b/settings.gradle index c636706d71..eb0f72f07e 100644 --- a/settings.gradle +++ b/settings.gradle @@ -30,6 +30,7 @@ def scalaModules = [ 'samza-elasticsearch', 'samza-hdfs', 'samza-kafka', + 'samza-kubernetes', 'samza-kv', 'samza-kv-inmemory', 'samza-kv-rocksdb', From 5c835bf1b0ea6aa29de284fe45ae53b0de2c7234 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Tue, 22 Oct 2019 16:51:06 -0700 Subject: [PATCH 2/9] Fix the conflicts with master branch --- gradlew.bat | 84 ------------------- .../ContainerProcessManager.java | 7 +- .../src/main/java/org/apache/samza/README.md | 26 ++++++ .../KubeClusterResourceManager.java | 15 +--- 4 files changed, 31 insertions(+), 101 deletions(-) delete mode 100644 gradlew.bat create mode 100644 samza-kubernetes/src/main/java/org/apache/samza/README.md diff --git a/gradlew.bat b/gradlew.bat deleted file mode 100644 index e95643d6a2..0000000000 --- a/gradlew.bat +++ /dev/null @@ -1,84 +0,0 @@ -@if "%DEBUG%" == "" @echo off -@rem ########################################################################## -@rem -@rem Gradle startup script for Windows -@rem -@rem ########################################################################## - -@rem Set local scope for the variables with windows NT shell -if "%OS%"=="Windows_NT" setlocal - -set DIRNAME=%~dp0 -if "%DIRNAME%" == "" set DIRNAME=. -set APP_BASE_NAME=%~n0 -set APP_HOME=%DIRNAME% - -@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. 
-set DEFAULT_JVM_OPTS= - -@rem Find java.exe -if defined JAVA_HOME goto findJavaFromJavaHome - -set JAVA_EXE=java.exe -%JAVA_EXE% -version >NUL 2>&1 -if "%ERRORLEVEL%" == "0" goto init - -echo. -echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:findJavaFromJavaHome -set JAVA_HOME=%JAVA_HOME:"=% -set JAVA_EXE=%JAVA_HOME%/bin/java.exe - -if exist "%JAVA_EXE%" goto init - -echo. -echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:init -@rem Get command-line arguments, handling Windows variants - -if not "%OS%" == "Windows_NT" goto win9xME_args - -:win9xME_args -@rem Slurp the command line arguments. -set CMD_LINE_ARGS= -set _SKIP=2 - -:win9xME_args_slurp -if "x%~1" == "x" goto execute - -set CMD_LINE_ARGS=%* - -:execute -@rem Setup the command line - -set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar - -@rem Execute Gradle -"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% - -:end -@rem End local scope for the variables with windows NT shell -if "%ERRORLEVEL%"=="0" goto mainEnd - -:fail -rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of -rem the _cmd.exe /c_ return code! 
-if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 -exit /b 1 - -:mainEnd -if "%OS%"=="Windows_NT" endlocal - -:omega diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java index edf0f04c20..6eda0168b0 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java @@ -260,11 +260,10 @@ public void stop() { // Shutdown allocator thread containerAllocator.stop(); if (allocatorThread != null) { - try { - allocatorThread.join(); - LOG.info("Stopped container allocator"); - } catch (InterruptedException ie) { + allocatorThread.join(); + LOG.info("Stopped container allocator"); + } catch (InterruptedException ie) { LOG.error("Allocator thread join threw an interrupted exception", ie); Thread.currentThread().interrupt(); } diff --git a/samza-kubernetes/src/main/java/org/apache/samza/README.md b/samza-kubernetes/src/main/java/org/apache/samza/README.md new file mode 100644 index 0000000000..0d87a29e8d --- /dev/null +++ b/samza-kubernetes/src/main/java/org/apache/samza/README.md @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +# Configurations +- kube.app.image: the image name for the samza app +- kube.app.namespace: the namespace where the samza app runs +- kube.app.pod.mnt.path: the path where the remote volume is mounted into the pod, for both the job coordinator pod and stream processor pod + the volume can be used for storing logs and local states. +- cluster-manager.container.memory.mb: the memory size for the samza stream processor +- cluster-manager.container.cpu.cores: the cpu cores for the samza stream processor diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java index d341eceda0..54babd73a8 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java @@ -30,7 +30,6 @@ import org.apache.samza.clustermanager.*; import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.config.Config; -import org.apache.samza.config.JobConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.coordinator.JobModelManager; import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; @@ -163,7 +162,7 @@ private void createNewStreamProcessor(Pod pod) { @Override public void requestResources(SamzaResourceRequest resourceRequest) { - String samzaContainerId = resourceRequest.getContainerID(); + String samzaContainerId = resourceRequest.getProcessorId(); LOG.info("Requesting resources on " + resourceRequest.getPreferredHost() + " for container " + samzaContainerId); CommandBuilder builder = getCommandBuilder(samzaContainerId); String command = buildCmd(builder); @@ -238,7 +237,7 @@ public void launchStreamProcessor(SamzaResource resource, 
CommandBuilder builder @Override public void stopStreamProcessor(SamzaResource resource) { - client.pods().withName(resource.getResourceID()).delete(); + client.pods().withName(resource.getContainerId()).delete(); } @Override @@ -249,17 +248,7 @@ public void stop(SamzaApplicationState.SamzaAppStatus status) { } private String buildCmd(CommandBuilder cmdBuilder) { - // TODO: check if we have framework path specified. If yes - use it, if not use default /opt/hello-samza/ - String jobLib = ""; // in case of separate framework, this directory will point at the job's libraries String cmdPath = "/opt/samza/"; // TODO - - String fwkPath = JobConfig.getFwkPath(config); - if(fwkPath != null && (! fwkPath.isEmpty())) { - cmdPath = fwkPath; - jobLib = "export JOB_LIB_DIR=/opt/samza/lib"; - } - LOG.info("In runContainer in util: fwkPath= " + fwkPath + ";cmdPath=" + cmdPath + ";jobLib=" + jobLib); - cmdBuilder.setCommandPath(cmdPath); return cmdBuilder.buildCommand(); From 9d8e752ee7aee023fba7228fb82d8eb7def45ef2 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Mon, 4 Nov 2019 17:59:58 -0800 Subject: [PATCH 3/9] clean code and add code comments --- .../ClusterBasedJobCoordinator.java | 3 +- .../ContainerProcessManager.java | 12 +++-- .../samza/coordinator/server/HttpServer.scala | 5 +- .../org/apache/samza/util/HttpUtil.scala | 3 +- .../org/apache/samza/config/KubeConfig.java | 19 +++++++- .../job/kubernetes/KubeClientFactory.java | 4 +- .../KubeClusterResourceManager.java | 47 +++++++------------ .../apache/samza/job/kubernetes/KubeJob.java | 16 ++++--- .../samza/job/kubernetes/KubeJobFactory.java | 5 +- .../job/kubernetes/KubePodStatusWatcher.java | 9 ++-- .../KubeResourceManagerFactory.java | 3 ++ .../samza/job/kubernetes/KubeUtils.java | 7 ++- 12 files changed, 79 insertions(+), 54 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index a308c19572..0e16136ef7 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -413,7 +413,8 @@ public static void main(String[] args) { //Read and parse the coordinator system config. LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv); coordinatorSystemConfig = - new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class)); + new MapConfig(SamzaObjectMapper.getObjectMapper() + .readValue(coordinatorSystemEnv.replace("\\\"", "\""), Config.class)); LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig); } catch (IOException e) { LOG.error("Exception while reading coordinator stream config", e); diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java index 6eda0168b0..3f87863754 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java @@ -204,14 +204,20 @@ public boolean shouldStartAllocateThread() { } public boolean shouldShutdown() { - LOG.debug("ContainerProcessManager state: Completed containers: {}, Configured containers: {}, Are there too many failed containers: {}, Is allocator thread alive: {}", - state.completedProcessors.get(), state.processorCount, jobFailureCriteriaMet ? "yes" : "no", allocatorThread.isAlive() ? "yes" : "no"); + LOG.debug("ContainerProcessManager state: Completed containers: {}, Configured containers: {}, Are there too many failed containers: {}", + state.completedProcessors.get(), state.processorCount, jobFailureCriteriaMet ? 
"yes" : "no"); + + if (shouldStartAllocateThread()) { + LOG.debug("Is allocator thread alive: {}", allocatorThread.isAlive() ? "yes" : "no"); + } if (exceptionOccurred != null) { LOG.error("Exception in container process manager", exceptionOccurred); throw new SamzaException(exceptionOccurred); } - return jobFailureCriteriaMet || state.completedProcessors.get() == state.processorCount.get() || !allocatorThread.isAlive(); + + boolean shouldShutdown = jobFailureCriteriaMet || state.completedProcessors.get() == state.processorCount.get(); + return shouldStartAllocateThread() ? shouldShutdown || !allocatorThread.isAlive() : shouldShutdown; } public void start() { diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala index 055fae0928..a2009b6150 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala @@ -125,7 +125,7 @@ class HttpServer( } /** - * Returns the URL for the root of the HTTP server. This method + * Returns the URL for the root of the HTTP server. This URL is generated with host name. */ def getUrl = { if (running) { @@ -137,6 +137,9 @@ class HttpServer( } } + /** + * Returns the URL for the root of the HTTP server. This URL is generated with host address. 
+ */ def getIpUrl = { if (running) { val runningPort = server.getConnectors()(0).asInstanceOf[NetworkConnector].getLocalPort() diff --git a/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala index 752322f9e6..687132bc86 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala @@ -55,8 +55,7 @@ object HttpUtil { (exception, loop) => { exception match { case ioe: IOException => { - error(ioe) - warn("Error getting response from Job coordinator server. received IOException: %s. Retrying..." format ioe.getClass) + error("Error getting response from Job coordinator server. Received IOException: %s. Retrying..." format ioe) httpConn = getHttpConnection(url, timeout) } case e: Exception => diff --git a/samza-kubernetes/src/main/java/org/apache/samza/config/KubeConfig.java b/samza-kubernetes/src/main/java/org/apache/samza/config/KubeConfig.java index c339577021..c8eb51aab4 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/config/KubeConfig.java +++ b/samza-kubernetes/src/main/java/org/apache/samza/config/KubeConfig.java @@ -19,14 +19,27 @@ package org.apache.samza.config; +/** + * Kubernetes related configurations + */ public class KubeConfig { // the image name of samza public static final String APP_IMAGE = "kube.app.image"; public static final String DEFAULT_IMAGE = "weiqingyang/samza:v0"; + // the default working directory + public static final String DEFAULT_DIRECTORY = "/opt/samza/"; + + // the memory and the cpu cores of container + public static final String CLUSTER_MANAGER_CONTAINER_MEM_SIZE = "cluster-manager.container.memory.mb"; + public static final int DEFAULT_CLUSTER_MANAGER_CONTAINER_MEM_SIZE = 1024; + public static final String CLUSTER_MANAGER_CONTAINER_CPU_CORE_NUM = "cluster-manager.container.cpu.cores"; + public static final int DEFAULT_CLUSTER_MANAGER_CONTAINER_CPU_CORE_NUM = 1; 
+ // The directory path inside which the log will be stored. public static final String SAMZA_MOUNT_DIR = "kube.app.pod.mnt.path"; + public static final String DEFAULT_SAMZA_MOUNT_DIR = "/tmp/mnt"; public static final String K8S_API_NAMESPACE = "kube.app.namespace"; public static final String STREAM_PROCESSOR_CONTAINER_NAME_PREFIX = "sp"; public static final String JC_CONTAINER_NAME_PREFIX = "jc"; @@ -38,9 +51,12 @@ public class KubeConfig { public static final String COORDINATOR_POD_NAME = "COORDINATOR_POD_NAME"; public static final String AZURE_REMOTE_VOLUME_ENABLED = "kube.app.volume.azure.file-share.enabled"; public static final String AZURE_SECRET = "kube.app.volume.azure-secret"; + public static final String DEFAULT_AZURE_SECRET = "azure-secret"; public static final String AZURE_FILESHARE = "kube.app.volume.azure.file-share"; + public static final String DEFAULT_AZURE_FILESHARE = "aksshare"; private Config config; + public KubeConfig(Config config) { this.config = config; } @@ -51,7 +67,8 @@ public static KubeConfig validate(Config config) throws ConfigException { return kc; } + // TODO: SAMZA-2365: validate KubeConfig before starting the job private void validate() throws ConfigException { - // TODO + } } diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClientFactory.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClientFactory.java index 4405a0969d..0c1d2b1b83 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClientFactory.java +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClientFactory.java @@ -26,7 +26,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +/** + * A KubeClientFactory returns a default KubernetesClient + */ public class KubeClientFactory { private static final Logger LOG = LoggerFactory.getLogger(KubeClientFactory.class); diff --git 
a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java index 54babd73a8..0617bded0a 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java @@ -23,11 +23,13 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watcher; -import java.net.MalformedURLException; import java.net.URL; import java.util.*; -import org.apache.samza.SamzaException; -import org.apache.samza.clustermanager.*; +import org.apache.samza.clustermanager.ClusterResourceManager; +import org.apache.samza.clustermanager.ResourceRequestState; +import org.apache.samza.clustermanager.SamzaApplicationState; +import org.apache.samza.clustermanager.SamzaResourceRequest; +import org.apache.samza.clustermanager.SamzaResource; import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.config.Config; import org.apache.samza.config.TaskConfig; @@ -42,6 +44,10 @@ import static org.apache.samza.config.ApplicationConfig.*; import static org.apache.samza.config.KubeConfig.*; +/** + * A {@link KubeClusterResourceManager} implements a ClusterResourceManager using Kubernetes as the underlying + * resource manager. + */ public class KubeClusterResourceManager extends ClusterResourceManager { private static final Logger LOG = LoggerFactory.getLogger(KubeClusterResourceManager.class); private final Map podLabels = new HashMap<>(); @@ -129,7 +135,7 @@ public void onClose(KubernetesClientException e) { } }; - // TODO: "podLabels" is empty. Need to add lable when creating Pod + // TODO: SAMZA-2367: "podLabels" is empty. 
Need to add labels when creating Pod client.pods().withLabels(podLabels).watch(watcher); } @@ -173,16 +179,16 @@ public void requestResources(SamzaResourceRequest resourceRequest) { PodBuilder podBuilder; if (config.getBoolean(AZURE_REMOTE_VOLUME_ENABLED)) { - AzureFileVolumeSource azureFileVolumeSource = - new AzureFileVolumeSource(false, config.get(AZURE_SECRET, "azure-secret"), config.get(AZURE_FILESHARE, "aksshare")); + AzureFileVolumeSource azureFileVolumeSource = new AzureFileVolumeSource(false, + config.get(AZURE_SECRET, DEFAULT_AZURE_SECRET), config.get(AZURE_FILESHARE, DEFAULT_AZURE_FILESHARE)); Volume volume = new Volume(); volume.setAzureFile(azureFileVolumeSource); volume.setName("azure"); VolumeMount volumeMount = new VolumeMount(); - volumeMount.setMountPath(config.get(SAMZA_MOUNT_DIR, "/tmp/mnt")); + volumeMount.setMountPath(config.get(SAMZA_MOUNT_DIR, DEFAULT_SAMZA_MOUNT_DIR)); volumeMount.setName("azure"); volumeMount.setSubPath(podName); - LOG.info("Set subpath to " + podName + ", mountpath to " + config.get(SAMZA_MOUNT_DIR, "/tmp/mnt")); + LOG.info("Set subpath to " + podName + ", mountpath to " + config.get(SAMZA_MOUNT_DIR, DEFAULT_SAMZA_MOUNT_DIR)); container.setVolumeMounts(Collections.singletonList(volumeMount)); podBuilder = new PodBuilder().editOrNewMetadata() .withName(podName) @@ -244,13 +250,10 @@ public void stopStreamProcessor(SamzaResource resource) { public void stop(SamzaApplicationState.SamzaAppStatus status) { LOG.info("Kubernetes Cluster ResourceManager stopped"); jobModelManager.stop(); - // TODO: need to check } private String buildCmd(CommandBuilder cmdBuilder) { - String cmdPath = "/opt/samza/"; // TODO - cmdBuilder.setCommandPath(cmdPath); - + cmdBuilder.setCommandPath(DEFAULT_DIRECTORY); return cmdBuilder.buildCommand(); } @@ -280,7 +283,8 @@ private List getEnvs(CommandBuilder cmdBuilder) { sb.append(String.format("\n%s=%s", entry.getKey(), entry.getValue())); //logging } - // TODO: The ID assigned to the container by the 
execution environment: K8s container Id. ?? Seems there is no id + // TODO: SAMZA-2366: make the container ID as an execution environment and pass it to the container. + // Seems there is no such id (K8s container id)? // envList.add(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), container.getId().toString()); // sb.append(String.format("\n%s=%s", ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), container.getId().toString())); @@ -292,21 +296,4 @@ private List getEnvs(CommandBuilder cmdBuilder) { return envList; } - - private URL formatUrl(URL url) { - int port = url.getPort(); - String host = url.getHost(); - LOG.info("Original host: {}, port: {}, url: {}", host, port, url); - - String formattedHost = host + "."+ namespace + ".svc.cluster.local"; - LOG.info("Formatted host: {}, port: {}", formattedHost, port); - URL newUrl; - try { - newUrl = new URL("http://" + formattedHost + ":" + url.getPort()); - LOG.info("Formatted URL: {}", newUrl); - } catch (MalformedURLException ex) { - throw new SamzaException(ex); - } - return newUrl; - } } diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java index 9ede574248..6fb952a08d 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java @@ -76,12 +76,13 @@ public KubeJob(Config config) { */ public KubeJob submit() { // create SamzaResourceRequest - int memoryMB = config.getInt("cluster-manager.container.memory.mb", 1024); // TODO - int numCores = config.getInt("cluster-manager.container.cpu.cores", 1); // TODO + int memoryMB = config.getInt(CLUSTER_MANAGER_CONTAINER_MEM_SIZE, DEFAULT_CLUSTER_MANAGER_CONTAINER_MEM_SIZE); + int numCores = config.getInt(CLUSTER_MANAGER_CONTAINER_CPU_CORE_NUM, DEFAULT_CLUSTER_MANAGER_CONTAINER_CPU_CORE_NUM); String preferredHost = ResourceRequestState.ANY_HOST; 
SamzaResourceRequest request = new SamzaResourceRequest(numCores, memoryMB, preferredHost, podName); // create Container + // TODO: SAMZA-2368: Figure out "samza.fwk.path" and "samza.fwk.version" are still needed in Samza 1.3 String fwkPath = config.get("samza.fwk.path", ""); String fwkVersion = config.get("samza.fwk.version"); String cmd = buildJobCoordinatorCmd(fwkPath, fwkVersion); @@ -92,12 +93,12 @@ public KubeJob submit() { PodBuilder podBuilder; if (config.getBoolean(AZURE_REMOTE_VOLUME_ENABLED)) { AzureFileVolumeSource azureFileVolumeSource = new AzureFileVolumeSource(false, - config.get(AZURE_SECRET, "azure-secret"), config.get(AZURE_FILESHARE, "aksshare")); + config.get(AZURE_SECRET, DEFAULT_AZURE_SECRET), config.get(AZURE_FILESHARE, DEFAULT_AZURE_FILESHARE)); Volume volume = new Volume(); volume.setAzureFile(azureFileVolumeSource); volume.setName("azure"); VolumeMount volumeMount = new VolumeMount(); - volumeMount.setMountPath(config.get(SAMZA_MOUNT_DIR, "/tmp/mnt")); + volumeMount.setMountPath(config.get(SAMZA_MOUNT_DIR, DEFAULT_SAMZA_MOUNT_DIR)); volumeMount.setName("azure"); volumeMount.setSubPath(podName); container.setVolumeMounts(Collections.singletonList(volumeMount)); @@ -112,7 +113,6 @@ public KubeJob submit() { .withVolumes(volume) .endSpec(); } else { - // create Pod podBuilder = new PodBuilder() .editOrNewMetadata() @@ -127,7 +127,8 @@ public KubeJob submit() { Pod pod = podBuilder.build(); kubernetesClient.pods().create(pod); - // TODO: adding watcher here makes Client waiting .. Need to fix. + // TODO: SAMZA-2247: the watcher here makes Client hung (always waiting). Although it doesn't affect the operator + // and worker containers, we still need to fix this issue. 
kubernetesClient.pods().withName(podName).watch(watcher); return this; } @@ -213,7 +214,7 @@ private String buildJobCoordinatorCmd(String fwkPath, String fwkVersion) { LOG.info(String.format("KubeJob: fwk_path is %s, ver is %s use it directly ", fwkPath, fwkVersion)); // default location - String cmdExec = "/opt/hello-samza/bin/run-jc.sh"; + String cmdExec = DEFAULT_DIRECTORY + "bin/run-jc.sh"; if (!fwkPath.isEmpty()) { // if we have framework installed as a separate package - use it cmdExec = fwkPath + "/" + fwkVersion + "/bin/run-jc.sh"; @@ -224,6 +225,7 @@ private String buildJobCoordinatorCmd(String fwkPath, String fwkVersion) { // Construct the envs for the job coordinator pod private List getEnvs() { MapConfig coordinatorSystemConfig = CoordinatorStreamUtil.buildCoordinatorStreamConfig(config); + LOG.info("Coordinator system config: {}", coordinatorSystemConfig); List envList = new ArrayList<>(); ObjectMapper objectMapper = getObjectMapper(); String coordinatorSysConfig; diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJobFactory.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJobFactory.java index 4ec39cf887..1379637398 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJobFactory.java +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJobFactory.java @@ -20,9 +20,12 @@ package org.apache.samza.job.kubernetes; import org.apache.samza.config.Config; +import org.apache.samza.job.StreamJob; import org.apache.samza.job.StreamJobFactory; - +/** + * A KubeJobFactory returns an implementation of a {@link StreamJob} for a Samza application running on Kubernetes. 
+ */ public class KubeJobFactory implements StreamJobFactory { @Override diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubePodStatusWatcher.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubePodStatusWatcher.java index 0b73664803..21e923f4b2 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubePodStatusWatcher.java +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubePodStatusWatcher.java @@ -28,7 +28,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * A monitor for the running Kubernetes pod of a Samza application + */ +// TODO: SAMZA-2369: Add a logging thread which is similar to LoggingPodStatusWatcher in Spark public class KubePodStatusWatcher implements Watcher { private static final Logger LOG = LoggerFactory.getLogger(KubeJob.class); private Optional pod = Optional.empty(); @@ -141,23 +145,19 @@ private boolean isPending() { private void closeWatchWhenRunning() { podRunningLatch.countDown(); - // TODO: may add a logging thread } private void closeWatchWhenPending() { podPendingLatch.countDown(); - // TODO: may add a logging thread } private void closeWatchWhenFailed() { podFailedLatch.countDown(); - // TODO: may add a logging thread } private void closeWatchWhenSucceed() { podSucceededLatch.countDown(); - // TODO: may add a logging thread } private void closeAllWatch() { @@ -167,6 +167,5 @@ private void closeAllWatch() { closeWatchWhenRunning(); closeWatchWhenFailed(); closeWatchWhenSucceed(); - // TODO: may add a logging thread } } diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeResourceManagerFactory.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeResourceManagerFactory.java index 68f64bf807..0351ade057 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeResourceManagerFactory.java +++ 
b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeResourceManagerFactory.java @@ -26,6 +26,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * A KubeResourceManagerFactory returns an implementation of a {@link ClusterResourceManager} for Kubernetes. + */ public class KubeResourceManagerFactory implements ResourceManagerFactory { private static Logger log = LoggerFactory.getLogger(KubeResourceManagerFactory.class); diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeUtils.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeUtils.java index 5af5258246..666aadea1d 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeUtils.java +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeUtils.java @@ -22,6 +22,9 @@ import io.fabric8.kubernetes.api.model.*; import org.apache.samza.clustermanager.SamzaResourceRequest; +/** + * Convenient utility class with static methods. + */ public class KubeUtils { public static String getSamzaContainerNameFromPodName(String podName) { @@ -51,6 +54,6 @@ public static Container createContainer(String containerId, String image, SamzaR .addToRequests("memory", memQuantity).addToRequests("cpu", cpuQuantity).endResources().build(); } - // TODO: will add util methods describing details about Pod status and container status. Refer to Spark'KubernetesUtils. - // Then we can use them in logs and exception messages. + // TODO: SAMZA-2371: add util methods (similar to KubernetesUtils in Spark) describing details about pod status and + // container status, then we can use these methods in logs and exception messages. 
} From 1bbc8e06ca740de71ffba7cc61dd24a8224691bb Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Mon, 4 Nov 2019 18:03:18 -0800 Subject: [PATCH 4/9] Remove unused comments --- .../apache/samza/job/kubernetes/KubeClusterResourceManager.java | 1 - 1 file changed, 1 deletion(-) diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java index 0617bded0a..52051bd98e 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java @@ -257,7 +257,6 @@ private String buildCmd(CommandBuilder cmdBuilder) { return cmdBuilder.buildCommand(); } - // TODO: Need to check it again later!! Check AbstractContainerAllocator.getCommandBuilder(samzaContainerId) private CommandBuilder getCommandBuilder(String containerId) { TaskConfig taskConfig = new TaskConfig(config); String cmdBuilderClassName = taskConfig.getCommandClass(ShellCommandBuilder.class.getName()); From 9071f7bfc4bdb37245300090dbd765bfa8a756ec Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Thu, 4 Mar 2021 20:11:27 -0800 Subject: [PATCH 5/9] Fix conflicts --- .../samza/job/kubernetes/KubeClusterResourceManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java index 52051bd98e..a8343e7f28 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java @@ -37,7 +37,7 @@ import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; import 
org.apache.samza.job.CommandBuilder; import org.apache.samza.job.ShellCommandBuilder; -import org.apache.samza.util.Util; +import org.apache.samza.util.ReflectionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -260,7 +260,7 @@ private String buildCmd(CommandBuilder cmdBuilder) { private CommandBuilder getCommandBuilder(String containerId) { TaskConfig taskConfig = new TaskConfig(config); String cmdBuilderClassName = taskConfig.getCommandClass(ShellCommandBuilder.class.getName()); - CommandBuilder cmdBuilder = Util.getObj(cmdBuilderClassName, CommandBuilder.class); + CommandBuilder cmdBuilder = ReflectionUtil.getObj(cmdBuilderClassName, CommandBuilder.class); if (jobModelManager.server() == null) { LOG.error("HttpServer is null"); } From 6ff8d7c21d0895f0b832cf16b1c1165332dff50e Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Thu, 4 Mar 2021 22:54:17 -0800 Subject: [PATCH 6/9] fix conflict --- .../job/kubernetes/KubeClusterResourceManager.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java index a8343e7f28..bd28054127 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java @@ -34,6 +34,7 @@ import org.apache.samza.config.Config; import org.apache.samza.config.TaskConfig; import org.apache.samza.coordinator.JobModelManager; +import org.apache.samza.coordinator.server.LocalityServlet; import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; import org.apache.samza.job.CommandBuilder; import org.apache.samza.job.ShellCommandBuilder; @@ -158,10 +159,12 @@ private void createNewStreamProcessor(Pod pod) { String containerId = 
KubeUtils.getSamzaContainerNameFromPodName(pod.getMetadata().getName()); // Find out previously running container location - String lastSeenOn = jobModelManager.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY); - if (!hostAffinityEnabled || lastSeenOn == null) { - lastSeenOn = ResourceRequestState.ANY_HOST; - } + // TODO: need to get the locality information. The logic below works for samza 1.3 or earlier version only. + /* String lastSeenOn = jobModelManager.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY); + if (!hostAffinityEnabled || lastSeenOn == null) { + lastSeenOn = ResourceRequestState.ANY_HOST; + } */ + String lastSeenOn = ResourceRequestState.ANY_HOST; SamzaResourceRequest request = new SamzaResourceRequest(cpu, memory, lastSeenOn, containerId); requestResources(request); } From 66cb923b538a9fe31e56656b105feb6aeede1c4e Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Thu, 4 Mar 2021 23:19:27 -0800 Subject: [PATCH 7/9] Fix conflicts --- .../samza/job/kubernetes/KubeClusterResourceManager.java | 4 +--- .../java/org/apache/samza/job/kubernetes/KubeJob.java | 8 +++----- .../apache/samza/job/kubernetes/KubePodStatusWatcher.java | 2 +- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java index bd28054127..1f434beb54 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java @@ -34,8 +34,6 @@ import org.apache.samza.config.Config; import org.apache.samza.config.TaskConfig; import org.apache.samza.coordinator.JobModelManager; -import org.apache.samza.coordinator.server.LocalityServlet; -import 
org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; import org.apache.samza.job.CommandBuilder; import org.apache.samza.job.ShellCommandBuilder; import org.apache.samza.util.ReflectionUtil; @@ -159,7 +157,7 @@ private void createNewStreamProcessor(Pod pod) { String containerId = KubeUtils.getSamzaContainerNameFromPodName(pod.getMetadata().getName()); // Find out previously running container location - // TODO: need to get the locality information. The logic below works for samza 1.3 or earlier version only. + // TODO: need to get the locality information. The logic below only works for samza 1.3 or earlier version. /* String lastSeenOn = jobModelManager.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY); if (!hostAffinityEnabled || lastSeenOn == null) { lastSeenOn = ResourceRequestState.ANY_HOST; diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java index 6fb952a08d..5ff5cc5071 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java @@ -28,9 +28,9 @@ import org.apache.samza.config.MapConfig; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.job.StreamJob; +import org.apache.samza.serializers.model.SamzaObjectMapper; import org.apache.samza.util.CoordinatorStreamUtil; import org.apache.samza.util.Util; -import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.collection.mutable.StringBuilder; @@ -45,7 +45,6 @@ import static org.apache.samza.config.ApplicationConfig.APP_NAME; import static org.apache.samza.config.KubeConfig.*; import static org.apache.samza.job.ApplicationStatus.*; -import static org.apache.samza.serializers.model.SamzaObjectMapper.getObjectMapper; /** * The client to start a Kubernetes job 
coordinator @@ -114,7 +113,7 @@ public KubeJob submit() { .endSpec(); } else { // create Pod - podBuilder = new PodBuilder() + podBuilder = new PodBuilder() .editOrNewMetadata() .withNamespace(nameSpace) .withName(podName) @@ -227,10 +226,9 @@ private List getEnvs() { MapConfig coordinatorSystemConfig = CoordinatorStreamUtil.buildCoordinatorStreamConfig(config); LOG.info("Coordinator system config: {}", coordinatorSystemConfig); List envList = new ArrayList<>(); - ObjectMapper objectMapper = getObjectMapper(); String coordinatorSysConfig; try { - coordinatorSysConfig = objectMapper.writeValueAsString(coordinatorSystemConfig); + coordinatorSysConfig = SamzaObjectMapper.getObjectMapper().writeValueAsString(coordinatorSystemConfig); } catch (IOException ex) { LOG.warn("No coordinator system configs!", ex); coordinatorSysConfig = ""; diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubePodStatusWatcher.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubePodStatusWatcher.java index 21e923f4b2..f01117a3f1 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubePodStatusWatcher.java +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubePodStatusWatcher.java @@ -59,7 +59,7 @@ public void eventReceived(Action action, Pod pod) { default: if (isFailed()) { closeWatchWhenFailed(); - } else if(isSucceeded()) { + } else if (isSucceeded()) { closeWatchWhenSucceed(); } else if (isRunning()) { closeWatchWhenRunning(); From fb405ae20ce4c168610c21cbfaafe2c0a7a46961 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Thu, 4 Mar 2021 23:57:04 -0800 Subject: [PATCH 8/9] Fix build error --- .../samza/job/kubernetes/KubeClusterResourceManager.java | 2 +- .../java/org/apache/samza/job/kubernetes/KubeJobFactory.java | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java 
b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java index 1f434beb54..f430738fcd 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java @@ -157,7 +157,7 @@ private void createNewStreamProcessor(Pod pod) { String containerId = KubeUtils.getSamzaContainerNameFromPodName(pod.getMetadata().getName()); // Find out previously running container location - // TODO: need to get the locality information. The logic below only works for samza 1.3 or earlier version. + // TODO: SAMZA-2629: need to get the locality information. The logic below only works for samza 1.3 or earlier version. /* String lastSeenOn = jobModelManager.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY); if (!hostAffinityEnabled || lastSeenOn == null) { lastSeenOn = ResourceRequestState.ANY_HOST; diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJobFactory.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJobFactory.java index 1379637398..52c44b74c1 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJobFactory.java +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJobFactory.java @@ -20,11 +20,10 @@ package org.apache.samza.job.kubernetes; import org.apache.samza.config.Config; -import org.apache.samza.job.StreamJob; import org.apache.samza.job.StreamJobFactory; /** - * A KubeJobFactory returns an implementation of a {@link StreamJob} for a Samza application running on Kubernetes. + * A KubeJobFactory returns an implementation of a StreamJob for a Samza application running on Kubernetes. 
*/ public class KubeJobFactory implements StreamJobFactory { From 44bbb472fef8a741c256e8bd2765440a5ccfe7d1 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Fri, 5 Mar 2021 11:56:45 -0800 Subject: [PATCH 9/9] Fix review comments --- .../ContainerProcessManager.java | 14 ++++---- .../org/apache/samza/config/KubeConfig.java | 4 +-- .../KubeClusterResourceManager.java | 17 +++++----- .../apache/samza/job/kubernetes/KubeJob.java | 33 +++++++++---------- 4 files changed, 32 insertions(+), 36 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java index 7aa25e0986..fc89858737 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java @@ -100,7 +100,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback * The Allocator matches requests to resources and executes processes. 
*/ private final ContainerAllocator containerAllocator; - private Thread allocatorThread = null; + private final Thread allocatorThread; // The ContainerManager manages control actions for both active & standby containers private final ContainerManager containerManager; @@ -181,6 +181,8 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.containerManager); if (shouldStartAllocateThread()) { this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread"); + } else { + this.allocatorThread = null; } this.restartContainers = restartContainers; LOG.info("Finished container process manager initialization."); @@ -220,12 +222,8 @@ public boolean shouldStartAllocateThread() { } public boolean shouldShutdown() { - LOG.debug("ContainerProcessManager state: Completed containers: {}, Configured containers: {}, Are there too many failed containers: {}", - state.completedProcessors.get(), state.processorCount, jobFailureCriteriaMet ? "yes" : "no"); - - if (shouldStartAllocateThread()) { - LOG.debug("Is allocator thread alive: {}", allocatorThread.isAlive() ? "yes" : "no"); - } + LOG.debug("ContainerProcessManager state: Completed containers: {}, Configured containers: {}, Are there too many failed containers: {}, Is allocator thread alive: {}", + state.completedProcessors.get(), state.processorCount, jobFailureCriteriaMet ? "yes" : "no", allocatorThread != null && allocatorThread.isAlive() ? "yes" : "no"); if (exceptionOccurred != null) { LOG.error("Exception in container process manager", exceptionOccurred); @@ -233,7 +231,7 @@ public boolean shouldShutdown() { } boolean shouldShutdown = jobFailureCriteriaMet || state.completedProcessors.get() == state.processorCount.get(); - return shouldStartAllocateThread() ? 
shouldShutdown || !allocatorThread.isAlive() : shouldShutdown; + return allocatorThread == null ? shouldShutdown : (shouldShutdown || !allocatorThread.isAlive()); } public void start() { diff --git a/samza-kubernetes/src/main/java/org/apache/samza/config/KubeConfig.java b/samza-kubernetes/src/main/java/org/apache/samza/config/KubeConfig.java index c8eb51aab4..c5d3121ddd 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/config/KubeConfig.java +++ b/samza-kubernetes/src/main/java/org/apache/samza/config/KubeConfig.java @@ -26,15 +26,13 @@ public class KubeConfig { // the image name of samza public static final String APP_IMAGE = "kube.app.image"; - public static final String DEFAULT_IMAGE = "weiqingyang/samza:v0"; + public static final String DEFAULT_IMAGE = "samza/samza:v0"; // the default working directory public static final String DEFAULT_DIRECTORY = "/opt/samza/"; // the memory and the cpu cores of container - public static final String CLUSTER_MANAGER_CONTAINER_MEM_SIZE = "cluster-manager.container.memory.mb"; public static final int DEFAULT_CLUSTER_MANAGER_CONTAINER_MEM_SIZE = 1024; - public static final String CLUSTER_MANAGER_CONTAINER_CPU_CORE_NUM = "cluster-manager.container.cpu.cores"; public static final int DEFAULT_CLUSTER_MANAGER_CONTAINER_CPU_CORE_NUM = 1; // The directory path inside which the log will be stored. 
diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java index f430738fcd..329be961d7 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java @@ -49,17 +49,18 @@ */ public class KubeClusterResourceManager extends ClusterResourceManager { private static final Logger LOG = LoggerFactory.getLogger(KubeClusterResourceManager.class); + private final Map podLabels = new HashMap<>(); - private KubernetesClient client; - private String appId; - private String appName; - private String image; - private String namespace; - private OwnerReference ownerReference; - private JobModelManager jobModelManager; + private final KubernetesClient client; + private final String appId; + private final String appName; + private final String image; + private final String namespace; + private final JobModelManager jobModelManager; + private final Config config; private boolean hostAffinityEnabled; - private Config config; private String jcPodName; + private OwnerReference ownerReference; KubeClusterResourceManager(Config config, JobModelManager jobModelManager, ClusterResourceManager.Callback callback) { super(callback); diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java index 5ff5cc5071..39566aaf16 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java @@ -24,6 +24,7 @@ import org.apache.samza.SamzaException; import org.apache.samza.clustermanager.ResourceRequestState; import org.apache.samza.clustermanager.SamzaResourceRequest; +import 
org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.job.ApplicationStatus; @@ -41,8 +42,8 @@ import java.util.List; import java.util.concurrent.TimeUnit; -import static org.apache.samza.config.ApplicationConfig.APP_ID; -import static org.apache.samza.config.ApplicationConfig.APP_NAME; +import static org.apache.samza.config.ClusterManagerConfig.CLUSTER_MANAGER_MEMORY_MB; +import static org.apache.samza.config.ClusterManagerConfig.CLUSTER_MANAGER_MAX_CORES; import static org.apache.samza.config.KubeConfig.*; import static org.apache.samza.job.ApplicationStatus.*; @@ -51,19 +52,19 @@ */ public class KubeJob implements StreamJob { private static final Logger LOG = LoggerFactory.getLogger(KubeJob.class); - private Config config; - private KubernetesClient kubernetesClient; - private String podName; + + private final ApplicationConfig config; + private final KubernetesClient kubernetesClient; + private final String podName; + private final String nameSpace; + private final KubePodStatusWatcher watcher; + private final String image; private ApplicationStatus currentStatus; - private String nameSpace; - private KubePodStatusWatcher watcher; - private String image; - public KubeJob(Config config) { + public KubeJob(Config configs) { this.kubernetesClient = KubeClientFactory.create(); - this.config = config; - this.podName = String.format(JC_POD_NAME_FORMAT, JC_CONTAINER_NAME_PREFIX, - config.get(APP_NAME, "samza"), config.get(APP_ID, "1")); + this.config = new ApplicationConfig(configs); + this.podName = String.format(JC_POD_NAME_FORMAT, JC_CONTAINER_NAME_PREFIX, config.getAppName(), config.getAppId()); this.currentStatus = ApplicationStatus.New; this.watcher = new KubePodStatusWatcher(podName); this.nameSpace = config.get(K8S_API_NAMESPACE, "default"); @@ -75,8 +76,8 @@ public KubeJob(Config config) { */ public KubeJob submit() { // create SamzaResourceRequest - int memoryMB = 
config.getInt(CLUSTER_MANAGER_CONTAINER_MEM_SIZE, DEFAULT_CLUSTER_MANAGER_CONTAINER_MEM_SIZE); - int numCores = config.getInt(CLUSTER_MANAGER_CONTAINER_CPU_CORE_NUM, DEFAULT_CLUSTER_MANAGER_CONTAINER_CPU_CORE_NUM); + int memoryMB = config.getInt(CLUSTER_MANAGER_MEMORY_MB, DEFAULT_CLUSTER_MANAGER_CONTAINER_MEM_SIZE); + int numCores = config.getInt(CLUSTER_MANAGER_MAX_CORES, DEFAULT_CLUSTER_MANAGER_CONTAINER_CPU_CORE_NUM); String preferredHost = ResourceRequestState.ANY_HOST; SamzaResourceRequest request = new SamzaResourceRequest(numCores, memoryMB, preferredHost, podName); @@ -136,9 +137,7 @@ public KubeJob submit() { * Kill the job coordinator pod */ public KubeJob kill() { - LOG.info("Killing application: {}, operator pod: {}, namespace: {}", config.get(APP_NAME), podName, nameSpace); - System.out.println("Killing application: " + config.get(APP_NAME) + - "; Operator pod: " + podName + "; namespace: " + nameSpace); + LOG.info("Killing application: {}, operator pod: {}, namespace: {}", config.getAppName(), podName, nameSpace); kubernetesClient.pods().inNamespace(nameSpace).withName(podName).delete(); return this; }