-
Notifications
You must be signed in to change notification settings - Fork 336
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SAMZA-2067: Support Samza's running on Kubernetes #1197
base: master
Are you sure you want to change the base?
Changes from 4 commits
381a683
5c835bf
9d8e752
1bbc8e0
aafc2d4
9071f7b
6ff8d7c
66cb923
fb405ae
44bbb47
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -93,7 +93,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback | |
* The Allocator matches requests to resources and executes processes. | ||
*/ | ||
private final ContainerAllocator containerAllocator; | ||
private final Thread allocatorThread; | ||
private Thread allocatorThread = null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please keep this final and assign to different values in the constructor. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. The RB was updated, and the change here has gone. |
||
|
||
// The StandbyContainerManager manages standby-aware allocation and failover of containers | ||
private final Optional<StandbyContainerManager> standbyContainerManager; | ||
|
@@ -166,8 +166,10 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri | |
} | ||
|
||
this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.standbyContainerManager); | ||
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread"); | ||
LOG.info("Finished container process manager initialization."); | ||
if (shouldStartAllocateThread()) { | ||
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do actually use containerAllocator anymore? Please walk through this part of the code with me offline. |
||
} | ||
LOG.info("finished initialization of samza task manager"); | ||
} | ||
|
||
@VisibleForTesting | ||
|
@@ -184,23 +186,38 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri | |
|
||
this.clusterResourceManager = resourceManager; | ||
this.standbyContainerManager = Optional.empty(); | ||
|
||
this.diagnosticsManager = Option.empty(); | ||
this.containerAllocator = allocator.orElseGet( | ||
() -> new ContainerAllocator(this.clusterResourceManager, clusterManagerConfig, state, | ||
hostAffinityEnabled, this.standbyContainerManager)); | ||
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread"); | ||
if (shouldStartAllocateThread()) { | ||
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread"); | ||
} | ||
LOG.info("Finished container process manager initialization"); | ||
} | ||
|
||
// In Kubernetes, the pod will be started by kubelet automatically once it is allocated, it does not need a | ||
// separate thread to keep polling the allocated resources to start the container. | ||
public boolean shouldStartAllocateThread() { | ||
return !clusterResourceManager.getClass().getSimpleName().equals("KubeClusterResourceManager"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This lookup of specific cluster manager seems pretty hard to maintain. Can we think of a better to distinguish when we need this to do container allocation? |
||
} | ||
|
||
public boolean shouldShutdown() { | ||
LOG.debug("ContainerProcessManager state: Completed containers: {}, Configured containers: {}, Are there too many failed containers: {}, Is allocator thread alive: {}", | ||
state.completedProcessors.get(), state.processorCount, jobFailureCriteriaMet ? "yes" : "no", allocatorThread.isAlive() ? "yes" : "no"); | ||
LOG.debug("ContainerProcessManager state: Completed containers: {}, Configured containers: {}, Are there too many failed containers: {}", | ||
state.completedProcessors.get(), state.processorCount, jobFailureCriteriaMet ? "yes" : "no"); | ||
|
||
if (shouldStartAllocateThread()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what does this if do? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This If is just for logging the aliveness info "Is allocator thread alive". The RB has been updated. The aliveness info is now in the LOG.debug() in line 207 (keep the code same as the current codebase). |
||
LOG.debug("Is allocator thread alive: {}", allocatorThread.isAlive() ? "yes" : "no"); | ||
} | ||
|
||
if (exceptionOccurred != null) { | ||
LOG.error("Exception in container process manager", exceptionOccurred); | ||
throw new SamzaException(exceptionOccurred); | ||
} | ||
return jobFailureCriteriaMet || state.completedProcessors.get() == state.processorCount.get() || !allocatorThread.isAlive(); | ||
|
||
boolean shouldShutdown = jobFailureCriteriaMet || state.completedProcessors.get() == state.processorCount.get(); | ||
return shouldStartAllocateThread() ? shouldShutdown || !allocatorThread.isAlive() : shouldShutdown; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess it's nicer to do a null check of the thread instead of doing shouldStartAllocateThread() There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. right. updated. |
||
} | ||
|
||
public void start() { | ||
|
@@ -237,7 +254,9 @@ public void start() { | |
|
||
// Start container allocator thread | ||
LOG.info("Starting the container allocator thread"); | ||
allocatorThread.start(); | ||
if (allocatorThread != null) { | ||
allocatorThread.start(); | ||
} | ||
LOG.info("Starting the container process manager"); | ||
} | ||
|
||
|
@@ -246,12 +265,14 @@ public void stop() { | |
|
||
// Shutdown allocator thread | ||
containerAllocator.stop(); | ||
try { | ||
allocatorThread.join(); | ||
LOG.info("Stopped container allocator"); | ||
} catch (InterruptedException ie) { | ||
LOG.error("Allocator thread join threw an interrupted exception", ie); | ||
Thread.currentThread().interrupt(); | ||
if (allocatorThread != null) { | ||
try { | ||
allocatorThread.join(); | ||
LOG.info("Stopped container allocator"); | ||
} catch (InterruptedException ie) { | ||
LOG.error("Allocator thread join threw an interrupted exception", ie); | ||
Thread.currentThread().interrupt(); | ||
} | ||
} | ||
|
||
if (diagnosticsManager.isDefined()) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,34 +23,22 @@ import java.util | |
import java.util.concurrent.atomic.AtomicReference | ||
|
||
import org.apache.samza.{Partition, SamzaException} | ||
import org.apache.samza.config._ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems all the changes in this file is just to reorg imports. If that's the case, I think this file can be left untouched. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The RB has been updated and aligned with the master branch. The change here has gone. |
||
import org.apache.samza.config.Config | ||
import org.apache.samza.container.grouper.stream.SSPGrouperProxy | ||
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory | ||
import org.apache.samza.config.{Config, _} | ||
import org.apache.samza.container.grouper.stream.{SSPGrouperProxy, SystemStreamPartitionGrouperFactory} | ||
import org.apache.samza.container.grouper.task._ | ||
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore | ||
import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping | ||
import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping | ||
import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping | ||
import org.apache.samza.container.LocalityManager | ||
import org.apache.samza.container.TaskName | ||
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore | ||
import org.apache.samza.coordinator.server.HttpServer | ||
import org.apache.samza.coordinator.server.JobServlet | ||
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping | ||
import org.apache.samza.job.model.ContainerModel | ||
import org.apache.samza.job.model.JobModel | ||
import org.apache.samza.job.model.TaskMode | ||
import org.apache.samza.job.model.TaskModel | ||
import org.apache.samza.metrics.MetricsRegistry | ||
import org.apache.samza.metrics.MetricsRegistryMap | ||
import org.apache.samza.container.{LocalityManager, TaskName} | ||
import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore} | ||
import org.apache.samza.coordinator.server.{HttpServer, JobServlet} | ||
import org.apache.samza.coordinator.stream.messages.{SetContainerHostMapping, SetTaskContainerMapping, SetTaskModeMapping, SetTaskPartitionMapping} | ||
import org.apache.samza.job.model.{ContainerModel, JobModel, TaskMode, TaskModel} | ||
import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap} | ||
import org.apache.samza.runtime.LocationId | ||
import org.apache.samza.system._ | ||
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals | ||
import org.apache.samza.util.{Logging, ReflectionUtil, Util} | ||
|
||
import scala.collection.JavaConverters | ||
import scala.collection.JavaConversions._ | ||
import scala.collection.JavaConverters | ||
import scala.collection.JavaConverters._ | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
|
||
# samzaJarsFolder includes all the Samza jars (you needs to make sure all the samza jars are there.) | ||
# You can build Samza image by: | ||
# docker build -t dockerHubAccount/samza:versionNumber . | ||
# Then Samza user can use the Samza image as base image to build their application image. | ||
# | ||
|
||
FROM ubuntu:latest | ||
|
||
RUN apt-get update -y && apt-get upgrade -y && apt-get install -y openjdk-8-jdk | ||
|
||
ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64 | ||
ENV PATH $PATH:$JAVA_HOME/bin | ||
|
||
RUN mkdir -p /opt/samza | ||
WORKDIR /opt/samza/ | ||
COPY samzaJarsFolder/ /opt/samza/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
# Configurations | ||
- kube.app.image: the image name for the samza app | ||
- kube.app.namespace: the namespace where the samza app runs | ||
- kube.app.pod.mnt.path: the path where the remote volume is mounted into the pod, for both the job coordinator pod and stream processor pod | ||
the volume can be used for storing logs and local states. | ||
- cluster-manager.container.memory.mb: the memory size for the samza stream processor | ||
- cluster-manager.container.cpu.cores: the cpu cores for the samza stream processor |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.samza.config; | ||
|
||
/** | ||
* Kubernetes related configurations | ||
*/ | ||
public class KubeConfig { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would suggest following the other Config class example to add getters for common configs, like namespace, jcPodName, etc. It's easier to centralize the place where these things are created. The rest of the code should just use these getters normally instead of directly accessing the config vars. It will be even better to make those static vars private. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sounds good. will update this class. |
||
|
||
// the image name of samza | ||
public static final String APP_IMAGE = "kube.app.image"; | ||
public static final String DEFAULT_IMAGE = "weiqingyang/samza:v0"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably not put your personal id as the default config. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. right. Updated it to "samza/samza:v0". There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will create samza docker hub if needed. |
||
|
||
// the default working directory | ||
public static final String DEFAULT_DIRECTORY = "/opt/samza/"; | ||
|
||
// the memory and the cpu cores of container | ||
public static final String CLUSTER_MANAGER_CONTAINER_MEM_SIZE = "cluster-manager.container.memory.mb"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems these configs should already be defined in the cluster manager config. If not, can we make sure it's shared? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. RB was updated to remove these duplicate config definitions. |
||
public static final int DEFAULT_CLUSTER_MANAGER_CONTAINER_MEM_SIZE = 1024; | ||
public static final String CLUSTER_MANAGER_CONTAINER_CPU_CORE_NUM = "cluster-manager.container.cpu.cores"; | ||
public static final int DEFAULT_CLUSTER_MANAGER_CONTAINER_CPU_CORE_NUM = 1; | ||
|
||
// The directory path inside which the log will be stored. | ||
public static final String SAMZA_MOUNT_DIR = "kube.app.pod.mnt.path"; | ||
public static final String DEFAULT_SAMZA_MOUNT_DIR = "/tmp/mnt"; | ||
public static final String K8S_API_NAMESPACE = "kube.app.namespace"; | ||
public static final String STREAM_PROCESSOR_CONTAINER_NAME_PREFIX = "sp"; | ||
public static final String JC_CONTAINER_NAME_PREFIX = "jc"; | ||
public static final String POD_RESTART_POLICY = "OnFailure"; | ||
public static final String JC_POD_NAME_FORMAT = "%s-%s-%s"; // jc-appName-appId | ||
public static final String TASK_POD_NAME_FORMAT = "%s-%s-%s-%s"; // sp-appName-appId-containerId | ||
|
||
// Environment variable | ||
public static final String COORDINATOR_POD_NAME = "COORDINATOR_POD_NAME"; | ||
public static final String AZURE_REMOTE_VOLUME_ENABLED = "kube.app.volume.azure.file-share.enabled"; | ||
public static final String AZURE_SECRET = "kube.app.volume.azure-secret"; | ||
public static final String DEFAULT_AZURE_SECRET = "azure-secret"; | ||
public static final String AZURE_FILESHARE = "kube.app.volume.azure.file-share"; | ||
public static final String DEFAULT_AZURE_FILESHARE = "aksshare"; | ||
|
||
private Config config; | ||
|
||
public KubeConfig(Config config) { | ||
this.config = config; | ||
} | ||
|
||
public static KubeConfig validate(Config config) throws ConfigException { | ||
KubeConfig kc = new KubeConfig(config); | ||
kc.validate(); | ||
return kc; | ||
} | ||
|
||
// TODO: SAMZA-2365: validate KubeConfig before starting the job | ||
private void validate() throws ConfigException { | ||
|
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any reason we do this replacement?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The RB has been updated and aligned with the master branch. The change here has gone.