Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AVRO-3845: avro-tools: Add log entry when an external schema is used #2121

Merged
merged 3 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.avro.tool;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.util.ArrayList;
Expand All @@ -35,9 +36,12 @@
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Reads a data file and dumps to JSON */
public class DataFileReadTool implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(DataFileReadTool.class);
private static final long DEFAULT_HEAD_COUNT = 10;

@Override
Expand All @@ -62,18 +66,13 @@ public int run(InputStream stdin, PrintStream out, PrintStream err, List<String>
.ofType(String.class);

OptionSet optionSet = optionParser.parse(args.toArray(new String[0]));
Boolean pretty = optionSet.has(prettyOption);
boolean pretty = optionSet.has(prettyOption);
List<String> nargs = new ArrayList<>((List<String>) optionSet.nonOptionArguments());

String readerSchemaStr = readerSchemaOption.value(optionSet);
String readerSchemaFile = readerSchemaFileOption.value(optionSet);

Schema readerSchema = null;
if (readerSchemaFile != null) {
readerSchema = Util.parseSchemaFromFS(readerSchemaFile);
} else if (readerSchemaStr != null) {
readerSchema = new Schema.Parser().parse(readerSchemaStr);
}
Schema readerSchema = getSchema(readerSchemaStr, readerSchemaFile);

long headCount = getHeadCount(optionSet, headOption, nargs);

Expand All @@ -92,7 +91,7 @@ public int run(InputStream stdin, PrintStream out, PrintStream err, List<String>
}
try (DataFileStream<Object> streamReader = new DataFileStream<>(inStream, reader)) {
Schema schema = readerSchema != null ? readerSchema : streamReader.getSchema();
DatumWriter writer = new GenericDatumWriter<>(schema);
DatumWriter<Object> writer = new GenericDatumWriter<>(schema);
JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, out, pretty);
for (long recordCount = 0; streamReader.hasNext() && recordCount < headCount; recordCount++) {
Object datum = streamReader.next();
Expand All @@ -105,6 +104,18 @@ public int run(InputStream stdin, PrintStream out, PrintStream err, List<String>
return 0;
}

static Schema getSchema(String schemaStr, String schemaFile) throws IOException {
Schema readerSchema = null;
if (schemaFile != null) {
LOG.info("Reading schema from file '{}'", schemaFile);
readerSchema = Util.parseSchemaFromFS(schemaFile);
} else if (schemaStr != null) {
LOG.info("Reading schema from string '{}'", schemaStr);
readerSchema = new Schema.Parser().parse(schemaStr);
}
return readerSchema;
}

private static long getHeadCount(OptionSet optionSet, OptionSpec<String> headOption, List<String> nargs) {
long headCount = Long.MAX_VALUE;
if (optionSet.has(headOption)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public int run(InputStream stdin, PrintStream out, PrintStream err, List<String>
p.printHelpOn(err);
return 1;
}
Schema schema = (schemafile != null) ? Util.parseSchemaFromFS(schemafile) : new Schema.Parser().parse(schemastr);
Schema schema = DataFileReadTool.getSchema(schemastr, schemafile);

DatumReader<Object> reader = new GenericDatumReader<>(schema);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ void repairAfterCorruptRecord() throws Exception {
}

private void checkFileContains(File repairedFile, String... lines) throws IOException {
DataFileReader r = new DataFileReader<>(repairedFile, new GenericDatumReader<>(SCHEMA));
DataFileReader<Object> r = new DataFileReader<>(repairedFile, new GenericDatumReader<>(SCHEMA));
for (String line : lines) {
assertEquals(line, r.next().toString());
}
Expand Down