Skip to content

Commit

Permalink
AVRO-3845: avro-tools: Add log entry when an external schema is used (#…
Browse files Browse the repository at this point in the history
…2121)

* Add log entry when an external schema is used.
* Cover both reader and writer.

---------
Co-authored-by: Ruslan Altynnikov
  • Loading branch information
rulle-io authored Sep 13, 2023
1 parent fa3bb1e commit da524a4
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.avro.tool;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.util.ArrayList;
Expand All @@ -35,9 +36,12 @@
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Reads a data file and dumps to JSON */
public class DataFileReadTool implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(DataFileReadTool.class);
private static final long DEFAULT_HEAD_COUNT = 10;

@Override
Expand All @@ -62,18 +66,13 @@ public int run(InputStream stdin, PrintStream out, PrintStream err, List<String>
.ofType(String.class);

OptionSet optionSet = optionParser.parse(args.toArray(new String[0]));
Boolean pretty = optionSet.has(prettyOption);
boolean pretty = optionSet.has(prettyOption);
List<String> nargs = new ArrayList<>((List<String>) optionSet.nonOptionArguments());

String readerSchemaStr = readerSchemaOption.value(optionSet);
String readerSchemaFile = readerSchemaFileOption.value(optionSet);

Schema readerSchema = null;
if (readerSchemaFile != null) {
readerSchema = Util.parseSchemaFromFS(readerSchemaFile);
} else if (readerSchemaStr != null) {
readerSchema = new Schema.Parser().parse(readerSchemaStr);
}
Schema readerSchema = getSchema(readerSchemaStr, readerSchemaFile);

long headCount = getHeadCount(optionSet, headOption, nargs);

Expand All @@ -92,7 +91,7 @@ public int run(InputStream stdin, PrintStream out, PrintStream err, List<String>
}
try (DataFileStream<Object> streamReader = new DataFileStream<>(inStream, reader)) {
Schema schema = readerSchema != null ? readerSchema : streamReader.getSchema();
DatumWriter writer = new GenericDatumWriter<>(schema);
DatumWriter<Object> writer = new GenericDatumWriter<>(schema);
JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, out, pretty);
for (long recordCount = 0; streamReader.hasNext() && recordCount < headCount; recordCount++) {
Object datum = streamReader.next();
Expand All @@ -105,6 +104,18 @@ public int run(InputStream stdin, PrintStream out, PrintStream err, List<String>
return 0;
}

static Schema getSchema(String schemaStr, String schemaFile) throws IOException {
Schema readerSchema = null;
if (schemaFile != null) {
LOG.info("Reading schema from file '{}'", schemaFile);
readerSchema = Util.parseSchemaFromFS(schemaFile);
} else if (schemaStr != null) {
LOG.info("Reading schema from string '{}'", schemaStr);
readerSchema = new Schema.Parser().parse(schemaStr);
}
return readerSchema;
}

private static long getHeadCount(OptionSet optionSet, OptionSpec<String> headOption, List<String> nargs) {
long headCount = Long.MAX_VALUE;
if (optionSet.has(headOption)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public int run(InputStream stdin, PrintStream out, PrintStream err, List<String>
p.printHelpOn(err);
return 1;
}
Schema schema = (schemafile != null) ? Util.parseSchemaFromFS(schemafile) : new Schema.Parser().parse(schemastr);
Schema schema = DataFileReadTool.getSchema(schemastr, schemafile);

DatumReader<Object> reader = new GenericDatumReader<>(schema);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ void repairAfterCorruptRecord() throws Exception {
}

private void checkFileContains(File repairedFile, String... lines) throws IOException {
DataFileReader r = new DataFileReader<>(repairedFile, new GenericDatumReader<>(SCHEMA));
DataFileReader<Object> r = new DataFileReader<>(repairedFile, new GenericDatumReader<>(SCHEMA));
for (String line : lines) {
assertEquals(line, r.next().toString());
}
Expand Down

0 comments on commit da524a4

Please sign in to comment.