Skip to content

Commit

Permalink
Support Samza on K8s
Browse files Browse the repository at this point in the history
Co-authored-by: Jian He <[email protected]>
  • Loading branch information
Weiqing Yang committed Oct 20, 2019
1 parent 3e3713c commit ec9a40c
Show file tree
Hide file tree
Showing 28 changed files with 1,510 additions and 55 deletions.
21 changes: 21 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,27 @@ project(":samza-yarn_$scalaSuffix") {
jar.dependsOn("lesscss")
}

project(":samza-kubernetes_$scalaSuffix") {
apply plugin: 'java'

dependencies {
compile project(':samza-api')
compile project(":samza-core_$scalaSuffix")
compile "org.codehaus.jackson:jackson-core-asl:1.9.7"
compile group: 'io.fabric8', name: 'kubernetes-client', version: kubernetesJavaClientVersion
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-core:$mockitoVersion"
}

tasks.create(name: "releaseKubeTarGz", dependsOn: configurations.archives.artifacts, type: Tar) {
into "samza-kubernetes-${version}"
compression = Compression.GZIP
from(configurations.runtime) { into("lib/") }
from(configurations.archives.artifacts.files) { into("lib/") }
duplicatesStrategy 'exclude'
}
}

project(":samza-shell") {
apply plugin: 'java'

Expand Down
1 change: 1 addition & 0 deletions gradle/dependency-versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,5 @@
failsafeVersion = "1.1.0"
jlineVersion = "3.8.2"
jnaVersion = "4.5.1"
kubernetesJavaClientVersion = "4.1.3"
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.samza.clustermanager;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -380,19 +379,53 @@ StreamPartitionCountMonitor getPartitionMonitor() {
* @param args args
*/
public static void main(String[] args) {
// TODO: remove all added code used for debugging
Thread thread = new Thread() {
public void run() {
log.info("Dummy Thread starts to sleep");
System.out.println("Dummy Thread starts to sleep");
while (true) {
try {
sleep(8 * 1000 * 60 * 60 * 60);
} catch (Exception e) {
log.info("Dummy Thread was interrupted");
System.out.println("Dummy Thread was interrupted");
}
}
}
};
thread.start();

Config coordinatorSystemConfig = null;
final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG());
try {
//Read and parse the coordinator system config.
log.info("Parsing coordinator system config {}", coordinatorSystemEnv);
System.out.println("Coordinator system config: " + coordinatorSystemEnv);
String correctedCoordinatorSystemEnv = coordinatorSystemEnv.replace("\\\"", "\"");
log.info("Corrected coordinator system config {}", correctedCoordinatorSystemEnv);
System.out.println("Corrected coordinator system config: " + correctedCoordinatorSystemEnv);

coordinatorSystemConfig =
new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
} catch (IOException e) {
new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(correctedCoordinatorSystemEnv, Config.class));
} catch (Exception e) {
log.error("Exception while reading coordinator stream config {}", e);
throw new SamzaException(e);

log.error("Exception ignored: ", e);
System.out.println("Exception ignored: " + e);
// throw new SamzaException(e);
}

ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(coordinatorSystemConfig);
jc.run();

try {
thread.join();
} catch (Exception e) {
log.error("new thread ended", e);
System.out.println("new thread ended: " + e);
}

log.info("Finished ClusterBasedJobCoordinator run");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
* The Allocator matches requests to resources and executes processes.
*/
private final AbstractContainerAllocator containerAllocator;
private final Thread allocatorThread;
private Thread allocatorThread = null;

// The StandbyContainerManager manages standby-aware allocation and failover of containers
private final Optional<StandbyContainerManager> standbyContainerManager;
Expand Down Expand Up @@ -146,8 +146,9 @@ public ContainerProcessManager(Config config,
} else {
this.containerAllocator = new ContainerAllocator(clusterResourceManager, config, state);
}

this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
if (shouldStartAllocateThread()) {
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
}
log.info("finished initialization of samza task manager");

}
Expand All @@ -174,19 +175,31 @@ public ContainerProcessManager(Config config,
this.containerAllocator = new ContainerAllocator(clusterResourceManager, config, state);
}

this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
if (shouldStartAllocateThread()) {
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
}
log.info("finished initialization of samza task manager");
}

// In Kubernetes, the pod requested will be started by kubelet automatically once it is assigned, it does not need a
// separate thread to keep polling the allocated resources to start the container.
public boolean shouldStartAllocateThread() {
return !clusterResourceManager.getClass().getSimpleName().equals("KubeClusterResourceManager");
}

