From d9a12baf6ac58fb71d21da08c22d405d9cc044ba Mon Sep 17 00:00:00 2001 From: itsankit-google Date: Mon, 21 Oct 2024 17:20:57 +0000 Subject: [PATCH] Delegate all remaining methods of output committer --- .../io/StageTrackingOutputCommitter.java | 54 +++++++++++++++++++ .../etl/spark/io/TrackingOutputCommitter.java | 11 ++++ 2 files changed, 65 insertions(+) diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputCommitter.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputCommitter.java index 511bfdb54bda..3c4ba7cf2352 100644 --- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputCommitter.java +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputCommitter.java @@ -19,7 +19,9 @@ import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider; import io.cdap.cdap.etl.api.exception.ErrorPhase; import io.cdap.cdap.etl.common.ErrorDetails; +import java.io.IOException; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus.State; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -59,6 +61,37 @@ public void setupJob(JobContext jobContext) { } } + @Override + @Deprecated + public void cleanupJob(JobContext jobContext) throws IOException { + try { + delegate.cleanupJob(jobContext); + } catch (Exception e) { + throw ErrorDetails.handleException(e, stageName, errorDetailsProvider, + ErrorPhase.COMMITTING); + } + } + + @Override + public void commitJob(JobContext jobContext) throws IOException { + try { + delegate.commitJob(jobContext); + } catch (Exception e) { + throw ErrorDetails.handleException(e, stageName, errorDetailsProvider, + ErrorPhase.COMMITTING); + } + } + + @Override + public void abortJob(JobContext jobContext, State state) throws IOException { + try { + delegate.abortJob(jobContext, state); + } catch (Exception e) { + throw ErrorDetails.handleException(e, stageName, errorDetailsProvider, + ErrorPhase.COMMITTING); + } + } + @Override public void setupTask(TaskAttemptContext taskAttemptContext) { try { @@ -100,10 +133,31 @@ public void abortTask(TaskAttemptContext taskAttemptContext) { } @Override + @Deprecated public boolean isRecoverySupported() { return delegate.isRecoverySupported(); } + @Override + public boolean isCommitJobRepeatable(JobContext jobContext) throws IOException { + try { + return delegate.isCommitJobRepeatable(jobContext); + } catch (Exception e) { + throw ErrorDetails.handleException(e, stageName, errorDetailsProvider, + ErrorPhase.COMMITTING); + } + } + + @Override + public boolean isRecoverySupported(JobContext jobContext) throws IOException { + try { + return delegate.isRecoverySupported(jobContext); + } catch (Exception e) { + throw ErrorDetails.handleException(e, stageName, errorDetailsProvider, + ErrorPhase.COMMITTING); + } + } + @Override public void recoverTask(TaskAttemptContext taskContext) { try { diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/TrackingOutputCommitter.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/TrackingOutputCommitter.java index 971a72e77dd9..fd9377a80871 100644 --- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/TrackingOutputCommitter.java +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/TrackingOutputCommitter.java @@ -77,10 +77,21 @@ public void abortTask(TaskAttemptContext taskContext) throws IOException { } @Override + @Deprecated public boolean isRecoverySupported() { return delegate.isRecoverySupported(); } + @Override + public boolean isCommitJobRepeatable(JobContext jobContext) throws IOException { + return delegate.isCommitJobRepeatable(jobContext); + } + + @Override + public boolean isRecoverySupported(JobContext jobContext) throws IOException { + return delegate.isRecoverySupported(jobContext); + } + @Override public void recoverTask(TaskAttemptContext taskContext) throws IOException { delegate.recoverTask(new TrackingTaskAttemptContext(taskContext));