Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-2067: Support Samza's running on Kubernetes #1197

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,27 @@ project(":samza-yarn_$scalaSuffix") {
jar.dependsOn("lesscss")
}

project(":samza-kubernetes_$scalaSuffix") {
apply plugin: 'java'

dependencies {
compile project(':samza-api')
compile project(":samza-core_$scalaSuffix")
compile "org.codehaus.jackson:jackson-core-asl:1.9.7"
compile group: 'io.fabric8', name: 'kubernetes-client', version: kubernetesJavaClientVersion
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-core:$mockitoVersion"
}

tasks.create(name: "releaseKubeTarGz", dependsOn: configurations.archives.artifacts, type: Tar) {
into "samza-kubernetes-${version}"
compression = Compression.GZIP
from(configurations.runtime) { into("lib/") }
from(configurations.archives.artifacts.files) { into("lib/") }
duplicatesStrategy 'exclude'
}
}

project(":samza-shell") {
apply plugin: 'java'

Expand Down
1 change: 1 addition & 0 deletions gradle/dependency-versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,5 @@
jnaVersion = "4.5.1"
couchbaseClientVersion = "2.7.2"
couchbaseMockVersion = "1.5.22"
kubernetesJavaClientVersion = "4.1.3"
}
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@ public static void main(String[] args) {
//Read and parse the coordinator system config.
LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
coordinatorSystemConfig =
new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
new MapConfig(SamzaObjectMapper.getObjectMapper()
.readValue(coordinatorSystemEnv.replace("\\\"", "\""), Config.class));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason we do this replacement?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RB has been updated and aligned with the master branch. The change here has gone.

LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
} catch (IOException e) {
LOG.error("Exception while reading coordinator stream config", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
* The Allocator matches requests to resources and executes processes.
*/
private final ContainerAllocator containerAllocator;
private final Thread allocatorThread;
private Thread allocatorThread = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please keep this final and assign to different values in the constructor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. The RB was updated, and the change here has gone.


// The StandbyContainerManager manages standby-aware allocation and failover of containers
private final Optional<StandbyContainerManager> standbyContainerManager;
Expand Down Expand Up @@ -166,8 +166,10 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri
}

this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.standbyContainerManager);
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
LOG.info("Finished container process manager initialization.");
if (shouldStartAllocateThread()) {
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do actually use containerAllocator anymore? Please walk through this part of the code with me offline.

}
LOG.info("finished initialization of samza task manager");
}

@VisibleForTesting
Expand All @@ -184,23 +186,38 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri

this.clusterResourceManager = resourceManager;
this.standbyContainerManager = Optional.empty();

this.diagnosticsManager = Option.empty();
this.containerAllocator = allocator.orElseGet(
() -> new ContainerAllocator(this.clusterResourceManager, clusterManagerConfig, state,
hostAffinityEnabled, this.standbyContainerManager));
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
if (shouldStartAllocateThread()) {
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
}
LOG.info("Finished container process manager initialization");
}

// In Kubernetes, the pod will be started by kubelet automatically once it is allocated, it does not need a
// separate thread to keep polling the allocated resources to start the container.
public boolean shouldStartAllocateThread() {
return !clusterResourceManager.getClass().getSimpleName().equals("KubeClusterResourceManager");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This lookup of specific cluster manager seems pretty hard to maintain. Can we think of a better to distinguish when we need this to do container allocation?

}

public boolean shouldShutdown() {
LOG.debug("ContainerProcessManager state: Completed containers: {}, Configured containers: {}, Are there too many failed containers: {}, Is allocator thread alive: {}",
state.completedProcessors.get(), state.processorCount, jobFailureCriteriaMet ? "yes" : "no", allocatorThread.isAlive() ? "yes" : "no");
LOG.debug("ContainerProcessManager state: Completed containers: {}, Configured containers: {}, Are there too many failed containers: {}",
state.completedProcessors.get(), state.processorCount, jobFailureCriteriaMet ? "yes" : "no");

if (shouldStartAllocateThread()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this if do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This If is just for logging the aliveness info "Is allocator thread alive". The RB has been updated. The aliveness info is now in the LOG.debug() in line 207 (keep the code same as the current codebase).

LOG.debug("Is allocator thread alive: {}", allocatorThread.isAlive() ? "yes" : "no");
}

if (exceptionOccurred != null) {
LOG.error("Exception in container process manager", exceptionOccurred);
throw new SamzaException(exceptionOccurred);
}
return jobFailureCriteriaMet || state.completedProcessors.get() == state.processorCount.get() || !allocatorThread.isAlive();

boolean shouldShutdown = jobFailureCriteriaMet || state.completedProcessors.get() == state.processorCount.get();
return shouldStartAllocateThread() ? shouldShutdown || !allocatorThread.isAlive() : shouldShutdown;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's nicer to do a null check of the thread instead of doing shouldStartAllocateThread()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right. updated.

}

