diff --git a/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/batch/BatchContext.java b/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/batch/BatchContext.java
index e5a9251fbb46..d2553ca98354 100644
--- a/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/batch/BatchContext.java
+++ b/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/batch/BatchContext.java
@@ -23,6 +23,7 @@
import io.cdap.cdap.api.dataset.InstanceConflictException;
import io.cdap.cdap.etl.api.TransformContext;
import io.cdap.cdap.etl.api.action.SettableArguments;
+import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
/**
* Context passed to Batch Source and Sink.
@@ -61,4 +62,13 @@ void createDataset(String datasetName, String typeName, DatasetProperties proper
*/
@Override
SettableArguments getArguments();
+
+ /**
+ * Overrides the error details provider to use for this stage.
+ *
+ * @param errorDetailsProviderSpec the error details provider spec.
+ */
+ default void setErrorDetailsProvider(ErrorDetailsProviderSpec errorDetailsProviderSpec) {
+ // no-op
+ }
}
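
A source or sink plugin can register a provider from its `prepareRun` method. A minimal sketch, assuming a hypothetical `MyErrorDetailsProvider` that implements `ErrorDetailsProvider`:

```java
// Sketch: a batch source registering an error details provider in prepareRun.
// MyErrorDetailsProvider is a hypothetical ErrorDetailsProvider implementation.
@Override
public void prepareRun(BatchSourceContext context) throws Exception {
  context.setErrorDetailsProvider(
      new ErrorDetailsProviderSpec(MyErrorDetailsProvider.class.getName()));
  // ... set up the input as usual, e.g. context.setInput(...)
}
```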
diff --git a/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/ErrorContext.java b/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/ErrorContext.java
new file mode 100644
index 000000000000..1f32135569d2
--- /dev/null
+++ b/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/ErrorContext.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.cdap.etl.api.exception;
+
+/**
+ * Context for an {@link ErrorDetailsProvider}.
+ *
+ * <p>
+ * Provides information about the phase of the stage in which the error occurred.
+ * </p>
+ */
+public class ErrorContext {
+ private final ErrorPhase phase;
+
+ public ErrorContext(ErrorPhase phase) {
+ this.phase = phase;
+ }
+
+ public ErrorPhase getPhase() {
+ return phase;
+ }
+}
diff --git a/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/ErrorDetailsProvider.java b/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/ErrorDetailsProvider.java
new file mode 100644
index 000000000000..8429e1aced3b
--- /dev/null
+++ b/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/ErrorDetailsProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.cdap.etl.api.exception;
+
+import io.cdap.cdap.api.exception.ProgramFailureException;
+import javax.annotation.Nullable;
+
+/**
+ * Interface for providing error details.
+ *
+ * <p>
+ * Implementations can be used to provide more detailed error information for
+ * exceptions that occur in stage code, by wrapping them in a {@link ProgramFailureException}.
+ * </p>
+ */
+public interface ErrorDetailsProvider {
+
+ /**
+ * Returns a {@link ProgramFailureException} that wraps the given exception
+ * with more detailed information.
+ *
+ * @param e the exception to wrap.
+ * @param context the context of the error.
+ * @return a {@link ProgramFailureException} with more detailed information,
+ * or {@code null} if no details can be provided.
+ */
+ @Nullable
+ ProgramFailureException getExceptionDetails(Exception e, ErrorContext context);
+}
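
For illustration, a provider might translate known low-level failures into richer ones. A minimal sketch, assuming a hypothetical `MyProgramFailureException` subclass of `ProgramFailureException` with a `(String message, Throwable cause)` constructor:

```java
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;
import io.cdap.cdap.etl.api.exception.ErrorPhase;
import java.io.IOException;
import javax.annotation.Nullable;

// Sketch of an ErrorDetailsProvider; MyProgramFailureException is a
// hypothetical ProgramFailureException subclass defined by the plugin.
public class MyErrorDetailsProvider implements ErrorDetailsProvider {

  @Nullable
  @Override
  public ProgramFailureException getExceptionDetails(Exception e, ErrorContext context) {
    if (e instanceof IOException && context.getPhase() == ErrorPhase.WRITING) {
      return new MyProgramFailureException(
          "Error writing records to the sink: " + e.getMessage(), e);
    }
    // Returning null makes callers fall back to the default stage wrapping.
    return null;
  }
}
```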
diff --git a/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/ErrorDetailsProviderSpec.java b/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/ErrorDetailsProviderSpec.java
new file mode 100644
index 000000000000..99c22539bb53
--- /dev/null
+++ b/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/ErrorDetailsProviderSpec.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.cdap.etl.api.exception;
+
+/**
+ * Specification for an {@link ErrorDetailsProvider}, identified by the provider class name.
+ */
+public class ErrorDetailsProviderSpec {
+ private final String className;
+
+ public ErrorDetailsProviderSpec(String className) {
+ this.className = className;
+ }
+
+ public String getClassName() {
+ return className;
+ }
+}
diff --git a/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/ErrorPhase.java b/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/ErrorPhase.java
new file mode 100644
index 000000000000..c9bce112ae8c
--- /dev/null
+++ b/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/ErrorPhase.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.cdap.etl.api.exception;
+
+/**
+ * Enum representing the different phases of a stage where an error can occur.
+ */
+public enum ErrorPhase {
+ SPLITTING("Splitting"),
+ READING("Reading"),
+ VALIDATING_OUTPUT_SPECS("Validating Output Specs"),
+ WRITING("Writing"),
+ COMMITTING("Committing");
+
+ private final String displayName;
+
+ ErrorPhase(String displayName) {
+ this.displayName = displayName;
+ }
+
+ /**
+ * Returns a string representation of the error phase enum.
+ */
+ @Override
+ public String toString() {
+ return displayName;
+ }
+}
diff --git a/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/NoopErrorDetailsProvider.java b/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/NoopErrorDetailsProvider.java
new file mode 100644
index 000000000000..fbf76076bb13
--- /dev/null
+++ b/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/NoopErrorDetailsProvider.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.cdap.etl.api.exception;
+
+import io.cdap.cdap.api.exception.ProgramFailureException;
+import javax.annotation.Nullable;
+
+/**
+ * Noop implementation of {@link ErrorDetailsProvider}.
+ */
+public class NoopErrorDetailsProvider implements ErrorDetailsProvider {
+
+ @Nullable
+ @Override
+ public ProgramFailureException getExceptionDetails(Exception e, ErrorContext context) {
+ return null;
+ }
+}
diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/ErrorDetails.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/ErrorDetails.java
new file mode 100644
index 000000000000..7bb740cca17f
--- /dev/null
+++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/ErrorDetails.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.cdap.etl.common;
+
+import io.cdap.cdap.api.exception.ProgramFailureException;
+import io.cdap.cdap.api.exception.WrappedStageException;
+import io.cdap.cdap.etl.api.exception.ErrorContext;
+import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;
+import io.cdap.cdap.etl.api.exception.ErrorPhase;
+import io.cdap.cdap.etl.api.exception.NoopErrorDetailsProvider;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for handling exceptions.
+ */
+public class ErrorDetails {
+ private static final Logger LOG = LoggerFactory.getLogger(ErrorDetails.class);
+ public static final String ERROR_DETAILS_PROVIDER_CLASS_NAME_KEY =
+ "io.cdap.pipeline.error.details.provider.classname";
+
+ /**
+ * Gets the {@link ErrorDetailsProvider} from the given {@link Configuration}.
+ *
+ * @param conf the configuration to get the error details provider from.
+ * @return the error details provider.
+ */
+ public static ErrorDetailsProvider getErrorDetailsProvider(Configuration conf) {
+ String errorDetailsProviderClassName =
+ conf.get(ERROR_DETAILS_PROVIDER_CLASS_NAME_KEY);
+ if (errorDetailsProviderClassName == null) {
+ return new NoopErrorDetailsProvider();
+ }
+ try {
+ return (ErrorDetailsProvider) conf.getClassLoader()
+ .loadClass(errorDetailsProviderClassName)
+ .getDeclaredConstructor().newInstance();
+ } catch (Exception e) {
+ LOG.warn("Unable to instantiate errorDetailsProvider class '{}'.",
+ errorDetailsProviderClassName, e);
+ return new NoopErrorDetailsProvider();
+ }
+ }
+
+ /**
+ * Handles the given exception, wrapping it in a {@link WrappedStageException}.
+ *
+ * @param e the exception to handle.
+ * @param stageName the name of the stage where the exception occurred.
+ * @param errorDetailsProvider the error details provider.
+ * @param phase the phase of the stage where the exception occurred.
+ * @return the wrapped stage exception.
+ */
+ public static WrappedStageException handleException(Exception e, String stageName,
+ ErrorDetailsProvider errorDetailsProvider, ErrorPhase phase) {
+ ProgramFailureException exception = null;
+
+ if (!(e instanceof ProgramFailureException)) {
+ exception = errorDetailsProvider == null ? null :
+ errorDetailsProvider.getExceptionDetails(e, new ErrorContext(phase));
+ }
+ return new WrappedStageException(exception == null ? e : exception, stageName);
+ }
+}
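
For reference, how the pieces fit together at runtime: the Spark contexts below copy the registered class name into the Hadoop configuration under `ERROR_DETAILS_PROVIDER_CLASS_NAME_KEY`, and the stage-tracking wrappers resolve it back. A minimal sketch, assuming the hypothetical `MyErrorDetailsProvider` from above is on the task classpath:

```java
// Sketch: resolving a provider from a Hadoop Configuration and wrapping a failure.
Configuration conf = new Configuration();
conf.set(ErrorDetails.ERROR_DETAILS_PROVIDER_CLASS_NAME_KEY,
    MyErrorDetailsProvider.class.getName());

ErrorDetailsProvider provider = ErrorDetails.getErrorDetailsProvider(conf);
try {
  // ... delegate work that may fail
} catch (Exception e) {
  // The thrown WrappedStageException carries either the provider-supplied
  // ProgramFailureException or the raw cause.
  throw ErrorDetails.handleException(e, "my-stage", provider, ErrorPhase.WRITING);
}
```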
diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSinkContext.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSinkContext.java
index 72de9e08ebd9..514c96e58d3d 100644
--- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSinkContext.java
+++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSinkContext.java
@@ -22,9 +22,11 @@
import io.cdap.cdap.api.spark.JavaSparkExecutionContext;
import io.cdap.cdap.api.spark.SparkClientContext;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
+import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.batch.AbstractBatchContext;
import io.cdap.cdap.etl.batch.BasicOutputFormatProvider;
import io.cdap.cdap.etl.batch.preview.NullOutputFormatProvider;
+import io.cdap.cdap.etl.common.ErrorDetails;
import io.cdap.cdap.etl.common.PipelineRuntime;
import io.cdap.cdap.etl.proto.v2.spec.StageSpec;
import io.cdap.cdap.etl.spark.io.StageTrackingOutputFormat;
@@ -39,21 +41,27 @@
public class SparkBatchSinkContext extends AbstractBatchContext implements BatchSinkContext {
private final SparkBatchSinkFactory sinkFactory;
private final boolean isPreviewEnabled;
+ private ErrorDetailsProviderSpec errorDetailsProviderSpec;
public SparkBatchSinkContext(SparkBatchSinkFactory sinkFactory, SparkClientContext sparkContext,
- PipelineRuntime pipelineRuntime, DatasetContext datasetContext, StageSpec stageSpec) {
+ PipelineRuntime pipelineRuntime, DatasetContext datasetContext, StageSpec stageSpec) {
super(pipelineRuntime, stageSpec, datasetContext, sparkContext.getAdmin());
this.sinkFactory = sinkFactory;
this.isPreviewEnabled = stageSpec.isPreviewEnabled(sparkContext);
}
public SparkBatchSinkContext(SparkBatchSinkFactory sinkFactory, JavaSparkExecutionContext sec,
- DatasetContext datasetContext, PipelineRuntime pipelineRuntime, StageSpec stageSpec) {
+ DatasetContext datasetContext, PipelineRuntime pipelineRuntime, StageSpec stageSpec) {
super(pipelineRuntime, stageSpec, datasetContext, sec.getAdmin());
this.sinkFactory = sinkFactory;
this.isPreviewEnabled = stageSpec.isPreviewEnabled(sec);
}
+ @Override
+ public void setErrorDetailsProvider(ErrorDetailsProviderSpec errorDetailsProviderSpec) {
+ this.errorDetailsProviderSpec = errorDetailsProviderSpec;
+ }
+
@Override
public void addOutput(Output output) {
Output actualOutput = suffixOutput(getOutput(output));
@@ -64,6 +72,10 @@ public void addOutput(Output output) {
Map<String, String> conf = new HashMap<>(provider.getOutputFormatConfiguration());
conf.put(StageTrackingOutputFormat.DELEGATE_CLASS_NAME, provider.getOutputFormatClassName());
conf.put(StageTrackingOutputFormat.WRAPPED_STAGE_NAME, getStageName());
+ if (errorDetailsProviderSpec != null) {
+ conf.put(ErrorDetails.ERROR_DETAILS_PROVIDER_CLASS_NAME_KEY,
+ errorDetailsProviderSpec.getClassName());
+ }
provider = new BasicOutputFormatProvider(StageTrackingOutputFormat.class.getName(), conf);
actualOutput = Output.of(actualOutput.getName(), provider).alias(actualOutput.getAlias());
}
diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSourceContext.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSourceContext.java
index 5e05b4220b00..3a75313b3a89 100644
--- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSourceContext.java
+++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSourceContext.java
@@ -21,8 +21,10 @@
import io.cdap.cdap.api.data.batch.InputFormatProvider;
import io.cdap.cdap.api.spark.SparkClientContext;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
+import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.batch.BasicInputFormatProvider;
import io.cdap.cdap.etl.batch.preview.LimitingInputFormatProvider;
+import io.cdap.cdap.etl.common.ErrorDetails;
import io.cdap.cdap.etl.common.ExternalDatasets;
import io.cdap.cdap.etl.common.PipelineRuntime;
import io.cdap.cdap.etl.proto.v2.spec.StageSpec;
@@ -40,16 +42,24 @@ public class SparkBatchSourceContext extends SparkSubmitterContext implements Ba
private final SparkBatchSourceFactory sourceFactory;
private final boolean isPreviewEnabled;
+ private ErrorDetailsProviderSpec errorDetailsProviderSpec;
- public SparkBatchSourceContext(SparkBatchSourceFactory sourceFactory, SparkClientContext sparkContext,
- PipelineRuntime pipelineRuntime, DatasetContext datasetContext, StageSpec stageSpec) {
+ public SparkBatchSourceContext(SparkBatchSourceFactory sourceFactory,
+ SparkClientContext sparkContext, PipelineRuntime pipelineRuntime,
+ DatasetContext datasetContext, StageSpec stageSpec) {
super(sparkContext, pipelineRuntime, datasetContext, StageSpec.
- createCopy(stageSpec, sparkContext.getDataTracer(stageSpec.getName()).getMaximumTracedRecords(),
- sparkContext.getDataTracer(stageSpec.getName()).isEnabled()));
+ createCopy(stageSpec, sparkContext.getDataTracer(
+ stageSpec.getName()).getMaximumTracedRecords(),
+ sparkContext.getDataTracer(stageSpec.getName()).isEnabled()));
this.sourceFactory = sourceFactory;
this.isPreviewEnabled = stageSpec.isPreviewEnabled(sparkContext);
}
+ @Override
+ public void setErrorDetailsProvider(ErrorDetailsProviderSpec errorDetailsProviderSpec) {
+ this.errorDetailsProviderSpec = errorDetailsProviderSpec;
+ }
+
@Override
public void setInput(Input input) {
Input trackableInput = input;
@@ -60,6 +70,10 @@ public void setInput(Input input) {
Map<String, String> conf = new HashMap<>(provider.getInputFormatConfiguration());
conf.put(StageTrackingInputFormat.DELEGATE_CLASS_NAME, provider.getInputFormatClassName());
conf.put(StageTrackingInputFormat.WRAPPED_STAGE_NAME, getStageName());
+ if (errorDetailsProviderSpec != null) {
+ conf.put(ErrorDetails.ERROR_DETAILS_PROVIDER_CLASS_NAME_KEY,
+ errorDetailsProviderSpec.getClassName());
+ }
provider = new BasicInputFormatProvider(StageTrackingInputFormat.class.getName(), conf);
trackableInput = Input.of(trackableInput.getName(), provider).alias(trackableInput.getAlias());
}
diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingInputFormat.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingInputFormat.java
index d090083b0b00..444cfdcf144b 100644
--- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingInputFormat.java
+++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingInputFormat.java
@@ -16,8 +16,9 @@
package io.cdap.cdap.etl.spark.io;
-import io.cdap.cdap.api.exception.WrappedStageException;
+import io.cdap.cdap.etl.api.exception.ErrorPhase;
import io.cdap.cdap.etl.batch.DelegatingInputFormat;
+import io.cdap.cdap.etl.common.ErrorDetails;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -47,28 +48,32 @@ protected String getDelegateClassNameKey() {
@Override
public List<InputSplit> getSplits(JobContext context) {
+ Configuration conf = context.getConfiguration();
try {
- return getDelegate(context.getConfiguration()).getSplits(context);
+ return getDelegate(conf).getSplits(context);
} catch (Exception e) {
- throw new WrappedStageException(e, getStageName(context.getConfiguration()));
+ throw ErrorDetails.handleException(e, getStageName(conf),
+ ErrorDetails.getErrorDetailsProvider(conf), ErrorPhase.SPLITTING);
}
}
@Override
public RecordReader<K, V> createRecordReader(InputSplit split,
TaskAttemptContext context) {
+ Configuration conf = context.getConfiguration();
try {
// Spark already tracking metrics for file based input, hence we don't need to track again.
if (split instanceof FileSplit || split instanceof CombineFileSplit) {
return new StageTrackingRecordReader<>(super.createRecordReader(split, context),
- getStageName(context.getConfiguration()));
+ getStageName(conf), ErrorDetails.getErrorDetailsProvider(conf));
}
return new StageTrackingRecordReader<>(new TrackingRecordReader<>(
super.createRecordReader(split, new TrackingTaskAttemptContext(context))),
- getStageName(context.getConfiguration()));
+ getStageName(conf), ErrorDetails.getErrorDetailsProvider(conf));
} catch (Exception e) {
- throw new WrappedStageException(e, getStageName(context.getConfiguration()));
+ throw ErrorDetails.handleException(e, getStageName(conf),
+ ErrorDetails.getErrorDetailsProvider(conf), ErrorPhase.READING);
}
}
diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputCommitter.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputCommitter.java
index 8fe6cf166eae..511bfdb54bda 100644
--- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputCommitter.java
+++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputCommitter.java
@@ -16,7 +16,9 @@
package io.cdap.cdap.etl.spark.io;
-import io.cdap.cdap.api.exception.WrappedStageException;
+import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;
+import io.cdap.cdap.etl.api.exception.ErrorPhase;
+import io.cdap.cdap.etl.common.ErrorDetails;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -38,10 +40,13 @@ public class StageTrackingOutputCommitter extends OutputCommitter {
private final OutputCommitter delegate;
private final String stageName;
+ private final ErrorDetailsProvider errorDetailsProvider;
- public StageTrackingOutputCommitter(OutputCommitter delegate, String stageName) {
+ public StageTrackingOutputCommitter(OutputCommitter delegate, String stageName,
+ ErrorDetailsProvider errorDetailsProvider) {
this.delegate = delegate;
this.stageName = stageName;
+ this.errorDetailsProvider = errorDetailsProvider;
}
@Override
@@ -49,7 +54,8 @@ public void setupJob(JobContext jobContext) {
try {
delegate.setupJob(jobContext);
} catch (Exception e) {
- throw new WrappedStageException(e, stageName);
+ throw ErrorDetails.handleException(e, stageName, errorDetailsProvider,
+ ErrorPhase.COMMITTING);
}
}
@@ -58,7 +64,8 @@ public void setupTask(TaskAttemptContext taskAttemptContext) {
try {
delegate.setupTask(taskAttemptContext);
} catch (Exception e) {
- throw new WrappedStageException(e, stageName);
+ throw ErrorDetails.handleException(e, stageName, errorDetailsProvider,
+ ErrorPhase.COMMITTING);
}
}
@@ -67,7 +74,8 @@ public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
try {
return delegate.needsTaskCommit(taskAttemptContext);
} catch (Exception e) {
- throw new WrappedStageException(e, stageName);
+ throw ErrorDetails.handleException(e, stageName, errorDetailsProvider,
+ ErrorPhase.COMMITTING);
}
}
@@ -76,7 +84,8 @@ public void commitTask(TaskAttemptContext taskAttemptContext) {
try {
delegate.commitTask(taskAttemptContext);
} catch (Exception e) {
- throw new WrappedStageException(e, stageName);
+ throw ErrorDetails.handleException(e, stageName, errorDetailsProvider,
+ ErrorPhase.COMMITTING);
}
}
@@ -85,7 +94,8 @@ public void abortTask(TaskAttemptContext taskAttemptContext) {
try {
delegate.abortTask(taskAttemptContext);
} catch (Exception e) {
- throw new WrappedStageException(e, stageName);
+ throw ErrorDetails.handleException(e, stageName, errorDetailsProvider,
+ ErrorPhase.COMMITTING);
}
}
@@ -99,7 +109,8 @@ public void recoverTask(TaskAttemptContext taskContext) {
try {
delegate.recoverTask(taskContext);
} catch (Exception e) {
- throw new WrappedStageException(e, stageName);
+ throw ErrorDetails.handleException(e, stageName, errorDetailsProvider,
+ ErrorPhase.COMMITTING);
}
}
}
diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputFormat.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputFormat.java
index 9d95c9750188..350227b33cbe 100644
--- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputFormat.java
+++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputFormat.java
@@ -16,8 +16,9 @@
package io.cdap.cdap.etl.spark.io;
-import io.cdap.cdap.api.exception.WrappedStageException;
+import io.cdap.cdap.etl.api.exception.ErrorPhase;
import io.cdap.cdap.etl.batch.DelegatingOutputFormat;
+import io.cdap.cdap.etl.common.ErrorDetails;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -39,47 +40,53 @@ public class StageTrackingOutputFormat<K, V> extends DelegatingOutputFormat<K, V>
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) {
OutputFormat<K, V> delegate = getDelegate(context.getConfiguration());
+ Configuration conf = context.getConfiguration();
try {
// Spark already emitting bytes written metrics for file base output,
// hence we don't want to double count
if (delegate instanceof FileOutputFormat) {
return new StageTrackingRecordWriter<>(delegate.getRecordWriter(context),
- getStageName(context.getConfiguration()));
+ getStageName(conf), ErrorDetails.getErrorDetailsProvider(conf));
}
return new StageTrackingRecordWriter<>(
new TrackingRecordWriter(delegate.getRecordWriter(new TrackingTaskAttemptContext(context))),
- getStageName(context.getConfiguration()));
+ getStageName(conf), ErrorDetails.getErrorDetailsProvider(conf));
} catch (Exception e) {
- throw new WrappedStageException(e, getStageName(context.getConfiguration()));
+ throw ErrorDetails.handleException(e, getStageName(conf),
+ ErrorDetails.getErrorDetailsProvider(conf), ErrorPhase.WRITING);
}
}
@Override
public void checkOutputSpecs(JobContext context) {
+ Configuration conf = context.getConfiguration();
try {
- getDelegate(context.getConfiguration()).checkOutputSpecs(context);
+ getDelegate(conf).checkOutputSpecs(context);
} catch (Exception e) {
- throw new WrappedStageException(e, getStageName(context.getConfiguration()));
+ throw ErrorDetails.handleException(e, getStageName(conf),
+ ErrorDetails.getErrorDetailsProvider(conf), ErrorPhase.VALIDATING_OUTPUT_SPECS);
}
}
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
OutputFormat<K, V> delegate = getDelegate(context.getConfiguration());
+ Configuration conf = context.getConfiguration();
try {
// Spark already emitting bytes written metrics for file base output,
// hence we don't want to double count
if (delegate instanceof FileOutputFormat) {
return new StageTrackingOutputCommitter(delegate.getOutputCommitter(context),
- getStageName(context.getConfiguration()));
+ getStageName(conf), ErrorDetails.getErrorDetailsProvider(conf));
}
return new StageTrackingOutputCommitter(new TrackingOutputCommitter(
delegate.getOutputCommitter(new TrackingTaskAttemptContext(context))),
- getStageName(context.getConfiguration()));
+ getStageName(conf), ErrorDetails.getErrorDetailsProvider(conf));
} catch (Exception e) {
- throw new WrappedStageException(e, getStageName(context.getConfiguration()));
+ throw ErrorDetails.handleException(e, getStageName(conf),
+ ErrorDetails.getErrorDetailsProvider(conf), ErrorPhase.COMMITTING);
}
}
diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordReader.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordReader.java
index f5232103f962..19004f383880 100644
--- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordReader.java
+++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordReader.java
@@ -16,7 +16,9 @@
package io.cdap.cdap.etl.spark.io;
-import io.cdap.cdap.api.exception.WrappedStageException;
+import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;
+import io.cdap.cdap.etl.api.exception.ErrorPhase;
+import io.cdap.cdap.etl.common.ErrorDetails;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -42,10 +44,13 @@ public class StageTrackingRecordReader<K, V> extends RecordReader<K, V> {
private final RecordReader<K, V> delegate;
private final String stageName;
+ private final ErrorDetailsProvider errorDetailsProvider;
- public StageTrackingRecordReader(RecordReader<K, V> delegate, String stageName) {
+ public StageTrackingRecordReader(RecordReader<K, V> delegate, String stageName,
+ ErrorDetailsProvider errorDetailsProvider) {
this.delegate = delegate;
this.stageName = stageName;
+ this.errorDetailsProvider = errorDetailsProvider;
}
@Override
@@ -53,7 +58,8 @@ public void initialize(InputSplit split, TaskAttemptContext context) {
try {
delegate.initialize(split, new TrackingTaskAttemptContext(context));
} catch (Exception e) {
- throw new WrappedStageException(e, stageName);
+ throw ErrorDetails.handleException(e, stageName, errorDetailsProvider,
+ ErrorPhase.READING);
}
}
@@ -62,7 +68,8 @@ public boolean nextKeyValue() {
try {
return delegate.nextKeyValue();
} catch (Exception e) {
- throw new WrappedStageException(e, stageName);
+ throw ErrorDetails.handleException(e, stageName, errorDetailsProvider,
+ ErrorPhase.READING);
}
}
@@ -71,7 +78,8 @@ public K getCurrentKey() {
try {
return delegate.getCurrentKey();
} catch (Exception e) {
- throw new WrappedStageException(e, stageName);
+ throw ErrorDetails.handleException(e, stageName, errorDetailsProvider,
+ ErrorPhase.READING);
}
}
@@ -80,7 +88,8 @@ public V getCurrentValue() {
try {
return delegate.getCurrentValue();
} catch (Exception e) {
- throw new WrappedStageException(e, stageName);
+ throw ErrorDetails.handleException(e, stageName, errorDetailsProvider,
+ ErrorPhase.READING);
}
}
@@ -89,7 +98,8 @@ public float getProgress() {
try {
return delegate.getProgress();
} catch (Exception e) {
- throw new WrappedStageException(e, stageName);
+ throw ErrorDetails.handleException(e, stageName, errorDetailsProvider,
+ ErrorPhase.READING);
}
}
@@ -98,7 +108,8 @@ public void close() {
try {
delegate.close();
} catch (Exception e) {
- throw new WrappedStageException(e, stageName);
+ throw ErrorDetails.handleException(e, stageName, errorDetailsProvider,
+ ErrorPhase.READING);
}
}
}
diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordWriter.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordWriter.java
index 377f93db604d..63aaddf5aa41 100644
--- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordWriter.java
+++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordWriter.java
@@ -16,7 +16,9 @@
package io.cdap.cdap.etl.spark.io;
-import io.cdap.cdap.api.exception.WrappedStageException;
+import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;
+import io.cdap.cdap.etl.api.exception.ErrorPhase;
+import io.cdap.cdap.etl.common.ErrorDetails;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -38,10 +40,13 @@
public class StageTrackingRecordWriter<K, V> extends RecordWriter<K, V> {
private final RecordWriter<K, V> delegate;
private final String stageName;
+ private final ErrorDetailsProvider errorDetailsProvider;
- public StageTrackingRecordWriter(RecordWriter<K, V> delegate, String stageName) {
+ public StageTrackingRecordWriter(RecordWriter<K, V> delegate, String stageName,
+ ErrorDetailsProvider errorDetailsProvider) {
this.delegate = delegate;
this.stageName = stageName;
+ this.errorDetailsProvider = errorDetailsProvider;
}
@Override
@@ -49,7 +54,8 @@ public void write(K k, V v) {
try {
delegate.write(k, v);
} catch (Exception e) {
- throw new WrappedStageException(e, stageName);
+ throw ErrorDetails.handleException(e, stageName, errorDetailsProvider,
+ ErrorPhase.WRITING);
}
}
@@ -58,7 +64,8 @@ public void close(TaskAttemptContext taskAttemptContext) {
try {
delegate.close(taskAttemptContext);
} catch (Exception e) {
- throw new WrappedStageException(e, stageName);
+ throw ErrorDetails.handleException(e, stageName, errorDetailsProvider,
+ ErrorPhase.WRITING);
}
}
}