Skip to content

Commit

Permalink
CDAP-21061: wrap all plugin methods to throw WrappedStageException
Browse files Browse the repository at this point in the history
  • Loading branch information
itsankit-google committed Aug 28, 2024
1 parent f5f8cb5 commit b7ab396
Show file tree
Hide file tree
Showing 17 changed files with 496 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,15 @@ public WrappedStageException(Throwable cause, String stageName) {
public String getStageName() {
return stageName;
}

/**
* Returns the detail message string of this exception.
*
* @return the detail message as a {@String}.
*/
@Override
public String getMessage() {
return stageName != null ? String.format("Stage '%s' encountered : %s", stageName,
super.getMessage()) : super.getMessage();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ public interface MultiInputStageConfigurer {
*/
List<String> getInputStages();

/**
* Returns the name of the stage.
*
* @return the stage name as a {@String}.
*/
String getStageName();

/**

Check warning on line 54 in cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/MultiInputStageConfigurer.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.javadoc.SummaryJavadocCheck

First sentence of Javadoc is missing an ending period.
* set output schema for this stage, or null if its unknown
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ public interface MultiOutputStageConfigurer {
*/
void setOutputSchemas(Map<String, Schema> outputSchemas);

/**
* Returns the name of the stage.
*
* @return the stage name as a {@String}.
*/
String getStageName();

/**
* Returns a failure collector for the stage.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ public interface StageConfigurer {
*/
void setErrorSchema(@Nullable Schema errorSchema);

/**
* Returns the name of the stage.
*
* @return the stage name as a {@String}.
*/
String getStageName();

/**
* Returns a failure collector for the stage.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ public void setErrorSchema(@Nullable Schema errorSchema) {
errorSchemaSet = true;
}

/**
* Returns the name of the stage.
*
* @return the stage name as a {@String}.
*/
@Override
public String getStageName() {
return stageName;
}

@Override
public FailureCollector getFailureCollector() {
if (collector == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,13 @@ public <T> T call(Callable<T> callable) throws Exception {
public static Caller wrap(Caller delegate, String stageName) {
return new StageLoggingCaller(delegate, stageName);
}

/**
* Returns the name of the stage.
*
* @return the stage name as a {@String}.
*/
public String getStageName() {
return stageName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.cdap.etl.common.plugin;

import io.cdap.cdap.api.exception.WrappedStageException;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.action.Action;
import io.cdap.cdap.etl.api.action.ActionContext;
Expand All @@ -38,16 +39,24 @@ public WrappedAction(Action action, Caller caller) {
@Override
public void configurePipeline(final PipelineConfigurer pipelineConfigurer) {
caller.callUnchecked((Callable<Void>) () -> {
action.configurePipeline(pipelineConfigurer);
return null;
try {
action.configurePipeline(pipelineConfigurer);
return null;
} catch (Exception e) {
throw new WrappedStageException(e, pipelineConfigurer.getStageConfigurer().getStageName());
}
});
}

@Override
public void run(final ActionContext context) throws Exception {
caller.call((Callable<Void>) () -> {
action.run(context);
return null;
try {
action.run(context);
return null;
} catch (Exception e) {
throw new WrappedStageException(e, context.getStageName());
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.cdap.etl.common.plugin;

import io.cdap.cdap.api.exception.WrappedStageException;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchAggregator;
Expand Down Expand Up @@ -52,24 +53,39 @@ public WrappedBatchAggregator(BatchAggregator<GROUP_KEY, GROUP_VALUE, OUT> aggre
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
caller.callUnchecked((Callable<Void>) () -> {
aggregator.configurePipeline(pipelineConfigurer);
return null;
try {
aggregator.configurePipeline(pipelineConfigurer);
return null;
} catch (Exception e) {
throw new WrappedStageException(e, pipelineConfigurer.getStageConfigurer().getStageName());
}
});
}

@Override
public void initialize(BatchRuntimeContext context) throws Exception {
caller.call((Callable<Void>) () -> {
aggregator.initialize(context);
return null;
try {
aggregator.initialize(context);
return null;
} catch (Exception e) {
throw new WrappedStageException(e, context.getStageName());
}
});
}

@Override
public void destroy() {
caller.callUnchecked((Callable<Void>) () -> {
aggregator.destroy();
return null;
try {
aggregator.destroy();
return null;
} catch (Exception e) {
if (caller instanceof StageLoggingCaller) {
throw new WrappedStageException(e, ((StageLoggingCaller) caller).getStageName());
}
throw e;
}
});
}

Expand All @@ -78,16 +94,24 @@ public void prepareRun(BatchAggregatorContext context) throws Exception {
context.setGroupKeyClass(TypeChecker.getGroupKeyClass(aggregator));
context.setGroupValueClass(TypeChecker.getGroupValueClass(aggregator));
caller.call((Callable<Void>) () -> {
aggregator.prepareRun(context);
return null;
try {
aggregator.prepareRun(context);
return null;
} catch (Exception e) {
throw new WrappedStageException(e, context.getStageName());
}
});
}

@Override
public void onRunFinish(boolean succeeded, BatchAggregatorContext context) {
caller.callUnchecked((Callable<Void>) () -> {
aggregator.onRunFinish(succeeded, context);
return null;
try {
aggregator.onRunFinish(succeeded, context);
return null;
} catch (Exception e) {
throw new WrappedStageException(e, context.getStageName());
}
});
}

Expand All @@ -96,8 +120,15 @@ public void groupBy(GROUP_VALUE groupValue, Emitter<GROUP_KEY> emitter) throws E
operationTimer.start();
try {
caller.call((Callable<Void>) () -> {
aggregator.groupBy(groupValue, new UntimedEmitter<>(emitter, operationTimer));
return null;
try {
aggregator.groupBy(groupValue, new UntimedEmitter<>(emitter, operationTimer));
return null;
} catch (Exception e) {
if (caller instanceof StageLoggingCaller) {
throw new WrappedStageException(e, ((StageLoggingCaller) caller).getStageName());
}
throw e;
}
});
} finally {
operationTimer.reset();
Expand All @@ -110,8 +141,16 @@ public void aggregate(GROUP_KEY groupKey, Iterator<GROUP_VALUE> groupValues,
operationTimer.start();
try {
caller.call((Callable<Void>) () -> {
aggregator.aggregate(groupKey, groupValues, new UntimedEmitter<>(emitter, operationTimer));
return null;
try {
aggregator.aggregate(groupKey, groupValues,
new UntimedEmitter<>(emitter, operationTimer));
return null;
} catch (Exception e) {
if (caller instanceof StageLoggingCaller) {
throw new WrappedStageException(e, ((StageLoggingCaller) caller).getStageName());
}
throw e;
}
});
} finally {
operationTimer.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.cdap.etl.common.plugin;

import io.cdap.cdap.api.exception.WrappedStageException;
import io.cdap.cdap.etl.api.JoinConfig;
import io.cdap.cdap.etl.api.JoinElement;
import io.cdap.cdap.etl.api.MultiInputPipelineConfigurer;
Expand Down Expand Up @@ -52,8 +53,13 @@ public WrappedBatchJoiner(BatchJoiner<JOIN_KEY, INPUT_RECORD, OUT> joiner, Calle
@Override
public void configurePipeline(MultiInputPipelineConfigurer multiInputPipelineConfigurer) {
caller.callUnchecked((Callable<Void>) () -> {
joiner.configurePipeline(multiInputPipelineConfigurer);
return null;
try {
joiner.configurePipeline(multiInputPipelineConfigurer);
return null;
} catch (Exception e) {
throw new WrappedStageException(e,
multiInputPipelineConfigurer.getMultiInputStageConfigurer().getStageName());
}
});
}

Expand All @@ -62,32 +68,51 @@ public void prepareRun(BatchJoinerContext context) throws Exception {
context.setJoinKeyClass(TypeChecker.getJoinKeyClass(joiner));
context.setJoinInputRecordClass(TypeChecker.getJoinInputRecordClass(joiner));
caller.call((Callable<Void>) () -> {
joiner.prepareRun(context);
return null;
try {
joiner.prepareRun(context);
return null;
} catch (Exception e) {
throw new WrappedStageException(e, context.getStageName());
}
});
}

@Override
public void initialize(BatchJoinerRuntimeContext context) throws Exception {
caller.call((Callable<Void>) () -> {
joiner.initialize(context);
return null;
try {
joiner.initialize(context);
return null;
} catch (Exception e) {
throw new WrappedStageException(e, context.getStageName());
}
});
}

@Override
public void destroy() {
caller.callUnchecked((Callable<Void>) () -> {
joiner.destroy();
return null;
try {
joiner.destroy();
return null;
} catch (Exception e) {
if (caller instanceof StageLoggingCaller) {
throw new WrappedStageException(e, ((StageLoggingCaller) caller).getStageName());
}
throw e;
}
});
}

@Override
public void onRunFinish(boolean succeeded, BatchJoinerContext context) {
caller.callUnchecked((Callable<Void>) () -> {
joiner.onRunFinish(succeeded, context);
return null;
try {
joiner.onRunFinish(succeeded, context);
return null;
} catch (Exception e) {
throw new WrappedStageException(e, context.getStageName());
}
});
}

Expand All @@ -96,7 +121,13 @@ public void onRunFinish(boolean succeeded, BatchJoinerContext context) {
public JOIN_KEY joinOn(String stageName, INPUT_RECORD inputRecord) throws Exception {
operationTimer.start();
try {
return caller.call(() -> joiner.joinOn(stageName, inputRecord));
return caller.call((Callable<? extends JOIN_KEY>) () -> {
try {
return joiner.joinOn(stageName, inputRecord);
} catch (Exception e) {
throw new WrappedStageException(e, stageName);
}
});
} finally {
operationTimer.reset();
}
Expand All @@ -107,23 +138,47 @@ public Collection<JOIN_KEY> getJoinKeys(String stageName, INPUT_RECORD inputReco
throws Exception {
operationTimer.start();
try {
return caller.call(() -> joiner.getJoinKeys(stageName, inputRecord));
return caller.call((Callable<? extends Collection<JOIN_KEY>>) () -> {
try {
return joiner.getJoinKeys(stageName, inputRecord);
} catch (Exception e) {
throw new WrappedStageException(e, stageName);
}
});
} finally {
operationTimer.reset();
}
}

@Override
public JoinConfig getJoinConfig() throws Exception {
return caller.call(joiner::getJoinConfig);
return caller.call((Callable<? extends JoinConfig>) () -> {
try {
return joiner.getJoinConfig();
} catch (Exception e) {
if (caller instanceof StageLoggingCaller) {
throw new WrappedStageException(e, ((StageLoggingCaller) caller).getStageName());
}
throw e;
}
});
}

@Override
public OUT merge(JOIN_KEY joinKey, Iterable<JoinElement<INPUT_RECORD>> joinResult)
throws Exception {
operationTimer.start();
try {
return caller.call(() -> joiner.merge(joinKey, joinResult));
return caller.call((Callable<? extends OUT>) () -> {
try {
return joiner.merge(joinKey, joinResult);
} catch (Exception e) {
if (caller instanceof StageLoggingCaller) {
throw new WrappedStageException(e, ((StageLoggingCaller) caller).getStageName());
}
throw e;
}
});
} finally {
operationTimer.reset();
}
Expand Down
Loading

0 comments on commit b7ab396

Please sign in to comment.