Skip to content

Commit

Permalink
check credFilePath
Browse files Browse the repository at this point in the history
  • Loading branch information
anouri committed Oct 2, 2019
1 parent 6fb45b8 commit 6c4e44e
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 97 deletions.
64 changes: 33 additions & 31 deletions com.ibm.streamsx.hdfs/.classpath
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,38 @@
<classpathentry kind="src" output="impl/java/bin" path="impl/java/src"/>
<classpathentry exported="true" kind="con" path="com.ibm.streams.java/com.ibm.streams.operator"/>
<classpathentry exported="true" kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="impl/lib/ext/commons-cli-1.4.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/commons-codec-1.12.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/commons-collections-3.2.2.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/commons-configuration2-2.5.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/commons-httpclient-3.1.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/commons-io-2.6.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/commons-lang-2.6.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/commons-lang3-3.9.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/commons-logging-1.2.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/guava-13.0.1.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/hadoop-annotations-3.1.0.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/hadoop-auth-3.1.0.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/hadoop-common-3.1.0.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/hadoop-hdfs-3.1.0.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/hadoop-hdfs-client-3.1.0.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/htrace-core4-4.2.0-incubating.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/httpcore-4.4.11.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/jackson-core-2.9.9.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/jackson-core-asl-1.9.13.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/jackson-databind-2.9.9.3.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/jackson-mapper-asl-1.9.13.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/jersey-core-1.19.4.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/jersey-server-1.19.4.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/jsr311-api-1.1.1.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/protobuf-java-3.6.1.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/re2j-1.3.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/servlet-api-2.5.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/slf4j-api-1.7.26.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/slf4j-log4j12-1.7.26.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/stax2-api-4.2.jar"/>
<classpathentry kind="lib" path="impl/lib/ext/woodstox-core-5.0.3.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/commons-cli-1.4.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/commons-codec-1.12.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/commons-collections-3.2.2.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/commons-configuration2-2.5.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/commons-httpclient-3.1.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/commons-io-2.6.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/commons-lang-2.6.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/commons-lang3-3.9.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/commons-logging-1.2.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/guava-13.0.1.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/hadoop-annotations-3.1.0.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/hadoop-auth-3.1.0.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/hadoop-common-3.1.0.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/hadoop-hdfs-3.1.0.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/hadoop-hdfs-client-3.1.0.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/htrace-core4-4.2.0-incubating.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/httpcore-4.4.11.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/jackson-core-2.9.9.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/jackson-core-asl-1.9.13.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/jackson-databind-2.9.9.3.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/jackson-mapper-asl-1.9.13.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/jersey-core-1.19.4.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/jersey-server-1.19.4.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/jsr311-api-1.1.1.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/protobuf-java-3.6.1.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/re2j-1.3.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/servlet-api-2.5.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/slf4j-api-1.7.26.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/slf4j-log4j12-1.7.26.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/stax2-api-4.2.jar"/>
<classpathentry kind="lib" path="./impl/lib/ext/woodstox-core-5.0.3.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>


Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -18,6 +20,9 @@
import org.apache.hadoop.fs.Path;

import com.ibm.json.java.JSONObject;
import com.ibm.json.java.JSONArray;
import com.ibm.json.java.*;

import com.ibm.streams.operator.AbstractOperator;
import com.ibm.streams.operator.OperatorContext;
import com.ibm.streams.operator.OperatorContext.ContextCheck;
Expand Down Expand Up @@ -83,8 +88,17 @@ public synchronized void initialize(OperatorContext context) throws Exception {
setJavaSystemProperty();
loadAppConfig(context);
if (credentials != null) {
this.getCredentials(credentials);
if (!this.getCredentials(credentials)){
return;
}
}

if (fCredFile != null) {
if (!this.getCredentialsFormFile(fCredFile)){
return;
}
}

setupClassPaths(context);
addConfigPathToClassPaths(context);
createConnection();
Expand Down Expand Up @@ -132,13 +146,37 @@ protected void loadAppConfig(OperatorContext context) {
}
}


/**
* read the credentials from file and set fHdfsUser, fHdfsPassword and fHdfsUrl.
* @param credFile
*/
public boolean getCredentialsFormFile(String credFile) throws IOException {

String credentials = null;
try
{
credentials = new String ( Files.readAllBytes( Paths.get(getAbsolutePath(credFile)) ) );
}
catch (IOException e)
{
LOGGER.log(LogLevel.ERROR, "The credentials file " + getAbsolutePath(credFile) + "does not exist." );
return false;
}

if ((credentials != null ) && (!credentials.isEmpty())) {
return getCredentials(credentials);
}
return false;
}


