diff --git a/com.ibm.streamsx.hdfs/.classpath b/com.ibm.streamsx.hdfs/.classpath
index a7e4c95..1a5db16 100644
--- a/com.ibm.streamsx.hdfs/.classpath
+++ b/com.ibm.streamsx.hdfs/.classpath
@@ -3,36 +3,38 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/AbstractHdfsOperator.java b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/AbstractHdfsOperator.java
index 44651bb..31f886a 100755
--- a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/AbstractHdfsOperator.java
+++ b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/AbstractHdfsOperator.java
@@ -8,6 +8,8 @@
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@@ -18,6 +20,9 @@
import org.apache.hadoop.fs.Path;
import com.ibm.json.java.JSONObject;
+import com.ibm.json.java.JSONArray;
+import com.ibm.json.java.*;
+
import com.ibm.streams.operator.AbstractOperator;
import com.ibm.streams.operator.OperatorContext;
import com.ibm.streams.operator.OperatorContext.ContextCheck;
@@ -83,10 +88,231 @@ public synchronized void initialize(OperatorContext context) throws Exception {
setJavaSystemProperty();
loadAppConfig(context);
if (credentials != null) {
- this.getCredentials(credentials);
+ if (!this.getCredentials(credentials)){
+ return;
+ }
+ }
+
+ if (fCredFile != null) {
+ if (!this.getCredentialsFromFile(fCredFile)){
+ return;
+ }
}
+
setupClassPaths(context);
+ addConfigPathToClassPaths(context);
createConnection();
+
+ }
+
+ /**
+ * set policy file path and https.protocols in JAVA system properties
+ */
+ private void setJavaSystemProperty() {
+ String policyFilePath = getAbsolutePath(getPolicyFilePath());
+ if (policyFilePath != null) {
+ TRACE.log(TraceLevel.INFO, "Policy file path: " + policyFilePath);
+ System.setProperty("com.ibm.security.jurisdictionPolicyDir", policyFilePath);
+ }
+ System.setProperty("https.protocols", "TLSv1.2");
+ String httpsProtocol = System.getProperty("https.protocols");
+ TRACE.log(TraceLevel.INFO, "streamsx.hdfs https.protocols " + httpsProtocol);
+ }
+
+ /**
+ * read the application config into a map
+ *
+ * @param context the operator context
+ */
+ protected void loadAppConfig(OperatorContext context) {
+
+ // if no appconfig name is specified, create empty map
+ if (appConfigName == null) {
+ appConfig = new HashMap();
+ return;
+ }
+
+ appConfig = context.getPE().getApplicationConfiguration(appConfigName);
+ if (appConfig.isEmpty()) {
+ LOGGER.log(LogLevel.WARN, "Application config not found or empty: " + appConfigName);
+ }
+
+ for (Map.Entry kv : appConfig.entrySet()) {
+ TRACE.log(TraceLevel.DEBUG, "Found application config entry: " + kv.getKey() + "=" + kv.getValue());
+ }
+
+ if (null != appConfig.get("credentials")) {
+ credentials = appConfig.get("credentials");
+ }
+ }
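+
+	/*
+	 * Illustrative sketch, not part of the shipped code: an application configuration
+	 * consumed by loadAppConfig() could be created on the Streams instance with a
+	 * command such as
+	 *
+	 *   streamtool mkappconfig --property credentials='{"user":"clsadmin","password":"my-password","webhdfs":"webhdfs://server:8443"}' myHdfsAppConfig
+	 *
+	 * where the configuration name and all credential values are placeholders; the
+	 * name is then passed to the operator through the appConfigName parameter.
+	 */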
+
+
+ /**
+ * read the credentials from a file and set fHdfsUser, fHdfsPassword and fHdfsUri.
+ * @param credFile
+ */
+ public boolean getCredentialsFromFile(String credFile) throws IOException {
+
+ String credentials = null;
+ try
+ {
+ credentials = new String ( Files.readAllBytes( Paths.get(getAbsolutePath(credFile)) ) );
+ }
+ catch (IOException e)
+ {
+ LOGGER.log(LogLevel.ERROR, "The credentials file " + getAbsolutePath(credFile) + "does not exist." );
+ return false;
+ }
+
+ if ((credentials != null ) && (!credentials.isEmpty())) {
+ return getCredentials(credentials);
+ }
+ return false;
+ }
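+
+	/*
+	 * Illustrative sketch, not part of the shipped code: a credFile read by the method
+	 * above is a plain text file whose entire content is a single JSON object, for example
+	 *
+	 *   { "user" : "clsadmin", "password" : "my-password", "webhdfs" : "webhdfs://server:8443" }
+	 *
+	 * Host, user and password values here are placeholders.
+	 */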
+
+
+ /**
+ * read the credentials and set fHdfsUser, fHdfsPassword and fHdfsUri.
+ *
+ * @param credentials
+ */
+ public boolean getCredentials(String credentials) throws IOException {
+ String jsonString = credentials;
+ try {
+ JSONObject obj = JSONObject.parse(jsonString);
+ fHdfsUser = (String) obj.get("user");
+ if (fHdfsUser == null || fHdfsUser.trim().isEmpty()) {
+ fHdfsUser = (String) obj.get("hdfsUser");
+ if (fHdfsUser == null || fHdfsUser.trim().isEmpty()) {
+ LOGGER.log(LogLevel.ERROR, Messages.getString("'fHdfsUser' is required to create HDFS connection."));
+ throw new Exception(Messages.getString("'fHdfsUser' is required to create HDFS connection."));
+ }
+ }
+
+ fHdfsPassword = (String) obj.get("password");
+ if (fHdfsPassword == null || fHdfsPassword.trim().isEmpty()) {
+ fHdfsPassword = (String) obj.get("hdfsPassword");
+ if (fHdfsPassword == null || fHdfsPassword.trim().isEmpty()) {
+ LOGGER.log(LogLevel.ERROR, Messages.getString(
+ "'fHdfsPassword' is required to create HDFS connection."));
+ throw new Exception(Messages.getString("'fHdfsPassword' is required to create HDFS connection."));
+ }
+ }
+
+ fHdfsUri = (String) obj.get("webhdfs");
+ if (fHdfsUri == null || fHdfsUri.trim().isEmpty()) {
+ fHdfsUri = (String) obj.get("hdfsUri");
+ if (fHdfsUri == null || fHdfsUri.trim().isEmpty()) {
+ LOGGER.log(LogLevel.ERROR, Messages.getString("'fHdfsUri' is required to create HDFS connection."));
+ throw new Exception(Messages.getString("'fHdfsUri' is required to create HDFS connection."));
+ }
+ }
+
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ return false;
+ }
+ return true;
+ }
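+
+	/*
+	 * Illustrative sketch, not part of the shipped code: getCredentials() accepts either
+	 * naming variant for each key, so the following two JSON strings are treated the same:
+	 *
+	 *   { "user" : "clsadmin", "password" : "my-password", "webhdfs" : "webhdfs://server:8443" }
+	 *   { "hdfsUser" : "clsadmin", "hdfsPassword" : "my-password", "hdfsUri" : "webhdfs://server:8443" }
+	 *
+	 * All values here are placeholders.
+	 */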
+
+ /**
+ * set the class path for Hadoop libraries
+ *
+ * @param context
+ */
+ private void setupClassPaths(OperatorContext context) {
+
+ ArrayList<String> libList = new ArrayList<>();
+ String HADOOP_HOME = System.getenv("HADOOP_HOME");
+ if (getLibPath() != null) {
+ String user_defined_path = getLibPath() + "/*";
+ TRACE.log(TraceLevel.INFO, "Adding " + user_defined_path + " to classpath");
+ libList.add(user_defined_path);
+ } else {
+ // add class path for delivered jar files from ./impl/lib/ext/ directory
+ String default_dir = context.getToolkitDirectory() + "/impl/lib/ext/*";
+ TRACE.log(TraceLevel.INFO, "Adding /impl/lib/ext/* to classpath");
+ libList.add(default_dir);
+
+ if (HADOOP_HOME != null) {
+ // if neither configPath nor hdfsUri is defined, check the
+ // HADOOP_HOME configuration directories for a default core-site.xml file
+ if ((fConfigPath == null) && (fHdfsUri == null)) {
+ libList.add(HADOOP_HOME + "/conf");
+ libList.add(HADOOP_HOME + "/../hadoop-conf");
+ libList.add(HADOOP_HOME + "/etc/hadoop");
+ libList.add(HADOOP_HOME + "/*");
+ libList.add(HADOOP_HOME + "/../hadoop-hdfs");
+ libList.add(HADOOP_HOME + "/lib/*");
+ libList.add(HADOOP_HOME + "/client/*");
+ }
+
+
+
+ }
+ }
+ for (int i = 0; i < libList.size(); i++) {
+ TRACE.log(TraceLevel.INFO, "calss path list " + i + " : " + libList.get(i));
+ }
+
+ try {
+ context.addClassLibraries(libList.toArray(new String[0]));
+
+ } catch (MalformedURLException e) {
+ LOGGER.log(TraceLevel.ERROR, "LIB_LOAD_ERROR", e);
+ }
+ }
+
+ private void addConfigPathToClassPaths(OperatorContext context) {
+
+ ArrayList<String> libList = new ArrayList<>();
+ if (getConfigPath() != null) {
+ String user_defined_config_path = getAbsolutePath(getConfigPath())+ "/*";
+ TRACE.log(TraceLevel.INFO, "Adding " + user_defined_config_path + " to classpath");
+ libList.add(user_defined_config_path);
+ }
+
+ for (int i = 0; i < libList.size(); i++) {
+ TRACE.log(TraceLevel.INFO, "calss path list " + i + " : " + libList.get(i));
+ }
+
+ try {
+ context.addClassLibraries(libList.toArray(new String[0]));
+
+ } catch (MalformedURLException e) {
+ LOGGER.log(TraceLevel.ERROR, "LIB_LOAD_ERROR", e);
+ }
+ }
+
+
+ /** createConnection creates a connection to the hadoop file system. */
+ private synchronized void createConnection() throws Exception {
+ // Delay in milliseconds as specified in fReconnectionInterval parameter
+ final long delay = TimeUnit.MILLISECONDS.convert((long) fReconnectionInterval, TimeUnit.SECONDS);
+ LOGGER.log(TraceLevel.INFO, "createConnection ReconnectionPolicy " + fReconnectionPolicy + " ReconnectionBound "
+ + fReconnectionBound + " ReconnectionInterval " + fReconnectionInterval);
+ if (fReconnectionPolicy == IHdfsConstants.RECONNPOLICY_NORETRY) {
+ fReconnectionBound = 1;
+ }
+
+ if (fReconnectionPolicy == IHdfsConstants.RECONNPOLICY_INFINITERETRY) {
+ fReconnectionBound = 9999;
+ }
+
+ for (int nConnectionAttempts = 0; nConnectionAttempts < fReconnectionBound; nConnectionAttempts++) {
+ LOGGER.log(TraceLevel.INFO, "createConnection nConnectionAttempts is: " + nConnectionAttempts + " delay "
+ + delay);
+ try {
+ fHdfsClient = createHdfsClient();
+ fs = fHdfsClient.connect(getHdfsUri(), getHdfsUser(), getAbsolutePath(getConfigPath()));
+ LOGGER.log(TraceLevel.INFO, Messages.getString("HDFS_CLIENT_AUTH_CONNECT", fHdfsUri));
+ break;
+ } catch (Exception e) {
+ LOGGER.log(TraceLevel.ERROR, Messages.getString("HDFS_CLIENT_AUTH_CONNECT", e.toString()));
+ Thread.sleep(delay);
+ }
+ }
+
}
@@ -115,7 +341,7 @@ public static void checkParameters(OperatorContextChecker checker) {
}
- @Parameter(name = "hdfsUri", optional = true, description = IHdfsConstants.DESC_HDFS_URL)
+ @Parameter(name = IHdfsConstants.PARAM_HDFS_URI, optional = true, description = IHdfsConstants.DESC_HDFS_URL)
public void setHdfsUri(String hdfsUri) {
TRACE.log(TraceLevel.DEBUG, "setHdfsUri: " + hdfsUri);
fHdfsUri = hdfsUri;
@@ -126,7 +352,7 @@ public String getHdfsUri() {
}
- @Parameter(name = "hdfsUser", optional = true, description = IHdfsConstants.DESC_HDFS_USER)
+ @Parameter(name = IHdfsConstants.PARAM_HDFS_USER, optional = true, description = IHdfsConstants.DESC_HDFS_USER)
public void setHdfsUser(String hdfsUser) {
this.fHdfsUser = hdfsUser;
}
@@ -136,7 +362,7 @@ public String getHdfsUser() {
}
- @Parameter(name = "hdfsPassword", optional = true, description = IHdfsConstants.DESC_HDFS_PASSWORD)
+ @Parameter(name = IHdfsConstants.PARAM_HDFS_PASSWORD, optional = true, description = IHdfsConstants.DESC_HDFS_PASSWORD)
public void setHdfsPassword(String hdfsPassword) {
fHdfsPassword = hdfsPassword;
}
@@ -147,7 +373,7 @@ public String getHdfsPassword() {
// Parameter reconnectionPolicy
- @Parameter(name = "reconnectionPolicy", optional = true, description = IHdfsConstants.DESC_REC_POLICY)
+ @Parameter(name = IHdfsConstants.PARAM_REC_POLICY, optional = true, description = IHdfsConstants.DESC_REC_POLICY)
public void setReconnectionPolicy(String reconnectionPolicy) {
this.fReconnectionPolicy = reconnectionPolicy;
}
@@ -158,7 +384,7 @@ public String getReconnectionPolicy() {
// Parameter reconnectionBound
- @Parameter(name = "reconnectionBound", optional = true, description = IHdfsConstants.DESC_REC_BOUND)
+ @Parameter(name = IHdfsConstants.PARAM_REC_BOUND, optional = true, description = IHdfsConstants.DESC_REC_BOUND)
public void setReconnectionBound(int reconnectionBound) {
this.fReconnectionBound = reconnectionBound;
}
@@ -168,7 +394,7 @@ public int getReconnectionBound() {
}
// Parameter reconnectionInterval
- @Parameter(name = "reconnectionInterval", optional = true, description = IHdfsConstants.DESC_REC_INTERVAL)
+ @Parameter(name = IHdfsConstants.PARAM_REC_INTERVAL, optional = true, description = IHdfsConstants.DESC_REC_INTERVAL)
public void setReconnectionInterval(double reconnectionInterval) {
this.fReconnectionInterval = reconnectionInterval;
}
@@ -178,7 +404,7 @@ public double getReconnectionInterval() {
}
// Parameter authPrincipal
- @Parameter(name = "authPrincipal", optional = true, description = IHdfsConstants.DESC_PRINCIPAL)
+ @Parameter(name = IHdfsConstants.PARAM_AUTH_PRINCIPAL, optional = true, description = IHdfsConstants.DESC_PRINCIPAL)
public void setAuthPrincipal(String authPrincipal) {
this.fAuthPrincipal = authPrincipal;
}
@@ -188,7 +414,7 @@ public String getAuthPrincipal() {
}
// Parameter authKeytab
- @Parameter(name = "authKeytab", optional = true, description = IHdfsConstants.DESC_AUTH_KEY)
+ @Parameter(name = IHdfsConstants.PARAM_AUTH_KEYTAB, optional = true, description = IHdfsConstants.DESC_AUTH_KEY)
public void setAuthKeytab(String authKeytab) {
this.fAuthKeytab = authKeytab;
}
@@ -198,7 +424,7 @@ public String getAuthKeytab() {
}
// Parameter CredFile
- @Parameter(name = "credFile", optional = true, description = IHdfsConstants.DESC_CRED_FILE)
+ @Parameter(name = IHdfsConstants.PARAM_CRED_FILE, optional = true, description = IHdfsConstants.DESC_CRED_FILE)
public void setCredFile(String credFile) {
this.fCredFile = credFile;
}
@@ -208,7 +434,7 @@ public String getCredFile() {
}
// Parameter ConfigPath
- @Parameter(name = "configPath", optional = true, description = IHdfsConstants.DESC_CONFIG_PATH)
+ @Parameter(name = IHdfsConstants.PARAM_CONFIG_PATH, optional = true, description = IHdfsConstants.DESC_CONFIG_PATH)
public void setConfigPath(String configPath) {
this.fConfigPath = configPath;
}
@@ -218,7 +444,7 @@ public String getConfigPath() {
}
// Parameter keyStorePath
- @Parameter(name = "keyStorePath", optional = true, description = IHdfsConstants.DESC_KEY_STOR_PATH)
+ @Parameter(name = IHdfsConstants.PARAM_KEY_STOR_PATH, optional = true, description = IHdfsConstants.DESC_KEY_STOR_PATH)
public void setKeyStorePath(String keyStorePath) {
fKeyStorePath = keyStorePath;
}
@@ -228,7 +454,7 @@ public String getKeyStorePath() {
}
// Parameter keyStorePassword
- @Parameter(name = "keyStorePassword", optional = true, description = IHdfsConstants.DESC_KEY_STOR_PASSWORD)
+ @Parameter(name = IHdfsConstants.PARAM_KEY_STOR_PASSWORD, optional = true, description = IHdfsConstants.DESC_KEY_STOR_PASSWORD)
public void setKeyStorePassword(String keyStorePassword) {
fKeyStorePassword = keyStorePassword;
}
@@ -238,7 +464,7 @@ public String getKeyStorePassword() {
}
// Parameter libPath
- @Parameter(name = "libPath", optional = true, description = IHdfsConstants.DESC_LIB_PATH)
+ @Parameter(name = IHdfsConstants.PARAM_LIB_PATH, optional = true, description = IHdfsConstants.DESC_LIB_PATH)
public void setLibPath(String libPath) {
fLibPath = libPath;
}
@@ -248,7 +474,7 @@ public String getLibPath() {
}
// Parameter policyFilePath
- @Parameter(name = "policyFilePath" ,optional = true, description = IHdfsConstants.DESC_POLICY_FILE_PATH)
+ @Parameter(name = IHdfsConstants.PARAM_POLICY_FILE_PATH ,optional = true, description = IHdfsConstants.DESC_POLICY_FILE_PATH)
public void setPolicyFilePath(String policyFilePath) {
fPolicyFilePath = policyFilePath;
}
@@ -258,7 +484,7 @@ public String getPolicyFilePath() {
}
// Parameter credentials
- @Parameter(name = "credentials", optional = true, description = IHdfsConstants.DESC_CREDENTIALS)
+ @Parameter(name = IHdfsConstants.PARAM_CREDENTIALS, optional = true, description = IHdfsConstants.DESC_CREDENTIALS)
public void setcredentials(String credentials) {
this.credentials = credentials;
}
@@ -269,7 +495,7 @@ public String getCredentials() {
// Parameter appConfigName
- @Parameter(name = "appConfigName", optional = true, description = IHdfsConstants.DESC_APP_CONFIG_NAME)
+ @Parameter(name = IHdfsConstants.PARAM_APP_CONFIG_NAME, optional = true, description = IHdfsConstants.DESC_APP_CONFIG_NAME)
public void setAppConfigName(String appConfigName) {
this.appConfigName = appConfigName;
}
@@ -278,91 +504,6 @@ public String getAppConfigName() {
return this.appConfigName;
}
-
-
- /** createConnection creates a connection to the hadoop file system. */
- private synchronized void createConnection() throws Exception {
- // Delay in miliseconds as specified in fReconnectionInterval parameter
- final long delay = TimeUnit.MILLISECONDS.convert((long) fReconnectionInterval, TimeUnit.SECONDS);
- LOGGER.log(TraceLevel.INFO, "createConnection ReconnectionPolicy " + fReconnectionPolicy + " ReconnectionBound "
- + fReconnectionBound + " ReconnectionInterval " + fReconnectionInterval);
- if (fReconnectionPolicy == IHdfsConstants.RECONNPOLICY_NORETRY) {
- fReconnectionBound = 1;
- }
-
- if (fReconnectionPolicy == IHdfsConstants.RECONNPOLICY_INFINITERETRY) {
- fReconnectionBound = 9999;
- }
-
- for (int nConnectionAttempts = 0; nConnectionAttempts < fReconnectionBound; nConnectionAttempts++) {
- LOGGER.log(TraceLevel.INFO, "createConnection nConnectionAttempts is: " + nConnectionAttempts + " delay "
- + delay);
- try {
- fHdfsClient = createHdfsClient();
- fs = fHdfsClient.connect(getHdfsUri(), getHdfsUser(), getAbsolutePath(getConfigPath()));
- LOGGER.log(TraceLevel.INFO, Messages.getString("HDFS_CLIENT_AUTH_CONNECT", fHdfsUri));
- break;
- } catch (Exception e) {
- LOGGER.log(TraceLevel.ERROR, Messages.getString("HDFS_CLIENT_AUTH_CONNECT", e.toString()));
- Thread.sleep(delay);
- }
- }
-
- }
-
- /** set policy file path and https.protocols in JAVA system properties */
- private void setJavaSystemProperty() {
- String policyFilePath = getAbsolutePath(getPolicyFilePath());
- if (policyFilePath != null) {
- TRACE.log(TraceLevel.INFO, "Policy file path: " + policyFilePath);
- System.setProperty("com.ibm.security.jurisdictionPolicyDir", policyFilePath);
- }
- System.setProperty("https.protocols", "TLSv1.2");
- String httpsProtocol = System.getProperty("https.protocols");
- TRACE.log(TraceLevel.INFO, "streamsx.hdfs https.protocols " + httpsProtocol);
- }
-
- private void setupClassPaths(OperatorContext context) {
-
- ArrayList libList = new ArrayList<>();
- String HADOOP_HOME = System.getenv("HADOOP_HOME");
- if (getLibPath() != null) {
- String user_defined_path = getLibPath() + "/*";
- TRACE.log(TraceLevel.INFO, "Adding " + user_defined_path + " to classpath");
- libList.add(user_defined_path);
- } else {
- // add class path for delivered jar files from ./impl/lib/ext/ directory
- String default_dir = context.getToolkitDirectory() + "/impl/lib/ext/*";
- TRACE.log(TraceLevel.INFO, "Adding /impl/lib/ext/* to classpath");
- libList.add(default_dir);
-
- if (HADOOP_HOME != null) {
- // if no config path and no HdfsUri is defined it checks the
- // HADOOP_HOME/config directory for default core-site.xml file
- if ((fConfigPath == null) && (fHdfsUri == null)) {
- libList.add(HADOOP_HOME + "/conf");
- libList.add(HADOOP_HOME + "/../hadoop-conf");
- libList.add(HADOOP_HOME + "/etc/hadoop");
- libList.add(HADOOP_HOME + "/*");
- libList.add(HADOOP_HOME + "/../hadoop-hdfs");
- libList.add(HADOOP_HOME + "/lib/*");
- libList.add(HADOOP_HOME + "/client/*");
- }
-
- }
- }
- for (int i = 0; i < libList.size(); i++) {
- TRACE.log(TraceLevel.INFO, "calss path list " + i + " : " + libList.get(i));
- }
-
- try {
- context.addClassLibraries(libList.toArray(new String[0]));
-
- } catch (MalformedURLException e) {
- LOGGER.log(TraceLevel.ERROR, "LIB_LOAD_ERROR", e);
- }
- }
-
@Override
public void allPortsReady() throws Exception {
super.allPortsReady();
@@ -413,13 +554,12 @@ public void run() {
protected IHdfsClient createHdfsClient() throws Exception {
IHdfsClient client = new HdfsJavaClient();
- client.setConnectionProperty(IHdfsConstants.KEYSTORE, getAbsolutePath(getKeyStorePath()));
- client.setConnectionProperty(IHdfsConstants.KEYSTORE_PASSWORD, getKeyStorePassword());
+ client.setConnectionProperty(IHdfsConstants.PARAM_KEY_STOR_PATH, getAbsolutePath(getKeyStorePath()));
+ client.setConnectionProperty(IHdfsConstants.PARAM_KEY_STOR_PASSWORD, getKeyStorePassword());
- client.setConnectionProperty(IHdfsConstants.HDFS_PASSWORD, getHdfsPassword());
- client.setConnectionProperty(IHdfsConstants.AUTH_PRINCIPAL, getAuthPrincipal());
- client.setConnectionProperty(IHdfsConstants.AUTH_KEYTAB, getAbsolutePath(getAuthKeytab()));
- client.setConnectionProperty(IHdfsConstants.CRED_FILE, getAbsolutePath(getCredFile()));
+ client.setConnectionProperty(IHdfsConstants.PARAM_HDFS_PASSWORD, getHdfsPassword());
+ client.setConnectionProperty(IHdfsConstants.PARAM_AUTH_PRINCIPAL, getAuthPrincipal());
+ client.setConnectionProperty(IHdfsConstants.PARAM_AUTH_KEYTAB, getAbsolutePath(getAuthKeytab()));
return client;
}
@@ -441,65 +581,4 @@ protected IHdfsClient getHdfsClient() {
return fHdfsClient;
}
- /**
- * read the credentials and set user name fHdfsUser, fHfsPassword and
- * hdfsUrl.
- *
- * @param credentials
- */
- public void getCredentials(String credentials) throws IOException {
- String jsonString = credentials;
- try {
- JSONObject obj = JSONObject.parse(jsonString);
- fHdfsUser = (String) obj.get("user");
- if (fHdfsUser == null || fHdfsUser.trim().isEmpty()) {
- LOGGER.log(LogLevel.ERROR, Messages.getString("'fHdfsUser' is required to create HDFS connection."));
- throw new Exception(Messages.getString("'fHdfsUser' is required to create HDFS connection."));
- }
-
- fHdfsPassword = (String) obj.get("password");
- if (fHdfsPassword == null || fHdfsPassword.trim().isEmpty()) {
- LOGGER.log(LogLevel.ERROR, Messages.getString(
- "'fHdfsPassword' is required to create HDFS connection."));
- throw new Exception(Messages.getString("'fHdfsPassword' is required to create HDFS connection."));
- }
-
- fHdfsUri = (String) obj.get("webhdfs");
- if (fHdfsUri == null || fHdfsUri.trim().isEmpty()) {
- LOGGER.log(LogLevel.ERROR, Messages.getString("'fHdfsUri' is required to create HDFS connection."));
- throw new Exception(Messages.getString("'fHdfsUri' is required to create HDFS connection."));
- }
-
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
-
- /**
- * read the application config into a map
- *
- * @param context the operator context
- */
- protected void loadAppConfig(OperatorContext context) {
-
- // if no appconfig name is specified, create empty map
- if (appConfigName == null) {
- appConfig = new HashMap();
- return;
- }
-
- appConfig = context.getPE().getApplicationConfiguration(appConfigName);
- if (appConfig.isEmpty()) {
- LOGGER.log(LogLevel.WARN, "Application config not found or empty: " + appConfigName);
- }
-
- for (Map.Entry kv : appConfig.entrySet()) {
- TRACE.log(TraceLevel.DEBUG, "Found application config entry: " + kv.getKey() + "=" + kv.getValue());
- }
-
- if (null != appConfig.get("credentials")) {
- credentials = appConfig.get("credentials");
- }
- }
-
}
diff --git a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileCopy.java b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileCopy.java
index 9b6a97a..ef68cde 100755
--- a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileCopy.java
+++ b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileCopy.java
@@ -361,6 +361,7 @@ synchronized public void process(StreamingInput stream, Tuple tuple) thro
String hdfsFileNmae = "";
// get the start time
long millisStart = Calendar.getInstance().getTimeInMillis();
+ long size = 0;
try {
if (localFileAttrName != null) {
localFile = tuple.getString(localFileAttrName);
@@ -391,7 +392,8 @@ synchronized public void process(StreamingInput stream, Tuple tuple) thro
} else {
hdfsFullPath = fs.getUri().toString() + hdfsFileNmae;
}
- message = "Successfully copied from " + localFile + " to " + hdfsFullPath + " .";
+ size = new File(localFile).length();
+ message = size + " bytes successfully copied from " + localFile + " to " + hdfsFullPath + " .";
nCopiedFiles.incrementValue(1);
System.out.println(localPath + " --> " + hdfsPath + " " + nCopiedFiles.getValue()
+ " files copied.");
@@ -413,7 +415,8 @@ synchronized public void process(StreamingInput stream, Tuple tuple) thro
} else {
hdfsFullPath = fs.getUri().toString() + hdfsFile;
}
- message = "Successfully copied from " + hdfsFullPath + " to " + localFileName + " .";
+ size = new File(localFileName).length();
+ message = size + " bytes successfully copied from " + hdfsFullPath + " to " + localFileName + " .";
nCopiedFiles.incrementValue(1);
System.out.println(hdfsPath + " --> " + localPath + " " + nCopiedFiles.getValue()
+ " files copied.");
diff --git a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileSink.java b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileSink.java
index 40dc2c5..e82df9e 100755
--- a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileSink.java
+++ b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileSink.java
@@ -810,7 +810,7 @@ private void closeFile() throws Exception {
}
nSikedFiles.incrementValue(1);
- System.out.println(fFileToWrite + " " + nSikedFiles.getValue() + " files written.");
+ System.out.println(nSikedFiles.getValue() + " : file " + fs.getHomeDirectory() + "/" + fFileToWrite.getPath() + " created.");
// operators can perform additional
if (hasOutputPort && success) {
diff --git a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/IHdfsConstants.java b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/IHdfsConstants.java
index ff7982e..2c5dead 100755
--- a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/IHdfsConstants.java
+++ b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/IHdfsConstants.java
@@ -7,14 +7,32 @@
public class IHdfsConstants {
-
+ // general parameters
+ public static final String PARAM_HDFS_URI = "hdfsUri";
+ public static final String PARAM_HDFS_USER = "hdfsUser";
+ public static final String PARAM_HDFS_PASSWORD = "hdfsPassword";
+ public static final String PARAM_REC_POLICY = "reconnectionPolicy";
+ public static final String PARAM_REC_BOUND = "reconnectionBound";
+ public static final String PARAM_REC_INTERVAL = "reconnectionInterval";
+ public static final String PARAM_AUTH_PRINCIPAL = "authPrincipal";
+ public static final String PARAM_AUTH_KEYTAB = "authKeytab";
+ public static final String PARAM_CRED_FILE = "credFile";
+ public static final String PARAM_CONFIG_PATH = "configPath";
+ public static final String PARAM_KEY_STOR_PATH = "keyStorePath";
+ public static final String PARAM_KEY_STOR_PASSWORD = "keyStorePassword";
+ public static final String PARAM_LIB_PATH = "libPath";
+ public static final String PARAM_POLICY_FILE_PATH = "policyFilePath";
+ public static final String PARAM_CREDENTIALS = "credentials";
+ public static final String PARAM_APP_CONFIG_NAME = "appConfigName";
+
+
// HDFSFileSink parameters
public static final String PARAM_TIME_PER_FILE = "timePerFile";
public static final String PARAM_CLOSE_ON_PUNCT = "closeOnPunct";
public static final String PARAM_TUPLES_PER_FILE = "tuplesPerFile";
public static final String PARAM_BYTES_PER_FILE = "bytesPerFile";
public static final String PARAM_FILE_NAME_ATTR = "fileAttributeName";
- public static final String PARAM_TEMP_FILE = "tempfile";
+ public static final String PARAM_TEMP_FILE = "tempFile";
public static final String PARAM_TIME_FORMAT = "timeFormat";
// HDFSFileCopy parameters
@@ -48,18 +66,11 @@ public class IHdfsConstants {
public static final String FILE_VAR_PROCID = "%PROCID";
public static final String FILE_VAR_HOST = "%HOST";
- public final static String HDFS_PASSWORD = "password";
- public final static String AUTH_PRINCIPAL = "authPrincipal";
- public final static String AUTH_KEYTAB = "authKeytab";
- public final static String CRED_FILE = "credFile";
-
public final static String FS_HDFS = "hdfs";
public final static String FS_GPFS = "gpfs";
public final static String FS_WEBHDFS = "webhdfs";
public static final String KNOX_PASSWORD = "knox.password";
public static final String KNOX_USER = "knox.user";
- public static final String KEYSTORE = "keystore_path";
- public static final String KEYSTORE_PASSWORD = "keystore_password";
public static final String RECONNPOLICY_NORETRY = "NoRetry";
public static final String RECONNPOLICY_BOUNDEDRETRY = "BoundedRetry";
@@ -109,22 +120,16 @@ public class IHdfsConstants {
+ "This value is set to the principal that is created for the IBM Streams instance owner. \\n"
+ "You must specify this parameter if you want to use Kerberos authentication.";
- public static final String DESC_AUTH_KEY = "This parameter specifies the file that contains the encrypted keys for the user that is specified by the `authPrincipal` parameter. \\n"
+ public static final String DESC_AUTH_KEY = "This optional parameter specifies the file that contains the encrypted keys for the user that is specified by the `authPrincipal` parameter. \\n"
+ "The operator uses this keytab file to authenticate the user. \\n"
+ "The keytab file is generated by the administrator. You must specify this parameter to use Kerberos authentication.";
-
- public static final String DESC_CRED_FILE = "This parameter specifies a file that contains login credentials. The credentials are used to "
- + "connect to GPFS remotely by using the `webhdfs://hdfshost:webhdfsport` schema. The credentials file must contain information "
- + "about how to authenticate with IBM Analytics Engine when using the `webhdfs` schema. \\n"
- + "For example, the file must contain the username and password for an IBM Analytics Engine user. \\n"
- + "When connecting to HDFS instances deployed on IBM Analytics Engine, \\n"
- + "the credentials are provided using the `hdfsUser` and `hdfsPassword` parameters.";
-
- public static final String DESC_CONFIG_PATH = "This parameter specifies the path to the directory that contains the `core-site.xml` file, which is an HDFS\\n"
- + "configuration file. If this parameter is not specified, by default the operator looks for the `core-site.xml` file in the following locations:\\n"
+
+ public static final String DESC_CONFIG_PATH = "This optional parameter specifies the path to the directory that contains the HDFS configuration file `core-site.xml`.\\n\\n"
+ + "If you have extra HDFS configuration in your `hdfs-site.xml` file, you must also copy this configuration file from your Hadoop server into the `configPath` directory.\\n\\n"
+ + "If this parameter is not specified, by default the operator looks for the `core-site.xml` and `hdfs-site.xml` files in the following locations:\\n"
+ "* `$HADOOP_HOME/etc/hadoop`\\n" + "* `$HADOOP_HOME/conf`\\n"
+ "* `$HADOOP_HOME/lib` \\n" + "* `$HADOOP_HOME/`\\n"
- + "**Note:** For connections to Hadoop instances deployed on IBM Analytics Engine, the `$HADOOP_HOME` environment variable is not supported and should not be used.";
+ + "**Note:** For connections to Hadoop instances via webhdfs, the `$HADOOP_HOME` environment variable is not supported and can not be used.";
public static final String DESC_POLICY_FILE_PATH = "This optional parameter is relevant when connecting to IBM Analytics Engine on IBM Cloud. \\n"
+ "It specifies the path to the directory that contains the Java Cryptography Extension policy files (US_export_policy.jar and local_policy.jar). \\n"
@@ -163,8 +168,14 @@ public class IHdfsConstants {
+ " \\\"user\\\" : \\\"clsadmin\\\",\\n"
+ " \\\"password\\\" : \\\"IAE-password\\\",\\n"
+ " \\\"webhdfs\\\" : \\\"webhdfs://ip-address:8443\\\"\\n"
- + " }\\n";
-
+ + " }\\n"
+ + "It is possible to use `hdfsUser` or `user` for hdfs user \\n"
+ + "and `hdfsPassword` or `password` for hdfs passwor \\n"
+ + "and `webhdfs` or `hdfsUri` for hdfs URL in jSON string.\\n";
+
+ public static final String DESC_CRED_FILE = "This optional parameter specifies a file that contains login credentials. The credentials are used to "
+ + "connect to WEBHDF remotely by using the schema:\\n `webhdfs://hdfshost:webhdfsport` \\n"
+ + "The credentials file must be a valid JSON string and must contain the hdfs credentials key/value pairs for `user`, `password` and `webhdfs` in JSON format.\\n";
public static final String DESC_APP_CONFIG_NAME ="This optional parameter specifies the name of the application configuration that contains HDFS connection related configuration parameter `credentials`. "
+ " The `credentials` is a JSON string that contains key/value pairs for `user` and `password` and `webhdfs` . "
diff --git a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/client/auth/BaseAuthenticationHelper.java b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/client/auth/BaseAuthenticationHelper.java
index ffa126d..10f1b13 100755
--- a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/client/auth/BaseAuthenticationHelper.java
+++ b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/client/auth/BaseAuthenticationHelper.java
@@ -10,6 +10,7 @@
import java.net.URL;
import java.util.Map;
import java.util.logging.Logger;
+import java.io.File;
import java.io.FileNotFoundException;
import org.apache.hadoop.conf.Configuration;
@@ -50,8 +51,28 @@ public BaseAuthenticationHelper(String configPath) {
if (configPath != null && !configPath.isEmpty()) {
try {
- URL url = new URL("file", null, configPath + "/core-site.xml");
- fConfiguration.addResource(url);
+ String coreSiteXmlFileName = configPath + "/core-site.xml";
+ String hdfsSiteXmlFileName = configPath + "/hdfs-site.xml";
+
+ File coreSiteXmlFile = new File(coreSiteXmlFileName);
+ if (coreSiteXmlFile.exists()){
+ URL coreSiteXmlUrl = new URL("file", null, coreSiteXmlFileName);
+ fConfiguration.addResource(coreSiteXmlUrl);
+ LOGGER.log(TraceLevel.INFO, "core-site.xml file: " + coreSiteXmlFileName);
+ System.out.println("core-site.xml file: " + coreSiteXmlFileName);
+ }
+ else{
+ LOGGER.log(TraceLevel.ERROR, "core-site.xml file doesn't exist in " + configPath);
+ }
+
+ File hdfsSiteXmlFile = new File(hdfsSiteXmlFileName);
+ if (hdfsSiteXmlFile.exists()){
+ URL hdfsSiteXmlUrl = new URL("file", null, hdfsSiteXmlFileName);
+ fConfiguration.addResource(hdfsSiteXmlUrl);
+ LOGGER.log(TraceLevel.INFO, "hdfs-site.xml file: " + hdfsSiteXmlFileName);
+ System.out.println("hdfs-site.xml file: " + hdfsSiteXmlFileName);
+ }
+
} catch (MalformedURLException e) {
LOGGER.log(TraceLevel.ERROR, e.getMessage(), e);
}
@@ -81,19 +102,19 @@ public FileSystem connect(String fileSystemUri, String hdfsUser, Map connectionProperties) {
- return hdfsUser != null && connectionProperties.get(IHdfsConstants.HDFS_PASSWORD) != null;
+ return hdfsUser != null && connectionProperties.get(IHdfsConstants.PARAM_HDFS_PASSWORD) != null;
}
protected FileSystem internalGetFileSystem(URI hdfsUri, String hdfsUser) throws Exception {
diff --git a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/client/auth/HDFSAuthenticationHelper.java b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/client/auth/HDFSAuthenticationHelper.java
index a1d6a20..488b85e 100755
--- a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/client/auth/HDFSAuthenticationHelper.java
+++ b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/client/auth/HDFSAuthenticationHelper.java
@@ -38,8 +38,8 @@ public FileSystem connect(String fileSystemUri, final String hdfsUser, Map
- 5.0.0
+ 5.1.0
4.2.0.0
diff --git a/com.ibm.streamsx.hdfs/pom.xml b/com.ibm.streamsx.hdfs/pom.xml
index db0e945..bc6e939 100755
--- a/com.ibm.streamsx.hdfs/pom.xml
+++ b/com.ibm.streamsx.hdfs/pom.xml
@@ -8,7 +8,7 @@
com.ibm.streamsx.hdfs
streamsx.hdfs
jar
- 5.0.0
+ 5.1.0
com.ibm.streamsx.hdfs
@@ -237,6 +237,18 @@
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <version>2.9.9</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
org.codehaus.jackson
jackson-core-asl
diff --git a/samples/webHDFS/.gitignore b/samples/webHDFS/.gitignore
new file mode 100644
index 0000000..c65565e
--- /dev/null
+++ b/samples/webHDFS/.gitignore
@@ -0,0 +1,6 @@
+/output
+/toolkit.xml
+/impl
+/.toolkitList
+/bin/
+/doc/
diff --git a/samples/webHDFS/Makefile b/samples/webHDFS/Makefile
new file mode 100644
index 0000000..f582a7e
--- /dev/null
+++ b/samples/webHDFS/Makefile
@@ -0,0 +1,28 @@
+#####################################################################
+# Copyright (C)2014, 2016 International Business Machines Corporation and
+# others. All Rights Reserved.
+#####################################################################
+
+.PHONY: all clean
+
+#SPLC_FLAGS = -t $(STREAMS_INSTALL)/toolkits/com.ibm.streamsx.hdfs --data-directory data
+SPLC_FLAGS = -t ../../com.ibm.streamsx.hdfs --data-directory data
+
+SPLC = $(STREAMS_INSTALL)/bin/sc
+
+SPL_CMD_ARGS ?=
+SPL_COMP1NAME=webHDFS
+SPL_MAIN_COMPOSITE1 = application::$(SPL_COMP1NAME)
+BUILD_OUTPUT_DIR = output
+
+all: data clean
+ $(SPLC) $(SPLC_FLAGS) -M $(SPL_MAIN_COMPOSITE1) --output-dir ./$(BUILD_OUTPUT_DIR) $(SPL_CMD_ARGS)
+
+data:
+ mkdir data
+clean:
+ $(SPLC) $(SPLC_FLAGS) -C -M $(SPL_MAIN_COMPOSITE1) --output-dir output
+ -rm -rf toolkit.xml
+ -rm -rf data/*.*
+
+
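+# Example usage (a sketch; assumes STREAMS_INSTALL points to a Streams installation
+# and the toolkit is available in ../../com.ibm.streamsx.hdfs):
+#   make          # compiles application::webHDFS into ./output
+#   make clean    # removes the compiled artifacts and the generated data files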
diff --git a/samples/webHDFS/application/.namespace b/samples/webHDFS/application/.namespace
new file mode 100644
index 0000000..e69de29
diff --git a/samples/webHDFS/application/webHDFS.spl b/samples/webHDFS/application/webHDFS.spl
new file mode 100644
index 0000000..da141ed
--- /dev/null
+++ b/samples/webHDFS/application/webHDFS.spl
@@ -0,0 +1,124 @@
+
+/*******************************************************************************
+* Copyright (C) 2019, International Business Machines Corporation
+* All Rights Reserved
+*******************************************************************************/
+namespace application ;
+
+use com.ibm.streamsx.hdfs::* ;
+
+/**
+ * The webHDFS sample demonstrates how to access WebHDFS with a Knox user and password.
+ * A Beacon operator generates some test lines.
+ * HdfsFileSink writes every 10 lines into a new file in the /user/hdfs/out directory.
+ * HdfsDirScanOut scans the given directory (out) in the user's HDFS home directory and returns the file names.
+ * HdfsFileSource reads the files and returns their lines. It uses the file names from the directory scan to read the files.
+ * CopyFromHdfsToLocal copies all incoming files (/user/hdfs/out/output-xx.txt) from its input port into the local data directory.
+ * The Print operators are Custom operators that print the output of the HDFS operators.
+ *
+ * Please replace the credentials in this SPL file with your own credentials.
+ */
+composite webHDFS
+{
+ param
+ expression $credentials : getSubmissionTimeValue("credentials",
+ "{ 'user': 'hdfs', 'webhdfs': 'webhdfs://:8443', 'password': 'hdf-spassword'}" );
+ graph
+
+
+ // generates lines
+ stream<rstring line> CreateLines = Beacon()
+ {
+ param
+ initDelay : 1.0 ;
+ iterations : 100u ;
+ output
+ CreateLines : line = (rstring)IterationCount() + ": This line will be written into a HDFS file." ;
+ }
+
+ // HdfsFileSink writes every 10 lines from CreateLines in a new file in /user/hdfs/out directory
+ stream<rstring fileName, uint64 size> HdfsFileSink = HDFS2FileSink(CreateLines)
+ {
+ param
+ credentials : $credentials ;
+ file : "out/output-%FILENUM.txt" ;
+ tuplesPerFile : 10l ;
+ }
+
+ //print out the file names and the size of file
+ () as PrintHdfsFileSink = Custom(HdfsFileSink)
+ {
+ logic
+ onTuple HdfsFileSink :
+ {
+ printStringLn("HdfsFileSink fileName , size : " +(rstring) HdfsFileSink) ;
+ }
+
+ }
+
+ // HdfsDirScanOut scans the given directory (out), relative to the user's HDFS home directory
+ stream<rstring hdfsFile> HdfsDirScanOut = HDFS2DirectoryScan()
+ {
+ param
+ initDelay : 10.0 ;
+ directory : "out" ;
+ credentials : $credentials ;
+ strictMode : false ;
+ }
+
+
+ //print out the names of each file found in the directory
+ () as PrintHdfsDirScanOut = Custom(HdfsDirScanOut)
+ {
+ logic
+ onTuple HdfsDirScanOut :
+ {
+ printStringLn("HdfsDirScanOut fileName : " +(rstring) HdfsDirScanOut) ;
+ }
+
+ }
+
+ // HdfsFileSource reads files and returns lines into output port
+ // It uses the file name from directory scan to read the file
+ stream<rstring lines> HdfsFileSource = HDFS2FileSource(HdfsDirScanOut)
+ {
+ param
+ credentials : $credentials ;
+ }
+
+ //print out the lines read from each file
+ () as PrintHdfsFileSource = Custom(HdfsFileSource)
+ {
+ logic
+ onTuple HdfsFileSource :
+ {
+ printStringLn("HdfsFileSource line : " + lines) ;
+ }
+
+ }
+
+ // copies all incoming files (/user/hdfs/out/output-X.txt) from the input port into the local data directory.
+ stream<rstring message, uint64 elapsedTime> CopyFromHdfsToLocal = HDFS2FileCopy(HdfsDirScanOut)
+ {
+ param
+ hdfsFileAttrName : "hdfsFile" ;
+ localFile : "./" ;
+ deleteSourceFile : false ;
+ overwriteDestinationFile : true ;
+ direction : copyToLocalFile ;
+ credentials : $credentials ;
+
+ }
+
+ //print out the message and the elapsed time
+ () as PrintCopyFromHdfsToLocal = Custom(CopyFromHdfsToLocal)
+ {
+ logic
+ onTuple CopyFromHdfsToLocal :
+ {
+ printStringLn("CopyFromHdfsToLocal message, elapsedTime : " +(rstring) CopyFromHdfsToLocal) ;
+ }
+
+ }
+
+}
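+
+// Illustrative usage sketch (an assumption, not part of the sample itself): after building
+// with the provided Makefile, the bundle can be submitted with your own credentials, e.g.
+//   streamtool submitjob output/application.webHDFS.sab -P credentials='{"user":"clsadmin","password":"my-password","webhdfs":"webhdfs://server:8443"}'
+// where host, user and password are placeholders.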