diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java index 44023326a0cf..f9b43b684666 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java @@ -187,6 +187,17 @@ public void flush() throws IOException { public void close() throws IOException { stream.close(); this.closed = true; + // {@link org.apache.hadoop.fs.s3a.S3ABlockOutputStream#close()} calls {@link + // org.apache.hadoop.fs.s3a.S3ABlockOutputStream#putObject()} + // which doesn't throw an exception when interrupted. + // Need to check the interrupted flag to detect failed object upload + // and propagate the error up. + if (Thread.interrupted() + && "org.apache.hadoop.fs.s3a.S3ABlockOutputStream" + .equals(stream.getWrappedStream().getClass().getName())) { + throw new IOException( + "S3ABlockOutputStream failed to upload object after stream was closed"); + } } @SuppressWarnings("checkstyle:NoFinalizer") diff --git a/core/src/test/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/core/src/test/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java new file mode 100644 index 000000000000..63bddf7b79c2 --- /dev/null +++ b/core/src/test/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +/** mock class for testing hadoop s3a writer */ +public class S3ABlockOutputStream extends OutputStream { + public ExecutorService mockCloseService; + public Future mockUploadOnClose; + + public S3ABlockOutputStream() { + mockCloseService = Executors.newSingleThreadExecutor(); + } + + @Override + public void write(int b) throws IOException { + throw new IOException("mocked class, do not use"); + } + + @Override + public void close() throws IOException { + try { + mockUploadOnClose = + mockCloseService.submit( + () -> { + try { + Thread.sleep(30 * 1000); + } catch (InterruptedException e) { + // ignore + } + }); + mockUploadOnClose.get(); + } catch (CancellationException | InterruptedException e) { + // mock interrupt in S3ABlockOutputStream#putObject + Thread.currentThread().interrupt(); + } catch (Exception e) { + throw new RuntimeException(e); + } + super.close(); + } + + public void interruptClose() { + mockUploadOnClose.cancel(true); + } +} diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java new file mode 100644 index 000000000000..09b478e4a6c1 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.hadoop; + +import java.io.IOException; +import java.util.concurrent.Executors; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.s3a.S3ABlockOutputStream; +import org.apache.iceberg.io.PositionOutputStream; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +class TestHadoopStreams { + + @Test + void closeShouldThrowIOExceptionWhenInterrupted() throws Exception { + + S3ABlockOutputStream s3ABlockOutputStream = new S3ABlockOutputStream(); + FSDataOutputStream fsDataOutputStream = new FSDataOutputStream(s3ABlockOutputStream, null); + PositionOutputStream wrap = HadoopStreams.wrap(fsDataOutputStream); + // interrupt mock upload on close after a delay + Executors.newSingleThreadExecutor() + .execute( + () -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + s3ABlockOutputStream.interruptClose(); + }); + + Assertions.assertThatThrownBy(wrap::close) + .isInstanceOf(IOException.class) + .hasMessage("S3ABlockOutputStream failed to upload object after stream was closed"); + } +}