diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java index e3b81cea7cd1..8f32e710787e 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java @@ -22,18 +22,30 @@ import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Table; import org.apache.spark.util.KnownSizeEstimation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class provides a serializable table with a known size estimate. Spark calls its * SizeEstimator class when broadcasting variables and this can be an expensive operation, so * providing a known size estimate allows that operation to be skipped. + * + *

This class also implements AutoCloseable to avoid leaking resources upon broadcasting. + * Broadcast variables are destroyed and cleaned up on the driver and executors once they are + * garbage collected on the driver. The implementation ensures only resources used by copies of the + * main table are released. */ -public class SerializableTableWithSize extends SerializableTable implements KnownSizeEstimation { +public class SerializableTableWithSize extends SerializableTable + implements KnownSizeEstimation, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(SerializableTableWithSize.class); private static final long SIZE_ESTIMATE = 32_768L; + private final transient Object serializationMarker; + protected SerializableTableWithSize(Table table) { super(table); + this.serializationMarker = new Object(); } @Override @@ -49,6 +61,14 @@ public static Table copyOf(Table table) { } } + @Override + public void close() throws Exception { + if (serializationMarker == null) { + LOG.info("Releasing resources"); + io().close(); + } + } + public static class SerializableMetadataTableWithSize extends SerializableMetadataTable implements KnownSizeEstimation { diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java index 30a167d575b1..2d046b793b7d 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java @@ -20,11 +20,17 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; import java.util.Map; import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.spark.source.SerializableTableWithSize; import org.apache.iceberg.types.Types; @@ -63,6 +69,44 @@ public void initTable() throws IOException { this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString()); } + @Test + public void testCloseSerializableTableKryoSerialization() throws Exception { + Table spyTable = spy(table); + FileIO spyIO = spy(table.io()); + when(spyTable.io()).thenReturn(spyIO); + + Table serializableTable = SerializableTableWithSize.copyOf(spyTable); + + Table serializableTableCopy = spy(KryoHelpers.roundTripSerialize(serializableTable)); + FileIO spyFileIOCopy = spy(serializableTableCopy.io()); + when(serializableTableCopy.io()).thenReturn(spyFileIOCopy); + + ((AutoCloseable) serializableTable).close(); // mimics close on the driver + ((AutoCloseable) serializableTableCopy).close(); // mimics close on executors + + verify(spyIO, never()).close(); + verify(spyFileIOCopy, times(1)).close(); + } + + @Test + public void testCloseSerializableTableJavaSerialization() throws Exception { + Table spyTable = spy(table); + FileIO spyIO = spy(table.io()); + when(spyTable.io()).thenReturn(spyIO); + + Table serializableTable = SerializableTableWithSize.copyOf(spyTable); + + Table serializableTableCopy = spy(TestHelpers.roundTripSerialize(serializableTable)); + FileIO spyFileIOCopy = spy(serializableTableCopy.io()); + when(serializableTableCopy.io()).thenReturn(spyFileIOCopy); + + ((AutoCloseable) serializableTable).close(); // mimics close on the driver + ((AutoCloseable) serializableTableCopy).close(); // mimics close on executors + + verify(spyIO, never()).close(); + verify(spyFileIOCopy, times(1)).close(); + } + @Test public void testSerializableTableKryoSerialization() throws IOException { Table serializableTable = SerializableTableWithSize.copyOf(table); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java index e3b81cea7cd1..8f32e710787e 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java @@ -22,18 +22,30 @@ import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Table; import org.apache.spark.util.KnownSizeEstimation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class provides a serializable table with a known size estimate. Spark calls its * SizeEstimator class when broadcasting variables and this can be an expensive operation, so * providing a known size estimate allows that operation to be skipped. + * + *

This class also implements AutoCloseable to avoid leaking resources upon broadcasting. + * Broadcast variables are destroyed and cleaned up on the driver and executors once they are + * garbage collected on the driver. The implementation ensures only resources used by copies of the + * main table are released. */ -public class SerializableTableWithSize extends SerializableTable implements KnownSizeEstimation { +public class SerializableTableWithSize extends SerializableTable + implements KnownSizeEstimation, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(SerializableTableWithSize.class); private static final long SIZE_ESTIMATE = 32_768L; + private final transient Object serializationMarker; + protected SerializableTableWithSize(Table table) { super(table); + this.serializationMarker = new Object(); } @Override @@ -49,6 +61,14 @@ public static Table copyOf(Table table) { } } + @Override + public void close() throws Exception { + if (serializationMarker == null) { + LOG.info("Releasing resources"); + io().close(); + } + } + public static class SerializableMetadataTableWithSize extends SerializableMetadataTable implements KnownSizeEstimation { diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java index b134aacac0d7..f31e0c55395c 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java @@ -20,11 +20,17 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; import java.util.Map; import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.spark.source.SerializableTableWithSize; import org.apache.iceberg.types.Types; @@ -78,6 +84,44 @@ public void initTable() throws IOException { this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString()); } + @Test + public void testCloseSerializableTableKryoSerialization() throws Exception { + Table spyTable = spy(table); + FileIO spyIO = spy(table.io()); + when(spyTable.io()).thenReturn(spyIO); + + Table serializableTable = SerializableTableWithSize.copyOf(spyTable); + + Table serializableTableCopy = spy(KryoHelpers.roundTripSerialize(serializableTable)); + FileIO spyFileIOCopy = spy(serializableTableCopy.io()); + when(serializableTableCopy.io()).thenReturn(spyFileIOCopy); + + ((AutoCloseable) serializableTable).close(); // mimics close on the driver + ((AutoCloseable) serializableTableCopy).close(); // mimics close on executors + + verify(spyIO, never()).close(); + verify(spyFileIOCopy, times(1)).close(); + } + + @Test + public void testCloseSerializableTableJavaSerialization() throws Exception { + Table spyTable = spy(table); + FileIO spyIO = spy(table.io()); + when(spyTable.io()).thenReturn(spyIO); + + Table serializableTable = SerializableTableWithSize.copyOf(spyTable); + + Table serializableTableCopy = spy(TestHelpers.roundTripSerialize(serializableTable)); + FileIO spyFileIOCopy = spy(serializableTableCopy.io()); + when(serializableTableCopy.io()).thenReturn(spyFileIOCopy); + + ((AutoCloseable) serializableTable).close(); // mimics close on the driver + ((AutoCloseable) serializableTableCopy).close(); // mimics close on executors + + verify(spyIO, never()).close(); + verify(spyFileIOCopy, times(1)).close(); + } + @Test public void testSerializableTableKryoSerialization() throws IOException { Table serializableTable = SerializableTableWithSize.copyOf(table); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java index e3b81cea7cd1..8f32e710787e 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java @@ -22,18 +22,30 @@ import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Table; import org.apache.spark.util.KnownSizeEstimation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class provides a serializable table with a known size estimate. Spark calls its * SizeEstimator class when broadcasting variables and this can be an expensive operation, so * providing a known size estimate allows that operation to be skipped. + * + *

This class also implements AutoCloseable to avoid leaking resources upon broadcasting. + * Broadcast variables are destroyed and cleaned up on the driver and executors once they are + * garbage collected on the driver. The implementation ensures only resources used by copies of the + * main table are released. */ -public class SerializableTableWithSize extends SerializableTable implements KnownSizeEstimation { +public class SerializableTableWithSize extends SerializableTable + implements KnownSizeEstimation, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(SerializableTableWithSize.class); private static final long SIZE_ESTIMATE = 32_768L; + private final transient Object serializationMarker; + protected SerializableTableWithSize(Table table) { super(table); + this.serializationMarker = new Object(); } @Override @@ -49,6 +61,14 @@ public static Table copyOf(Table table) { } } + @Override + public void close() throws Exception { + if (serializationMarker == null) { + LOG.info("Releasing resources"); + io().close(); + } + } + public static class SerializableMetadataTableWithSize extends SerializableMetadataTable implements KnownSizeEstimation { diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java index b134aacac0d7..f31e0c55395c 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java @@ -20,11 +20,17 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; import java.util.Map; import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.spark.source.SerializableTableWithSize; import org.apache.iceberg.types.Types; @@ -78,6 +84,44 @@ public void initTable() throws IOException { this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString()); } + @Test + public void testCloseSerializableTableKryoSerialization() throws Exception { + Table spyTable = spy(table); + FileIO spyIO = spy(table.io()); + when(spyTable.io()).thenReturn(spyIO); + + Table serializableTable = SerializableTableWithSize.copyOf(spyTable); + + Table serializableTableCopy = spy(KryoHelpers.roundTripSerialize(serializableTable)); + FileIO spyFileIOCopy = spy(serializableTableCopy.io()); + when(serializableTableCopy.io()).thenReturn(spyFileIOCopy); + + ((AutoCloseable) serializableTable).close(); // mimics close on the driver + ((AutoCloseable) serializableTableCopy).close(); // mimics close on executors + + verify(spyIO, never()).close(); + verify(spyFileIOCopy, times(1)).close(); + } + + @Test + public void testCloseSerializableTableJavaSerialization() throws Exception { + Table spyTable = spy(table); + FileIO spyIO = spy(table.io()); + when(spyTable.io()).thenReturn(spyIO); + + Table serializableTable = SerializableTableWithSize.copyOf(spyTable); + + Table serializableTableCopy = spy(TestHelpers.roundTripSerialize(serializableTable)); + FileIO spyFileIOCopy = spy(serializableTableCopy.io()); + when(serializableTableCopy.io()).thenReturn(spyFileIOCopy); + + ((AutoCloseable) serializableTable).close(); // mimics close on the driver + ((AutoCloseable) serializableTableCopy).close(); // mimics close on executors + + verify(spyIO, never()).close(); + verify(spyFileIOCopy, times(1)).close(); + } + @Test public void testSerializableTableKryoSerialization() throws IOException { Table serializableTable = SerializableTableWithSize.copyOf(table); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java index e3b81cea7cd1..8f32e710787e 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java @@ -22,18 +22,30 @@ import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Table; import org.apache.spark.util.KnownSizeEstimation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class provides a serializable table with a known size estimate. Spark calls its * SizeEstimator class when broadcasting variables and this can be an expensive operation, so * providing a known size estimate allows that operation to be skipped. + * + *

This class also implements AutoCloseable to avoid leaking resources upon broadcasting. + * Broadcast variables are destroyed and cleaned up on the driver and executors once they are + * garbage collected on the driver. The implementation ensures only resources used by copies of the + * main table are released. */ -public class SerializableTableWithSize extends SerializableTable implements KnownSizeEstimation { +public class SerializableTableWithSize extends SerializableTable + implements KnownSizeEstimation, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(SerializableTableWithSize.class); private static final long SIZE_ESTIMATE = 32_768L; + private final transient Object serializationMarker; + protected SerializableTableWithSize(Table table) { super(table); + this.serializationMarker = new Object(); } @Override @@ -49,6 +61,14 @@ public static Table copyOf(Table table) { } } + @Override + public void close() throws Exception { + if (serializationMarker == null) { + LOG.info("Releasing resources"); + io().close(); + } + } + public static class SerializableMetadataTableWithSize extends SerializableMetadataTable implements KnownSizeEstimation { diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java index b134aacac0d7..f31e0c55395c 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java @@ -20,11 +20,17 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; import java.util.Map; import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.spark.source.SerializableTableWithSize; import org.apache.iceberg.types.Types; @@ -78,6 +84,44 @@ public void initTable() throws IOException { this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString()); } + @Test + public void testCloseSerializableTableKryoSerialization() throws Exception { + Table spyTable = spy(table); + FileIO spyIO = spy(table.io()); + when(spyTable.io()).thenReturn(spyIO); + + Table serializableTable = SerializableTableWithSize.copyOf(spyTable); + + Table serializableTableCopy = spy(KryoHelpers.roundTripSerialize(serializableTable)); + FileIO spyFileIOCopy = spy(serializableTableCopy.io()); + when(serializableTableCopy.io()).thenReturn(spyFileIOCopy); + + ((AutoCloseable) serializableTable).close(); // mimics close on the driver + ((AutoCloseable) serializableTableCopy).close(); // mimics close on executors + + verify(spyIO, never()).close(); + verify(spyFileIOCopy, times(1)).close(); + } + + @Test + public void testCloseSerializableTableJavaSerialization() throws Exception { + Table spyTable = spy(table); + FileIO spyIO = spy(table.io()); + when(spyTable.io()).thenReturn(spyIO); + + Table serializableTable = SerializableTableWithSize.copyOf(spyTable); + + Table serializableTableCopy = spy(TestHelpers.roundTripSerialize(serializableTable)); + FileIO spyFileIOCopy = spy(serializableTableCopy.io()); + when(serializableTableCopy.io()).thenReturn(spyFileIOCopy); + + ((AutoCloseable) serializableTable).close(); // mimics close on the driver + ((AutoCloseable) serializableTableCopy).close(); // mimics close on executors + + verify(spyIO, never()).close(); + verify(spyFileIOCopy, times(1)).close(); + } + @Test public void testSerializableTableKryoSerialization() throws IOException { Table serializableTable = SerializableTableWithSize.copyOf(table);