/**
* read the credentials and set user name fHdfsUser, fHfsPassword and
* hdfsUrl.
* read the credentials and set fHdfsUser, fHfsPassword and fHdfsUrl.
*
* @param credentials
*/
public void getCredentials(String credentials) throws IOException {
public boolean getCredentials(String credentials) throws IOException {
String jsonString = credentials;
try {
JSONObject obj = JSONObject.parse(jsonString);
Expand Down Expand Up @@ -172,7 +210,9 @@ public void getCredentials(String credentials) throws IOException {

} catch (Exception ex) {
ex.printStackTrace();
return false;
}
return true;
}

/**
Expand Down Expand Up @@ -230,7 +270,6 @@ private void addConfigPathToClassPaths(OperatorContext context) {
String user_defined_config_path = getAbsolutePath(getConfigPath())+ "/*";
TRACE.log(TraceLevel.INFO, "Adding " + user_defined_config_path + " to classpath");
libList.add(user_defined_config_path);
System.out.println("AAAAA Adding " + user_defined_config_path + " to classpath");
}

for (int i = 0; i < libList.size(); i++) {
Expand Down Expand Up @@ -302,7 +341,7 @@ public static void checkParameters(OperatorContextChecker checker) {
}


@Parameter(name = "hdfsUri", optional = true, description = IHdfsConstants.DESC_HDFS_URL)
@Parameter(name = IHdfsConstants.PARAM_HDFS_URI, optional = true, description = IHdfsConstants.DESC_HDFS_URL)
public void setHdfsUri(String hdfsUri) {
TRACE.log(TraceLevel.DEBUG, "setHdfsUri: " + hdfsUri);
fHdfsUri = hdfsUri;
Expand All @@ -313,7 +352,7 @@ public String getHdfsUri() {
}


@Parameter(name = "hdfsUser", optional = true, description = IHdfsConstants.DESC_HDFS_USER)
@Parameter(name = IHdfsConstants.PARAM_HDFS_USER, optional = true, description = IHdfsConstants.DESC_HDFS_USER)
public void setHdfsUser(String hdfsUser) {
this.fHdfsUser = hdfsUser;
}
Expand All @@ -323,7 +362,7 @@ public String getHdfsUser() {
}


@Parameter(name = "hdfsPassword", optional = true, description = IHdfsConstants.DESC_HDFS_PASSWORD)
@Parameter(name = IHdfsConstants.PARAM_HDFS_PASSWORD, optional = true, description = IHdfsConstants.DESC_HDFS_PASSWORD)
public void setHdfsPassword(String hdfsPassword) {
fHdfsPassword = hdfsPassword;
}
Expand All @@ -334,7 +373,7 @@ public String getHdfsPassword() {


// Parameter reconnectionPolicy
@Parameter(name = "reconnectionPolicy", optional = true, description = IHdfsConstants.DESC_REC_POLICY)
@Parameter(name = IHdfsConstants.PARAM_REC_POLICY, optional = true, description = IHdfsConstants.DESC_REC_POLICY)
public void setReconnectionPolicy(String reconnectionPolicy) {
this.fReconnectionPolicy = reconnectionPolicy;
}
Expand All @@ -345,7 +384,7 @@ public String getReconnectionPolicy() {


// Parameter reconnectionBound
@Parameter(name = "reconnectionBound", optional = true, description = IHdfsConstants.DESC_REC_BOUND)
@Parameter(name = IHdfsConstants.PARAM_REC_BOUND, optional = true, description = IHdfsConstants.DESC_REC_BOUND)
public void setReconnectionBound(int reconnectionBound) {
this.fReconnectionBound = reconnectionBound;
}
Expand All @@ -355,7 +394,7 @@ public int getReconnectionBound() {
}

// Parameter reconnectionInterval
@Parameter(name = "reconnectionInterval", optional = true, description = IHdfsConstants.DESC_REC_INTERVAL)
@Parameter(name = IHdfsConstants.PARAM_REC_INTERVAL, optional = true, description = IHdfsConstants.DESC_REC_INTERVAL)
public void setReconnectionInterval(double reconnectionInterval) {
this.fReconnectionInterval = reconnectionInterval;
}
Expand All @@ -365,7 +404,7 @@ public double getReconnectionInterval() {
}

// Parameter authPrincipal
@Parameter(name = "authPrincipal", optional = true, description = IHdfsConstants.DESC_PRINCIPAL)
@Parameter(name = IHdfsConstants.PARAM_AUTH_PRINCIPAL, optional = true, description = IHdfsConstants.DESC_PRINCIPAL)
public void setAuthPrincipal(String authPrincipal) {
this.fAuthPrincipal = authPrincipal;
}
Expand All @@ -375,7 +414,7 @@ public String getAuthPrincipal() {
}

// Parameter authKeytab
@Parameter(name = "authKeytab", optional = true, description = IHdfsConstants.DESC_AUTH_KEY)
@Parameter(name = IHdfsConstants.PARAM_AUTH_KEYTAB, optional = true, description = IHdfsConstants.DESC_AUTH_KEY)
public void setAuthKeytab(String authKeytab) {
this.fAuthKeytab = authKeytab;
}
Expand All @@ -385,7 +424,7 @@ public String getAuthKeytab() {
}

// Parameter CredFile
@Parameter(name = "credFile", optional = true, description = IHdfsConstants.DESC_CRED_FILE)
@Parameter(name = IHdfsConstants.PARAM_CRED_FILE, optional = true, description = IHdfsConstants.DESC_CRED_FILE)
public void setCredFile(String credFile) {
this.fCredFile = credFile;
}
Expand All @@ -395,7 +434,7 @@ public String getCredFile() {
}

// Parameter ConfigPath
@Parameter(name = "configPath", optional = true, description = IHdfsConstants.DESC_CONFIG_PATH)
@Parameter(name = IHdfsConstants.PARAM_CONFIG_PATH, optional = true, description = IHdfsConstants.DESC_CONFIG_PATH)
public void setConfigPath(String configPath) {
this.fConfigPath = configPath;
}
Expand All @@ -405,7 +444,7 @@ public String getConfigPath() {
}

// Parameter keyStorePath
@Parameter(name = "keyStorePath", optional = true, description = IHdfsConstants.DESC_KEY_STOR_PATH)
@Parameter(name = IHdfsConstants.PARAM_KEY_STOR_PATH, optional = true, description = IHdfsConstants.DESC_KEY_STOR_PATH)
public void setKeyStorePath(String keyStorePath) {
fKeyStorePath = keyStorePath;
}
Expand All @@ -415,7 +454,7 @@ public String getKeyStorePath() {
}

// Parameter keyStorePassword
@Parameter(name = "keyStorePassword", optional = true, description = IHdfsConstants.DESC_KEY_STOR_PASSWORD)
@Parameter(name = IHdfsConstants.PARAM_KEY_STOR_PASSWORD, optional = true, description = IHdfsConstants.DESC_KEY_STOR_PASSWORD)
public void setKeyStorePassword(String keyStorePassword) {
fKeyStorePassword = keyStorePassword;
}
Expand All @@ -425,7 +464,7 @@ public String getKeyStorePassword() {
}

// Parameter libPath
@Parameter(name = "libPath", optional = true, description = IHdfsConstants.DESC_LIB_PATH)
@Parameter(name = IHdfsConstants.PARAM_LIB_PATH, optional = true, description = IHdfsConstants.DESC_LIB_PATH)
public void setLibPath(String libPath) {
fLibPath = libPath;
}
Expand All @@ -435,7 +474,7 @@ public String getLibPath() {
}

// Parameter policyFilePath
@Parameter(name = "policyFilePath" ,optional = true, description = IHdfsConstants.DESC_POLICY_FILE_PATH)
@Parameter(name = IHdfsConstants.PARAM_POLICY_FILE_PATH ,optional = true, description = IHdfsConstants.DESC_POLICY_FILE_PATH)
public void setPolicyFilePath(String policyFilePath) {
fPolicyFilePath = policyFilePath;
}
Expand All @@ -445,7 +484,7 @@ public String getPolicyFilePath() {
}

// Parameter credentials
@Parameter(name = "credentials", optional = true, description = IHdfsConstants.DESC_CREDENTIALS)
@Parameter(name = IHdfsConstants.PARAM_CREDENTIALS, optional = true, description = IHdfsConstants.DESC_CREDENTIALS)
public void setcredentials(String credentials) {
this.credentials = credentials;
}
Expand All @@ -456,7 +495,7 @@ public String getCredentials() {


// Parameter appConfigName
@Parameter(name = "appConfigName", optional = true, description = IHdfsConstants.DESC_APP_CONFIG_NAME)
@Parameter(name = IHdfsConstants.PARAM_APP_CONFIG_NAME, optional = true, description = IHdfsConstants.DESC_APP_CONFIG_NAME)
public void setAppConfigName(String appConfigName) {
this.appConfigName = appConfigName;
}
Expand Down Expand Up @@ -515,13 +554,12 @@ public void run() {
protected IHdfsClient createHdfsClient() throws Exception {
IHdfsClient client = new HdfsJavaClient();

client.setConnectionProperty(IHdfsConstants.KEYSTORE, getAbsolutePath(getKeyStorePath()));
client.setConnectionProperty(IHdfsConstants.KEYSTORE_PASSWORD, getKeyStorePassword());
client.setConnectionProperty(IHdfsConstants.PARAM_KEY_STOR_PATH, getAbsolutePath(getKeyStorePath()));
client.setConnectionProperty(IHdfsConstants.PARAM_KEY_STOR_PASSWORD, getKeyStorePassword());

client.setConnectionProperty(IHdfsConstants.HDFS_PASSWORD, getHdfsPassword());
client.setConnectionProperty(IHdfsConstants.AUTH_PRINCIPAL, getAuthPrincipal());
client.setConnectionProperty(IHdfsConstants.AUTH_KEYTAB, getAbsolutePath(getAuthKeytab()));
client.setConnectionProperty(IHdfsConstants.CRED_FILE, getAbsolutePath(getCredFile()));
client.setConnectionProperty(IHdfsConstants.PARAM_HDFS_PASSWORD, getHdfsPassword());
client.setConnectionProperty(IHdfsConstants.PARAM_AUTH_PRINCIPAL, getAuthPrincipal());
client.setConnectionProperty(IHdfsConstants.PARAM_AUTH_KEYTAB, getAbsolutePath(getAuthKeytab()));

return client;
}
Expand Down
Loading

0 comments on commit 6c4e44e

Please sign in to comment.