public boolean shouldShutdown() {
log.debug(" TaskManager state: Completed containers: {}, Configured containers: {}, Is there too many FailedContainers: {}, Is AllocatorThread alive: {} ",
state.completedContainers.get(), state.containerCount, tooManyFailedContainers ? "yes" : "no", allocatorThread.isAlive() ? "yes" : "no");
// TODO:
// log.debug(" TaskManager state: Completed containers: {}, Configured containers: {}, Is there too many FailedContainers: {}, Is AllocatorThread alive: {} ",
// state.completedContainers.get(), state.containerCount, tooManyFailedContainers ? "yes" : "no", allocatorThread.isAlive() ? "yes" : "no");

if (exceptionOccurred != null) {
log.error("Exception in ContainerProcessManager", exceptionOccurred);
throw new SamzaException(exceptionOccurred);
}
return tooManyFailedContainers || state.completedContainers.get() == state.containerCount.get() || !allocatorThread.isAlive();

// TODO:
// return tooManyFailedContainers || state.completedContainers.get() == state.containerCount.get() || !allocatorThread.isAlive();
return tooManyFailedContainers || state.completedContainers.get() == state.containerCount.get();
}

public void start() {
Expand All @@ -206,20 +219,24 @@ public void start() {

// Start container allocator thread
log.info("Starting the container allocator thread");
allocatorThread.start();
if (allocatorThread != null) {
allocatorThread.start();
}
}

public void stop() {
log.info("Invoked stop of the Samza container process manager");

// Shutdown allocator thread
containerAllocator.stop();
try {
allocatorThread.join();
log.info("Stopped container allocator");
} catch (InterruptedException ie) {
log.error("Allocator Thread join() threw an interrupted exception", ie);
Thread.currentThread().interrupt();
if (allocatorThread != null) {
try {
allocatorThread.join();
log.info("Stopped container allocator");
} catch (InterruptedException ie) {
log.error("Allocator Thread join() threw an interrupted exception", ie);
Thread.currentThread().interrupt();
}
}

if (metrics != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.samza.runtime;

import org.apache.samza.SamzaException;
import org.apache.samza.application.ApplicationUtil;
import org.apache.samza.application.descriptors.ApplicationDescriptor;
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
Expand All @@ -41,37 +40,64 @@
public class LocalContainerRunner {
private static final Logger log = LoggerFactory.getLogger(LocalContainerRunner.class);

// TODO: remove all added code used for debugging
public static void main(String[] args) throws Exception {
Thread thread = new Thread() {
public void run() {
log.info("Dummy Thread starts to sleep");
System.out.println("Dummy Thread starts to sleep");
while (true) {
try {
sleep(8 * 1000 * 60 * 60 * 60);
} catch (Exception e) {
log.info("Dummy Thread was interrupted");
System.out.println("Dummy Thread was interrupted");
}
}
}
};
thread.start();

Thread.setDefaultUncaughtExceptionHandler(
new SamzaUncaughtExceptionHandler(() -> {
log.info("Exiting process now.");
System.exit(1);
}));

String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
log.info(String.format("Got container ID: %s", containerId));
System.out.println(String.format("Container ID: %s", containerId));
try {
String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
log.info(String.format("Got container ID: %s", containerId));
System.out.println(String.format("Container ID: %s", containerId));

String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
System.out.println(String.format("Coordinator URL: %s", coordinatorUrl));
String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
System.out.println(String.format("Coordinator URL: %s", coordinatorUrl));

int delay = new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
Config config = jobModel.getConfig();
JobConfig jobConfig = new JobConfig(config);
if (jobConfig.getName().isEmpty()) {
throw new SamzaException("can not find the job name");
}
String jobName = jobConfig.getName().get();
String jobId = jobConfig.getJobId();
MDC.put("containerName", "samza-container-" + containerId);
MDC.put("jobName", jobName);
MDC.put("jobId", jobId);
int delay = new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
Config config = jobModel.getConfig();
JobConfig jobConfig = new JobConfig(config);
if (jobConfig.getName().isEmpty()) {
// throw new SamzaException("can not find the job name");
log.error("can not find the job name");
System.out.println("can not find the job name");
}
String jobName = jobConfig.getName().get();
String jobId = jobConfig.getJobId();
MDC.put("containerName", "samza-container-" + containerId);
MDC.put("jobName", jobName);
MDC.put("jobId", jobId);

ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc =
ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config);
ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc =
ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config);

ContainerLaunchUtil.run(appDesc, containerId, jobModel);
} catch (Exception ex) {
// ignored.
log.error("LocalContainerRunner throw exception: ", ex);
System.out.println("LocalContainerRunner throw exception: " + ex);
}

ContainerLaunchUtil.run(appDesc, containerId, jobModel);
thread.join();
}
}
Loading

0 comments on commit ec9a40c

Please sign in to comment.