public void start() {
Expand Down Expand Up @@ -237,7 +254,9 @@ public void start() {

// Start container allocator thread
LOG.info("Starting the container allocator thread");
allocatorThread.start();
if (allocatorThread != null) {
allocatorThread.start();
}
LOG.info("Starting the container process manager");
}

Expand All @@ -246,12 +265,14 @@ public void stop() {

// Shutdown allocator thread
containerAllocator.stop();
try {
allocatorThread.join();
LOG.info("Stopped container allocator");
} catch (InterruptedException ie) {
LOG.error("Allocator thread join threw an interrupted exception", ie);
Thread.currentThread().interrupt();
if (allocatorThread != null) {
try {
allocatorThread.join();
LOG.info("Stopped container allocator");
} catch (InterruptedException ie) {
LOG.error("Allocator thread join threw an interrupted exception", ie);
Thread.currentThread().interrupt();
}
}

if (diagnosticsManager.isDefined()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,22 @@ import java.util
import java.util.concurrent.atomic.AtomicReference

import org.apache.samza.{Partition, SamzaException}
import org.apache.samza.config._
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems all the changes in this file is just to reorg imports. If that's the case, I think this file can be left untouched.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RB has been updated and aligned with the master branch. The change here has gone.

import org.apache.samza.config.Config
import org.apache.samza.container.grouper.stream.SSPGrouperProxy
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
import org.apache.samza.config.{Config, _}
import org.apache.samza.container.grouper.stream.{SSPGrouperProxy, SystemStreamPartitionGrouperFactory}
import org.apache.samza.container.grouper.task._
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore
import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping
import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping
import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping
import org.apache.samza.container.LocalityManager
import org.apache.samza.container.TaskName
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore
import org.apache.samza.coordinator.server.HttpServer
import org.apache.samza.coordinator.server.JobServlet
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping
import org.apache.samza.job.model.ContainerModel
import org.apache.samza.job.model.JobModel
import org.apache.samza.job.model.TaskMode
import org.apache.samza.job.model.TaskModel
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.container.{LocalityManager, TaskName}
import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore}
import org.apache.samza.coordinator.server.{HttpServer, JobServlet}
import org.apache.samza.coordinator.stream.messages.{SetContainerHostMapping, SetTaskContainerMapping, SetTaskModeMapping, SetTaskPartitionMapping}
import org.apache.samza.job.model.{ContainerModel, JobModel, TaskMode, TaskModel}
import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap}
import org.apache.samza.runtime.LocationId
import org.apache.samza.system._
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import org.apache.samza.util.{Logging, ReflectionUtil, Util}

import scala.collection.JavaConverters
import scala.collection.JavaConversions._
import scala.collection.JavaConverters
import scala.collection.JavaConverters._

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class HttpServer(
}

/**
* Returns the URL for the root of the HTTP server. This method
* Returns the URL for the root of the HTTP server. This URL is generated with host name.
*/
def getUrl = {
if (running) {
Expand All @@ -136,4 +136,17 @@ class HttpServer(
throw new SamzaException("HttpServer is not currently running, so URLs are not available for it.")
}
}

/**
* Returns the URL for the root of the HTTP server. This URL is generated with host address.
*/
def getIpUrl = {
if (running) {
val runningPort = server.getConnectors()(0).asInstanceOf[NetworkConnector].getLocalPort()

new URL("http://" + Util.getLocalHost.getHostAddress + ":" + runningPort + rootPath)
} else {
throw new SamzaException("HttpServer is not currently running, so URLs are not available for it.")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ object HttpUtil {
(exception, loop) => {
exception match {
case ioe: IOException => {
warn("Error getting response from Job coordinator server. received IOException: %s. Retrying..." format ioe.getClass)
error("Error getting response from Job coordinator server. Received IOException: %s. Retrying..." format ioe)
httpConn = getHttpConnection(url, timeout)
}
case e: Exception =>
Expand Down
34 changes: 34 additions & 0 deletions samza-kubernetes/src/docker/dockerfiles/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# samzaJarsFolder includes all the Samza jars (you needs to make sure all the samza jars are there.)
# You can build Samza image by:
# docker build -t dockerHubAccount/samza:versionNumber .
# Then Samza user can use the Samza image as base image to build their application image.
#

FROM ubuntu:latest

RUN apt-get update -y && apt-get upgrade -y && apt-get install -y openjdk-8-jdk

ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64
ENV PATH $PATH:$JAVA_HOME/bin

RUN mkdir -p /opt/samza
WORKDIR /opt/samza/
COPY samzaJarsFolder/ /opt/samza/
26 changes: 26 additions & 0 deletions samza-kubernetes/src/main/java/org/apache/samza/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

# Configurations
- kube.app.image: the image name for the samza app
- kube.app.namespace: the namespace where the samza app runs
- kube.app.pod.mnt.path: the path where the remote volume is mounted into the pod, for both the job coordinator pod and stream processor pod
the volume can be used for storing logs and local states.
- cluster-manager.container.memory.mb: the memory size for the samza stream processor
- cluster-manager.container.cpu.cores: the cpu cores for the samza stream processor
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.config;

/**
* Kubernetes related configurations
*/
public class KubeConfig {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest following the other Config class example to add getters for common configs, like namespace, jcPodName, etc. It's easier to centralize the place where these things are created. The rest of the code should just use these getters normally instead of directly accessing the config vars. It will be even better to make those static vars private.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good. will update this class.


// the image name of samza
public static final String APP_IMAGE = "kube.app.image";
public static final String DEFAULT_IMAGE = "weiqingyang/samza:v0";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not put your personal id as the default config.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right. Updated it to "samza/samza:v0".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will create samza docker hub if needed.


// the default working directory
public static final String DEFAULT_DIRECTORY = "/opt/samza/";

// the memory and the cpu cores of container
public static final String CLUSTER_MANAGER_CONTAINER_MEM_SIZE = "cluster-manager.container.memory.mb";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems these configs should already be defined in the cluster manager config. If not, can we make sure it's shared?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RB was updated to remove these duplicate config definitions.

public static final int DEFAULT_CLUSTER_MANAGER_CONTAINER_MEM_SIZE = 1024;
public static final String CLUSTER_MANAGER_CONTAINER_CPU_CORE_NUM = "cluster-manager.container.cpu.cores";
public static final int DEFAULT_CLUSTER_MANAGER_CONTAINER_CPU_CORE_NUM = 1;

// The directory path inside which the log will be stored.
public static final String SAMZA_MOUNT_DIR = "kube.app.pod.mnt.path";
public static final String DEFAULT_SAMZA_MOUNT_DIR = "/tmp/mnt";
public static final String K8S_API_NAMESPACE = "kube.app.namespace";
public static final String STREAM_PROCESSOR_CONTAINER_NAME_PREFIX = "sp";
public static final String JC_CONTAINER_NAME_PREFIX = "jc";
public static final String POD_RESTART_POLICY = "OnFailure";
public static final String JC_POD_NAME_FORMAT = "%s-%s-%s"; // jc-appName-appId
public static final String TASK_POD_NAME_FORMAT = "%s-%s-%s-%s"; // sp-appName-appId-containerId

// Environment variable
public static final String COORDINATOR_POD_NAME = "COORDINATOR_POD_NAME";
public static final String AZURE_REMOTE_VOLUME_ENABLED = "kube.app.volume.azure.file-share.enabled";
public static final String AZURE_SECRET = "kube.app.volume.azure-secret";
public static final String DEFAULT_AZURE_SECRET = "azure-secret";
public static final String AZURE_FILESHARE = "kube.app.volume.azure.file-share";
public static final String DEFAULT_AZURE_FILESHARE = "aksshare";

private Config config;

public KubeConfig(Config config) {
this.config = config;
}

public static KubeConfig validate(Config config) throws ConfigException {
KubeConfig kc = new KubeConfig(config);
kc.validate();
return kc;
}

// TODO: SAMZA-2365: validate KubeConfig before starting the job
private void validate() throws ConfigException {

}
}
Loading