-
Notifications
You must be signed in to change notification settings - Fork 100
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Updated output formats to output BigDecimal fields as numeric values …
…instead of a byte representation.
- Loading branch information
Showing
6 changed files
with
382 additions
and
5 deletions.
There are no files selected for viewing
80 changes: 80 additions & 0 deletions
80
.../src/main/java/io/cdap/cdap/format/io/BigDecimalAwareJsonStructuredRecordDatumWriter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
/* | ||
* Copyright © 2022 Cask Data, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package io.cdap.cdap.format.io; | ||
|
||
import com.google.gson.stream.JsonWriter; | ||
import io.cdap.cdap.api.common.Bytes; | ||
import io.cdap.cdap.api.data.schema.Schema; | ||
import io.cdap.cdap.common.io.Encoder; | ||
|
||
import java.io.IOException; | ||
import java.math.BigDecimal; | ||
import java.math.BigInteger; | ||
import java.nio.ByteBuffer; | ||
|
||
/** | ||
* Custom Datum Writer for Structured Records, this class writes BigDecimal values as numbers instead of byte arrays in | ||
* the JSON output. | ||
*/ | ||
public class BigDecimalAwareJsonStructuredRecordDatumWriter extends JsonStructuredRecordDatumWriter { | ||
@Override | ||
protected void encode(Encoder encoder, Schema schema, Object value) throws IOException { | ||
Schema nonNullableSchema = schema.isNullable() ? schema.getNonNullable() : schema; | ||
Schema.LogicalType logicalType = nonNullableSchema.getLogicalType(); | ||
|
||
if (value != null && logicalType == Schema.LogicalType.DECIMAL) { | ||
BigDecimal bdValue = fromObject(value, nonNullableSchema); | ||
getJsonWriter(encoder).value(bdValue); | ||
return; | ||
} | ||
|
||
super.encode(encoder, schema, value); | ||
} | ||
|
||
/** | ||
* Extract a BigDecimal value from a supplied object | ||
* @param value object to convert into BigDecimal | ||
* @param logicalTypeSchema logical type schema for this field | ||
* @return value converted ingo a BigDecimal. | ||
*/ | ||
protected BigDecimal fromObject(Object value, Schema logicalTypeSchema) { | ||
// Return BigDecimal as is | ||
if (value instanceof BigDecimal) { | ||
return (BigDecimal) value; | ||
} | ||
|
||
// Handle the value as a byte buffer | ||
int scale = logicalTypeSchema.getScale(); | ||
if (value instanceof ByteBuffer) { | ||
return new BigDecimal(new BigInteger(Bytes.toBytes((ByteBuffer) value)), scale); | ||
} | ||
|
||
// Handle the BigDecimal value | ||
try { | ||
return new BigDecimal(new BigInteger((byte[]) value), scale); | ||
} catch (ClassCastException e) { | ||
throw new ClassCastException(String.format("Field '%s' is expected to be a decimal, but is a %s.", | ||
logicalTypeSchema.getDisplayName(), | ||
value.getClass().getSimpleName())); | ||
} | ||
} | ||
|
||
private JsonWriter getJsonWriter(Encoder encoder) { | ||
// Type already checked in the encode method, hence assuming the casting is fine. | ||
return ((JsonEncoder) encoder).getJsonWriter(); | ||
} | ||
} |
83 changes: 83 additions & 0 deletions
83
format-common/src/main/java/io/cdap/cdap/format/io/BigDecimalAwareJsonWriter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
/* | ||
* Copyright © 2022 Cask Data, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package io.cdap.cdap.format.io; | ||
|
||
import com.google.gson.stream.JsonWriter; | ||
|
||
import java.io.IOException; | ||
import java.io.Writer; | ||
import java.math.BigDecimal; | ||
|
||
/** | ||
* JsonWriter instance which handles writing BigDecimal fields. | ||
*/ | ||
public class BigDecimalAwareJsonWriter extends JsonWriter { | ||
public BigDecimalAwareJsonWriter(Writer out) { | ||
super(out); | ||
} | ||
|
||
@Override | ||
public JsonWriter value(Number value) throws IOException { | ||
if (value == null) { | ||
return this.nullValue(); | ||
} | ||
|
||
// Wrap BigDecimal fields in a wrapper which handles the conversion to String. | ||
if (value instanceof BigDecimal) { | ||
value = new BigDecimalWrapper((BigDecimal) value); | ||
} | ||
|
||
super.value(value); | ||
return this; | ||
} | ||
|
||
/** | ||
* Wrapper used to ensure that BigDecimals are generated as plain strings. | ||
*/ | ||
private static class BigDecimalWrapper extends Number { | ||
BigDecimal wrapped; | ||
|
||
protected BigDecimalWrapper(BigDecimal wrapped) { | ||
this.wrapped = wrapped; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return wrapped.toPlainString(); | ||
} | ||
|
||
@Override | ||
public int intValue() { | ||
return wrapped.intValue(); | ||
} | ||
|
||
@Override | ||
public long longValue() { | ||
return wrapped.longValue(); | ||
} | ||
|
||
@Override | ||
public float floatValue() { | ||
return wrapped.floatValue(); | ||
} | ||
|
||
@Override | ||
public double doubleValue() { | ||
return wrapped.doubleValue(); | ||
} | ||
} | ||
} |
96 changes: 96 additions & 0 deletions
96
...n/src/main/java/io/cdap/plugin/format/BigDecimalAwareStructuredRecordStringConverter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
/* | ||
* Copyright © 2022 Cask Data, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package io.cdap.plugin.format; | ||
|
||
import com.google.gson.stream.JsonWriter; | ||
import io.cdap.cdap.api.data.format.StructuredRecord; | ||
import io.cdap.cdap.api.data.schema.Schema; | ||
import io.cdap.cdap.format.io.BigDecimalAwareJsonStructuredRecordDatumWriter; | ||
import io.cdap.cdap.format.io.BigDecimalAwareJsonWriter; | ||
import io.cdap.cdap.format.io.JsonEncoder; | ||
|
||
import java.io.IOException; | ||
import java.io.StringWriter; | ||
import java.math.BigDecimal; | ||
import java.util.stream.Collectors; | ||
|
||
/** | ||
* Structured record converted that handles writing decimal files as numbers in the output (instead of int arrays). | ||
*/ | ||
public class BigDecimalAwareStructuredRecordStringConverter { | ||
private static final BigDecimalAwareJsonStructuredRecordDatumWriter JSON_DATUM_WRITER = | ||
new BigDecimalAwareJsonStructuredRecordDatumWriter(); | ||
|
||
/** | ||
* Converts a {@link StructuredRecord} to a json string. | ||
*/ | ||
public static String toJsonString(StructuredRecord record) throws IOException { | ||
StringWriter strWriter = new StringWriter(); | ||
try (JsonWriter writer = new BigDecimalAwareJsonWriter(strWriter)) { | ||
JSON_DATUM_WRITER.encode(record, new JsonEncoder(writer)); | ||
return strWriter.toString(); | ||
} | ||
} | ||
|
||
/** | ||
* Converts a {@link StructuredRecord} to a delimited string. | ||
*/ | ||
public static String toDelimitedString(final StructuredRecord record, String delimiter) { | ||
return record.getSchema().getFields().stream() | ||
.map(f -> mapField(record, f)) | ||
.collect(Collectors.joining(delimiter)); | ||
} | ||
|
||
/** | ||
* Get the string representation for a given record field. BigDecimals are printed as plain strings. | ||
* @param record record to process | ||
* @param field field to extract | ||
* @return String representing the value for this field. | ||
*/ | ||
private static String mapField(StructuredRecord record, Schema.Field field) { | ||
String fieldName = field.getName(); | ||
Object value = record.get(fieldName); | ||
|
||
// Return null value as empty string. | ||
if (value == null) { | ||
return ""; | ||
} | ||
|
||
// Get the field schema. | ||
Schema fieldSchema = field.getSchema(); | ||
if (fieldSchema.isNullable()) { | ||
fieldSchema = fieldSchema.getNonNullable(); | ||
} | ||
|
||
// Write decimal values as decimal strings. | ||
if (fieldSchema.getLogicalType() != null && fieldSchema.getLogicalType() == Schema.LogicalType.DECIMAL) { | ||
BigDecimal decimalValue = record.getDecimal(fieldName); | ||
|
||
// Throw exception if the field is expected tu be decimal, but it could not be processed as such. | ||
if (decimalValue == null) { | ||
throw new IllegalArgumentException("Invalid schema for field " + fieldName + ". Decimal was expected."); | ||
} | ||
return decimalValue.toPlainString(); | ||
} | ||
|
||
return value.toString(); | ||
} | ||
|
||
private BigDecimalAwareStructuredRecordStringConverter() { | ||
//inaccessible constructor for static class | ||
} | ||
} |
116 changes: 116 additions & 0 deletions
116
...c/test/java/io/cdap/plugin/format/BigDecimalAwareStructuredRecordStringConverterTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
/* | ||
* Copyright © 2022 Cask Data, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package io.cdap.plugin.format; | ||
|
||
import io.cdap.cdap.api.data.format.StructuredRecord; | ||
import io.cdap.cdap.api.data.schema.Schema; | ||
import org.junit.Assert; | ||
import org.junit.Test; | ||
|
||
import java.io.IOException; | ||
import java.math.BigDecimal; | ||
import java.math.MathContext; | ||
|
||
public class BigDecimalAwareStructuredRecordStringConverterTest { | ||
private static final Schema SCHEMA = Schema.recordOf( | ||
"rec", | ||
Schema.Field.of("i", Schema.of(Schema.Type.INT)), | ||
Schema.Field.of("f", Schema.of(Schema.Type.FLOAT)), | ||
Schema.Field.of("bd", Schema.nullableOf(Schema.decimalOf(32, 7))), | ||
Schema.Field.of("d", Schema.of(Schema.Type.DOUBLE)), | ||
Schema.Field.of("l", Schema.of(Schema.Type.LONG))); | ||
|
||
private static final BigDecimal BD1 = | ||
new BigDecimal("12398321.8127312", MathContext.DECIMAL128).setScale(7); | ||
|
||
private static final BigDecimal BD2 = | ||
new BigDecimal("1000000000000000.0", MathContext.DECIMAL128).setScale(7); | ||
|
||
private static final StructuredRecord RECORD1 = StructuredRecord.builder(SCHEMA) | ||
.set("i", 1) | ||
.set("f", 256.1f) | ||
.setDecimal("bd", BD1) | ||
.set("d", 12345.45678d) | ||
.set("l", 123456789L) | ||
.build(); | ||
|
||
private static final StructuredRecord RECORD2 = StructuredRecord.builder(SCHEMA) | ||
.set("i", 1) | ||
.set("f", 256.1f) | ||
.setDecimal("bd", BD2) | ||
.set("d", 12345.45678d) | ||
.set("l", 123456789L) | ||
.build(); | ||
|
||
private static final StructuredRecord RECORD3 = StructuredRecord.builder(SCHEMA) | ||
.set("i", 1) | ||
.set("f", 256.1f) | ||
.setDecimal("bd", null) | ||
.set("d", 12345.45678d) | ||
.set("l", 123456789L) | ||
.build(); | ||
|
||
@Test | ||
public void testToJsonString1() throws IOException { | ||
String output = BigDecimalAwareStructuredRecordStringConverter.toJsonString(RECORD1); | ||
Assert.assertTrue(output.startsWith("{\"i\":1,")); | ||
Assert.assertTrue(output.contains(",\"f\":256.1")); | ||
Assert.assertTrue(output.contains(",\"bd\":12398321.8127312,")); | ||
Assert.assertTrue(output.contains(",\"d\":12345.45678,")); | ||
Assert.assertTrue(output.endsWith(",\"l\":123456789}")); | ||
} | ||
|
||
@Test | ||
public void testToJsonString2() throws IOException { | ||
String output = BigDecimalAwareStructuredRecordStringConverter.toJsonString(RECORD2); | ||
Assert.assertTrue(output.startsWith("{\"i\":1,")); | ||
Assert.assertTrue(output.contains(",\"f\":256.1")); | ||
Assert.assertTrue(output.contains(",\"bd\":1000000000000000.0000000,")); | ||
Assert.assertTrue(output.contains(",\"d\":12345.45678,")); | ||
Assert.assertTrue(output.endsWith(",\"l\":123456789}")); | ||
} | ||
|
||
@Test | ||
public void testToJsonString3() throws IOException { | ||
String output = BigDecimalAwareStructuredRecordStringConverter.toJsonString(RECORD3); | ||
Assert.assertTrue(output.startsWith("{\"i\":1,")); | ||
Assert.assertTrue(output.contains(",\"f\":256.1")); | ||
Assert.assertTrue(output.contains(",\"bd\":null,")); | ||
Assert.assertTrue(output.contains(",\"d\":12345.45678,")); | ||
Assert.assertTrue(output.endsWith(",\"l\":123456789}")); | ||
} | ||
|
||
@Test | ||
public void testToDelimitedStringUsingComma() { | ||
String output = BigDecimalAwareStructuredRecordStringConverter.toDelimitedString(RECORD1, ","); | ||
Assert.assertTrue(output.startsWith("1,")); | ||
Assert.assertTrue(output.contains(",256.1,")); | ||
Assert.assertTrue(output.contains(",12398321.8127312,")); | ||
Assert.assertTrue(output.contains(",12345.45678,")); | ||
Assert.assertTrue(output.endsWith(",123456789")); | ||
} | ||
|
||
@Test | ||
public void testToDelimitedStringUsingPipe() { | ||
String output = BigDecimalAwareStructuredRecordStringConverter.toDelimitedString(RECORD1, "|"); | ||
Assert.assertTrue(output.startsWith("1|")); | ||
Assert.assertTrue(output.contains("|256.1|")); | ||
Assert.assertTrue(output.contains("|12398321.8127312|")); | ||
Assert.assertTrue(output.contains("|12345.45678|")); | ||
Assert.assertTrue(output.endsWith("|123456789")); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.