-
Notifications
You must be signed in to change notification settings - Fork 205
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Behavioral change to Avro codecs and schema handling (#3238)
Change the behavior of Avro-based codecs. When a schema is defined, rely on the schema rather than the incoming event. If the schema is auto-generated, then the incoming event data must continue to match. Fix Avro arrays which were only supporting arrays of strings previously. Signed-off-by: David Venable <[email protected]> (cherry picked from commit f17e833)
- Loading branch information
1 parent
6413a3d
commit c1ce807
Showing
11 changed files
with
1,200 additions
and
172 deletions.
There are no files selected for viewing
70 changes: 70 additions & 0 deletions
70
...ecs/src/main/java/org/opensearch/dataprepper/avro/AbstractAvroEventConverterTemplate.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
package org.opensearch.dataprepper.avro; | ||
|
||
import org.apache.avro.Schema; | ||
import org.apache.avro.generic.GenericData; | ||
import org.apache.avro.generic.GenericRecord; | ||
import org.opensearch.dataprepper.model.sink.OutputCodecContext; | ||
|
||
import java.util.List; | ||
import java.util.Map; | ||
|
||
abstract class AbstractAvroEventConverterTemplate implements AvroEventConverter { | ||
private final SchemaChooser schemaChooser; | ||
|
||
protected AbstractAvroEventConverterTemplate(final SchemaChooser schemaChooser) { | ||
this.schemaChooser = schemaChooser; | ||
} | ||
|
||
@Override | ||
public GenericRecord convertEventDataToAvro(final Schema schema, | ||
final Map<String, Object> eventData, | ||
final OutputCodecContext codecContext) { | ||
return convertEventDataToAvro(schema, eventData, codecContext, true); | ||
} | ||
|
||
private GenericRecord convertEventDataToAvro(final Schema schema, | ||
final Map<String, Object> eventData, | ||
final OutputCodecContext codecContext, | ||
boolean rootOfData) { | ||
final GenericRecord avroRecord = new GenericData.Record(schema); | ||
|
||
for (String key : getKeyNames(schema, eventData, codecContext, rootOfData)) { | ||
final Schema.Field field = schema.getField(key); | ||
if (field == null) { | ||
throw new RuntimeException("The event has a key ('" + key + "') which is not included in the schema."); | ||
} | ||
final Object value = schemaMapper(field, eventData.get(key), codecContext); | ||
avroRecord.put(key, value); | ||
} | ||
|
||
return avroRecord; | ||
} | ||
|
||
|
||
private Object schemaMapper(final Schema.Field field, final Object rawValue, OutputCodecContext codecContext) { | ||
Schema providedSchema = schemaChooser.chooseSchema(field.schema()); | ||
|
||
if (providedSchema.getType() == Schema.Type.RECORD && rawValue instanceof Map) { | ||
return convertEventDataToAvro(providedSchema, (Map<String, Object>) rawValue, codecContext, false); | ||
} else if (providedSchema.getType() == Schema.Type.ARRAY && rawValue instanceof List) { | ||
GenericData.Array<Object> avroArray = | ||
new GenericData.Array<>(((List<?>) rawValue).size(), providedSchema); | ||
for (Object element : ((List<?>) rawValue)) { | ||
avroArray.add(element); | ||
} | ||
return avroArray; | ||
} | ||
return rawValue; | ||
} | ||
|
||
/** | ||
* Template method to get key names for a given object. | ||
* | ||
* @param schema The Avro schema | ||
* @param eventData Current event data | ||
* @param codecContext The {@link OutputCodecContext} | ||
* @param rootOfData True, if this is the root of the data. False when this is nested. | ||
* @return An {@Iterable} of key names. | ||
*/ | ||
abstract Iterable<String> getKeyNames(Schema schema, Map<String, Object> eventData, OutputCodecContext codecContext, boolean rootOfData); | ||
} |
58 changes: 12 additions & 46 deletions
58
...plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/AvroEventConverter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,60 +1,26 @@ | ||
package org.opensearch.dataprepper.avro; | ||
|
||
import org.apache.avro.Schema; | ||
import org.apache.avro.generic.GenericData; | ||
import org.apache.avro.generic.GenericRecord; | ||
import org.opensearch.dataprepper.model.sink.OutputCodecContext; | ||
|
||
import java.util.List; | ||
import java.util.Map; | ||
|
||
/** | ||
* Converts an Event into an Avro record. | ||
* <p> | ||
* It might be a good idea to consolidate similar logic for input. | ||
*/ | ||
public class AvroEventConverter { | ||
private final SchemaChooser schemaChooser; | ||
|
||
public AvroEventConverter() { | ||
this(new SchemaChooser()); | ||
} | ||
|
||
AvroEventConverter(final SchemaChooser schemaChooser) { | ||
this.schemaChooser = schemaChooser; | ||
} | ||
|
||
public GenericRecord convertEventDataToAvro(final Schema schema, | ||
final Map<String, Object> eventData, | ||
OutputCodecContext codecContext) { | ||
final GenericRecord avroRecord = new GenericData.Record(schema); | ||
for (final String key : eventData.keySet()) { | ||
if (codecContext != null && codecContext.shouldNotIncludeKey(key)) { | ||
continue; | ||
} | ||
final Schema.Field field = schema.getField(key); | ||
if (field == null) { | ||
throw new RuntimeException("The event has a key ('" + key + "') which is not included in the schema."); | ||
} | ||
final Object value = schemaMapper(field, eventData.get(key), codecContext); | ||
avroRecord.put(key, value); | ||
} | ||
return avroRecord; | ||
} | ||
|
||
private Object schemaMapper(final Schema.Field field, final Object rawValue, OutputCodecContext codecContext) { | ||
Schema providedSchema = schemaChooser.chooseSchema(field.schema()); | ||
|
||
if (providedSchema.getType() == Schema.Type.RECORD && rawValue instanceof Map) { | ||
return convertEventDataToAvro(providedSchema, (Map<String, Object>) rawValue, codecContext); | ||
} else if (providedSchema.getType() == Schema.Type.ARRAY && rawValue instanceof List) { | ||
GenericData.Array<String> avroArray = | ||
new GenericData.Array<>(((List<String>) rawValue).size(), providedSchema); | ||
for (String element : ((List<String>) rawValue)) { | ||
avroArray.add(element); | ||
} | ||
return avroArray; | ||
} | ||
return rawValue; | ||
} | ||
public interface AvroEventConverter { | ||
/** | ||
* Converts event data into an Avro record. | ||
* | ||
* @param schema The defined Avro schema | ||
* @param eventData The event data; may include tags | ||
* @param codecContext The output codec context which may define values included/excluded. | ||
* @return The generated Avro {@link GenericRecord}. | ||
*/ | ||
GenericRecord convertEventDataToAvro(final Schema schema, | ||
final Map<String, Object> eventData, | ||
final OutputCodecContext codecContext); | ||
} |
39 changes: 39 additions & 0 deletions
39
...-codecs/src/main/java/org/opensearch/dataprepper/avro/EventDefinedAvroEventConverter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
package org.opensearch.dataprepper.avro; | ||
|
||
import org.apache.avro.Schema; | ||
import org.opensearch.dataprepper.model.sink.OutputCodecContext; | ||
|
||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.function.Predicate; | ||
import java.util.stream.Collectors; | ||
|
||
/** | ||
* Converts an Event into an Avro record. | ||
* <p> | ||
* This implementation utilizes the Event data first to populate the Avro record. Thus, | ||
* it will fail if the Event has any fields not in the schema. | ||
*/ | ||
public class EventDefinedAvroEventConverter extends AbstractAvroEventConverterTemplate { | ||
public EventDefinedAvroEventConverter() { | ||
this(new SchemaChooser()); | ||
} | ||
|
||
EventDefinedAvroEventConverter(final SchemaChooser schemaChooser) { | ||
super(schemaChooser); | ||
} | ||
|
||
@Override | ||
Iterable<String> getKeyNames(Schema schema, Map<String, Object> eventData, OutputCodecContext codecContext, boolean rootOfData) { | ||
Set<String> keySet = eventData.keySet(); | ||
|
||
if(codecContext == null || !rootOfData) { | ||
return keySet; | ||
} | ||
else { | ||
return keySet.stream() | ||
.filter(Predicate.not(codecContext::shouldNotIncludeKey)) | ||
.collect(Collectors.toList()); | ||
} | ||
} | ||
} |
31 changes: 31 additions & 0 deletions
31
...codecs/src/main/java/org/opensearch/dataprepper/avro/SchemaDefinedAvroEventConverter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
package org.opensearch.dataprepper.avro; | ||
|
||
import org.apache.avro.Schema; | ||
import org.opensearch.dataprepper.model.sink.OutputCodecContext; | ||
|
||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
|
||
/** | ||
* Converts an Event into an Avro record. | ||
* <p> | ||
* This implementation relies on the defined schema. Thus, any fields in the Event which | ||
* are not in the schema will be ignored. | ||
*/ | ||
public class SchemaDefinedAvroEventConverter extends AbstractAvroEventConverterTemplate { | ||
public SchemaDefinedAvroEventConverter() { | ||
this(new SchemaChooser()); | ||
} | ||
|
||
SchemaDefinedAvroEventConverter(final SchemaChooser schemaChooser) { | ||
super(schemaChooser); | ||
} | ||
|
||
@Override | ||
Iterable<String> getKeyNames(Schema schema, Map<String, Object> eventData, OutputCodecContext codecContext, boolean rootOfData) { | ||
return schema.getFields() | ||
.stream() | ||
.map(Schema.Field::name) | ||
.collect(Collectors.toList()); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
110 changes: 0 additions & 110 deletions
110
...ins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/AvroEventConverterTest.java
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.