diff --git a/samza-kubernetes/src/main/java/org/apache/samza/README.md b/samza-kubernetes/src/main/java/org/apache/samza/README.md index 0503eb6333..b9f40b823a 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/README.md +++ b/samza-kubernetes/src/main/java/org/apache/samza/README.md @@ -18,9 +18,9 @@ */ # Configurations -- kube.app.image : the image name for the samza app -- kube.app.namespace : the namespace where the samza app run -- kube.app.pod.mnt.path: the path where the remote volume is mounted into the pod, both for the job coordinator pod and stream processor pod +- kube.app.image: the image name for the samza app +- kube.app.namespace: the namespace where the samza app runs +- kube.app.pod.mnt.path: the path where the remote volume is mounted into the pod, for both the job coordinator pod and stream processor pod the volume can be used for storing logs and local states. - cluster-manager.container.memory.mb: the memory size for the samza stream processor - cluster-manager.container.cpu.cores: the cpu cores for the samza stream processor diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClientFactory.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClientFactory.java index 229a6e7df7..0825270620 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClientFactory.java +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClientFactory.java @@ -28,14 +28,14 @@ public class KubeClientFactory { - private static final Logger log = LoggerFactory.getLogger(KubeClusterResourceManager.class); + private static final Logger LOG = LoggerFactory.getLogger(KubeClientFactory.class); public static KubernetesClient create() { - log.info("Creating an instance of a Kubernetes client. "); + LOG.info("Creating an instance of a Kubernetes client. "); ConfigBuilder builder = new ConfigBuilder(); Config config = builder.build(); KubernetesClient client = new DefaultKubernetesClient(config); - log.info("Kubernetes client created. "); + LOG.info("Kubernetes client created. "); return client; } diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java index 6605f17f04..1c8b06c664 100644 --- a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java +++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java @@ -25,7 +25,6 @@ import java.net.MalformedURLException; import java.net.URL; import java.util.*; - import org.apache.samza.SamzaException; import org.apache.samza.clustermanager.*; import org.apache.samza.config.ClusterManagerConfig; @@ -45,8 +44,6 @@ public class KubeClusterResourceManager extends ClusterResourceManager { private static final Logger LOG = LoggerFactory.getLogger(KubeClusterResourceManager.class); - - private final Object lock = new Object(); private final Map podLabels = new HashMap<>(); private KubernetesClient client; private String appId; @@ -55,7 +52,7 @@ public class KubeClusterResourceManager extends ClusterResourceManager { private String namespace; private OwnerReference ownerReference; private JobModelManager jobModelManager; - private boolean hostAffinityEnabled = false; + private boolean hostAffinityEnabled; private Config config; private String jcPodName; @@ -83,7 +80,6 @@ public void start() { // Create the owner reference of the samaza-operator pod private void createOwnerReferences() { - // The operator pod yaml needs to pass in COORDINATOR_POD_NAME env this.jcPodName = System.getenv(COORDINATOR_POD_NAME); LOG.info("job coordinator pod name is: {}, namespace is: {}", jcPodName, namespace); @@ -179,7 +175,7 @@ public void requestResources(SamzaResourceRequest resourceRequest) { container.setEnv(getEnvs(builder)); String podName = String.format(TASK_POD_NAME_FORMAT, STREAM_PROCESSOR_CONTAINER_NAME_PREFIX, appName, appId, samzaContainerId); - //TODO pluggable volume implementation hostpath, azure file + //TODO: pluggable volume implementation hostpath, azure file AzureFileVolumeSource azureFileVolumeSource = new AzureFileVolumeSource(false, "azure-secret", "aksshare"); Volume volume = new Volume(); diff --git a/samza-kubernetes/src/test/java/org/apache/samza/job/kubernetes/KubeJobTest.java b/samza-kubernetes/src/test/java/org/apache/samza/job/kubernetes/KubeJobTest.java new file mode 100644 index 0000000000..a46f22de4e --- /dev/null +++ b/samza-kubernetes/src/test/java/org/apache/samza/job/kubernetes/KubeJobTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.job.kubernetes; + +import org.junit.Test; + +// TODO +public class KubeJobTest { + + @Test + public void testFileSystemImplConfigSuccess() { + + } +}