diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java index 167d0cbf011c..406a2cba4526 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java @@ -507,7 +507,9 @@ private String ensureTimestampFormat(String str) { if (result.charAt(10) == ' ') { result = result.substring(0, 10) + 'T' + result.substring(11); } - if (result.length() > 22 && result.charAt(19) == '+' && result.charAt(22) == ':') { + if (result.length() > 22 + && (result.charAt(19) == '+' || result.charAt(19) == '-') + && result.charAt(22) == ':') { result = result.substring(0, 19) + result.substring(19).replace(":", ""); } return result; diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java index fc27b542dc68..57add85a1445 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.time.Duration; @@ -531,23 +532,42 @@ public void testTimeConversion() { public void testTimestampWithZoneConversion() { OffsetDateTime expected = OffsetDateTime.parse("2023-05-18T11:22:33Z"); long expectedMillis = expected.toInstant().toEpochMilli(); - convertToTimestamps(expected, expectedMillis, TimestampType.withZone()); + assertTimestampConvert(expected, expectedMillis, TimestampType.withZone()); + + // zone should be respected + expected = OffsetDateTime.parse("2023-05-18T03:22:33-08:00"); + List additionalInput = + ImmutableList.of( + "2023-05-18T03:22:33-08", + "2023-05-18 03:22:33-08", + "2023-05-18T03:22:33-08:00", + "2023-05-18 03:22:33-08:00", + "2023-05-18T03:22:33-0800", + "2023-05-18 03:22:33-0800"); + assertTimestampConvert(expected, additionalInput, TimestampType.withZone()); } @Test public void testTimestampWithoutZoneConversion() { LocalDateTime expected = LocalDateTime.parse("2023-05-18T11:22:33"); long expectedMillis = expected.atZone(ZoneOffset.UTC).toInstant().toEpochMilli(); - convertToTimestamps(expected, expectedMillis, TimestampType.withoutZone()); - } + assertTimestampConvert(expected, expectedMillis, TimestampType.withoutZone()); - private void convertToTimestamps(Temporal expected, long expectedMillis, TimestampType type) { - Table table = mock(Table.class); - when(table.schema()).thenReturn(SIMPLE_SCHEMA); - RecordConverter converter = new RecordConverter(table, config); + // zone should be ignored + List additionalInput = + ImmutableList.of( + "2023-05-18T11:22:33-08", + "2023-05-18 11:22:33-08", + "2023-05-18T11:22:33-08:00", + "2023-05-18 11:22:33-08:00", + "2023-05-18T11:22:33-0800", + "2023-05-18 11:22:33-0800"); + assertTimestampConvert(expected, additionalInput, TimestampType.withoutZone()); + } + private void assertTimestampConvert(Temporal expected, long expectedMillis, TimestampType type) { List inputList = - ImmutableList.of( + Lists.newArrayList( "2023-05-18T11:22:33Z", "2023-05-18 11:22:33Z", "2023-05-18T11:22:33+00", @@ -563,6 +583,15 @@ private void convertToTimestamps(Temporal expected, long expectedMillis, Timesta OffsetDateTime.ofInstant(Instant.ofEpochMilli(expectedMillis), ZoneOffset.UTC), LocalDateTime.ofInstant(Instant.ofEpochMilli(expectedMillis), ZoneOffset.UTC)); + assertTimestampConvert(expected, inputList, type); + } + + private void assertTimestampConvert( + Temporal expected, List inputList, TimestampType type) { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + inputList.forEach( input -> { Temporal ts = converter.convertTimestampValue(input, type);