Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
Weiqing Yang committed Oct 19, 2019
1 parent 232534b commit 2036e37
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 12 deletions.
6 changes: 3 additions & 3 deletions samza-kubernetes/src/main/java/org/apache/samza/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
*/

# Configurations
- kube.app.image : the image name for the samza app
- kube.app.namespace : the namespace where the samza app run
- kube.app.pod.mnt.path: the path where the remote volume is mounted into the pod, both for the job coordinator pod and stream processor pod
- kube.app.image: the image name for the samza app
- kube.app.namespace: the namespace where the samza app runs
- kube.app.pod.mnt.path: the path where the remote volume is mounted into the pod, for both the job coordinator pod and stream processor pod
the volume can be used for storing logs and local states.
- cluster-manager.container.memory.mb: the memory size for the samza stream processor
- cluster-manager.container.cpu.cores: the cpu cores for the samza stream processor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@


public class KubeClientFactory {
private static final Logger log = LoggerFactory.getLogger(KubeClusterResourceManager.class);
private static final Logger LOG = LoggerFactory.getLogger(KubeClientFactory.class);

public static KubernetesClient create() {
log.info("Creating an instance of a Kubernetes client. ");
LOG.info("Creating an instance of a Kubernetes client. ");
ConfigBuilder builder = new ConfigBuilder();
Config config = builder.build();
KubernetesClient client = new DefaultKubernetesClient(config);
log.info("Kubernetes client created. ");
LOG.info("Kubernetes client created. ");

return client;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.net.MalformedURLException;
import java.net.URL;
import java.util.*;

import org.apache.samza.SamzaException;
import org.apache.samza.clustermanager.*;
import org.apache.samza.config.ClusterManagerConfig;
Expand All @@ -45,8 +44,6 @@

public class KubeClusterResourceManager extends ClusterResourceManager {
private static final Logger LOG = LoggerFactory.getLogger(KubeClusterResourceManager.class);

private final Object lock = new Object();
private final Map<String, String> podLabels = new HashMap<>();
private KubernetesClient client;
private String appId;
Expand All @@ -55,7 +52,7 @@ public class KubeClusterResourceManager extends ClusterResourceManager {
private String namespace;
private OwnerReference ownerReference;
private JobModelManager jobModelManager;
private boolean hostAffinityEnabled = false;
private boolean hostAffinityEnabled;
private Config config;
private String jcPodName;

Expand Down Expand Up @@ -83,7 +80,6 @@ public void start() {

// Create the owner reference of the samaza-operator pod
private void createOwnerReferences() {

// The operator pod yaml needs to pass in COORDINATOR_POD_NAME env
this.jcPodName = System.getenv(COORDINATOR_POD_NAME);
LOG.info("job coordinator pod name is: {}, namespace is: {}", jcPodName, namespace);
Expand Down Expand Up @@ -179,7 +175,7 @@ public void requestResources(SamzaResourceRequest resourceRequest) {
container.setEnv(getEnvs(builder));
String podName = String.format(TASK_POD_NAME_FORMAT, STREAM_PROCESSOR_CONTAINER_NAME_PREFIX, appName, appId, samzaContainerId);

//TODO pluggable volume implementation hostpath, azure file
//TODO: pluggable volume implementation hostpath, azure file
AzureFileVolumeSource azureFileVolumeSource =
new AzureFileVolumeSource(false, "azure-secret", "aksshare");
Volume volume = new Volume();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.job.kubernetes;

import org.junit.Test;

// TODO
public class KubeJobTest {

@Test
public void testFileSystemImplConfigSuccess() {

}
}

0 comments on commit 2036e37

Please sign in to comment.