Skip to content

Commit

Permalink
AVRO-4073: Create Convenience toBytes Method for Datum Writer
Browse files Browse the repository at this point in the history
  • Loading branch information
belugabehr committed Oct 4, 2024
1 parent 8040078 commit 99bd94d
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 10 deletions.
19 changes: 13 additions & 6 deletions lang/java/avro/src/main/java/org/apache/avro/data/Json.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@
*/
package org.apache.avro.data;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;

import org.apache.avro.io.*;
import org.apache.avro.path.TracingAvroTypeException;
import org.apache.avro.path.TracingClassCastException;
import org.apache.avro.path.TracingNullPointException;
import org.apache.avro.util.internal.JacksonUtils;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -36,12 +41,6 @@

import org.apache.avro.Schema;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.ResolvingDecoder;

/** Utilities for reading and writing arbitrary Json data in Avro format. */
public class Json {
Expand Down Expand Up @@ -79,6 +78,14 @@ public void setSchema(Schema schema) {
public void write(Object datum, Encoder out) throws IOException {
Json.writeObject(datum, out);
}

@Override
public byte[] toByteArray(Object datum) throws IOException {
try (ByteArrayOutputStream out = new ByteArrayOutputStream(128)) {
write(datum, EncoderFactory.get().directBinaryEncoder(out, null));
return out.toByteArray();
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.avro.generic;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ConcurrentModificationException;
Expand All @@ -32,6 +33,7 @@
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.path.TracingAvroTypeException;
import org.apache.avro.UnresolvedUnionException;
import org.apache.avro.io.DatumWriter;
Expand Down Expand Up @@ -85,6 +87,16 @@ public void write(D datum, Encoder out) throws IOException {
}
}

@Override
public byte[] toByteArray(D datum) throws IOException {
try (ByteArrayOutputStream out = new ByteArrayOutputStream(128)) {
write(root, datum, EncoderFactory.get().directBinaryEncoder(out, null));
return out.toByteArray();
} catch (TracingNullPointException | TracingClassCastException | TracingAvroTypeException e) {
throw e.summarize(root);
}
}

/** Called to write data. */
protected void write(Schema schema, Object datum, Encoder out) throws IOException {
LogicalType logicalType = schema.getLogicalType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,11 @@ public interface DatumWriter<D> {
* the schema from the datum to the output.
*/
void write(D datum, Encoder out) throws IOException;

/**
* Convenience method to Write a datum to a byte array. Traverse the schema,
* depth first, writing each leaf value in the schema from the datum to the byte
* array.
*/
byte[] toByteArray(D datum) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Utilities for generated Java classes and interfaces. */
Expand Down Expand Up @@ -257,7 +255,9 @@ protected Schema getEnumSchema(Object datum) {
return (datum instanceof Enum) ? getSchema(datum.getClass()) : super.getEnumSchema(datum);
}

private final ConcurrentMap<String, Class> classCache = new ConcurrentHashMap<>();
// private final ConcurrentMap<String, Class> classCache = new
// ConcurrentHashMap<>();
private final ThreadLocal<Map<String, Class<?>>> classCache = ThreadLocal.withInitial(HashMap::new);

private static final Class NO_CLASS = new Object() {
}.getClass();
Expand Down Expand Up @@ -363,7 +363,7 @@ public Class getClass(Schema schema) {
String name = schema.getFullName();
if (name == null)
return null;
Class<?> c = classCache.computeIfAbsent(name, n -> {
Class<?> c = classCache.get().computeIfAbsent(name, n -> {
try {
return ClassUtils.forName(getClassLoader(), getClassName(schema));
} catch (ClassNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,20 @@ void nonStringable() throws Exception {
}
}

@Test
void testToByteArray() throws Exception {
final Schema string = Schema.create(Type.STRING);
final DatumWriter<String> writer = new SpecificDatumWriter<>(string);

try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
final Encoder encoder = EncoderFactory.get().directBinaryEncoder(baos, null);
writer.write("test", encoder);

final byte[] bytes = writer.toByteArray("test");
assertArrayEquals(baos.toByteArray(), bytes);
}
}

@Test
void classNameContainingReservedWords() {
final Schema schema = Schema.createRecord("AnyName", null, "db.public.table", false);
Expand Down

0 comments on commit 99bd94d

Please sign in to comment.