- * Implementations are located using a {@link java.util.ServiceLoader}. See that
- * class for details.
- *
- *
- *
- * You can expect that schemas being read are invalid, so you are encouraged to
- * return {@code null} upon parsing failure where the input clearly doesn't make
- * sense (e.g., reading "/**" when expecting JSON). If the input is likely in
- * the correct format, but invalid, throw a {@link SchemaParseException}
- * instead.
- *
- *
*
- * Note that throwing anything other than a {@code SchemaParseException} will
- * abort the parsing process, so reserve that for rethrowing exceptions.
+ * Implementations are located using a {@link java.util.ServiceLoader} and must
+ * therefore be threadsafe. See the {@code ServiceLoader} class for details on
+ * loading your implementation.
*
*
* @see java.util.ServiceLoader
*/
public interface FormattedSchemaParser {
/**
- * Parse a schema from a text based source. Can use the base location of the
- * schema (e.g., the directory where the schema file lives) if available.
- *
*
- * Implementations should add all named schemas they parse to the collection.
+ * Parse schema definitions from a text based source.
*
*
- * @param types a mutable collection of known types; parsed named
- * schemata will be added
+ *
Notes for implementers:
+ *
+ *
+ *
Schema definitions are expected not to be in the format the parser
+ * expects. So when the input clearly doesn't make sense (e.g., reading "/**"
+ * when expecting JSON), it is a good idea not to do anything (especially
+ * calling methods on the @code ParseContext}).
+ *
The parameter {@code parseContext} is not thread-safe.
+ *
When parsing, all parsed schema definitions should be added to the
+ * provided {@link ParseContext}.
+ *
Optionally, you may return a "main" schema. Some schema definitions have
+ * one, for example the schema defined by the root of the JSON document in a
+ * standard schema
+ * definition. If unsure, return {@code null}.
+ *
If parsing fails, throw a {@link SchemaParseException}. This will let the
+ * parsing process recover and continue.
+ *
Throwing anything other than a {@code SchemaParseException} will abort
+ * the parsing process, so reserve that for rethrowing exceptions.
+ *
+ *
+ * @param parseContext the current parse context: all parsed schemata should
+ * be added here to resolve names with; contains all
+ * previously known types
* @param baseUri the base location of the schema, or {@code null} if
* not known
- * @param formattedSchema the schema as text
- * @return the parsed schema, or {@code null} if the format is not supported
+ * @param formattedSchema the text of the schema definition(s) to parse
+ * @return the main schema, if any
* @throws IOException when the schema cannot be read
* @throws SchemaParseException when the schema cannot be parsed
*/
- Schema parse(Collection types, URI baseUri, CharSequence formattedSchema)
+ Schema parse(ParseContext parseContext, URI baseUri, CharSequence formattedSchema)
throws IOException, SchemaParseException;
}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/JsonSchemaParser.java b/lang/java/avro/src/main/java/org/apache/avro/JsonSchemaParser.java
index 9a1da447d15..c7d91878627 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/JsonSchemaParser.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/JsonSchemaParser.java
@@ -19,8 +19,6 @@
import java.io.IOException;
import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
/**
* Schema parser for JSON formatted schemata. This initial implementation simply
@@ -59,32 +57,26 @@ public static Schema parseInternal(String... fragments) {
for (String fragment : fragments) {
buffer.append(fragment);
}
- return new JsonSchemaParser().parse(new ArrayList<>(), buffer, true);
+ return new JsonSchemaParser().parse(new ParseContext(NameValidator.NO_VALIDATION), buffer, null);
}
@Override
- public Schema parse(Collection schemas, URI baseUri, CharSequence formattedSchema)
+ public Schema parse(ParseContext parseContext, URI baseUri, CharSequence formattedSchema)
throws IOException, SchemaParseException {
- return parse(schemas, formattedSchema, false);
+ return parse(parseContext, formattedSchema, parseContext.nameValidator);
}
- private Schema parse(Collection schemas, CharSequence formattedSchema, boolean skipValidation)
+ private Schema parse(ParseContext parseContext, CharSequence formattedSchema, NameValidator nameValidator)
throws SchemaParseException {
- // TODO: refactor JSON parsing out of the Schema class
- Schema.Parser parser;
- if (skipValidation) {
- parser = new Schema.Parser(Schema.NameValidator.NO_VALIDATION);
+ Schema.Parser parser = new Schema.Parser(nameValidator);
+ if (nameValidator == NameValidator.NO_VALIDATION) {
parser.setValidateDefaults(false);
} else {
- parser = new Schema.Parser();
- }
- if (schemas != null) {
- parser.addTypes(schemas);
+ parser = new Schema.Parser(nameValidator);
}
+ parser.addTypes(parseContext.typesByName().values());
Schema schema = parser.parse(formattedSchema.toString());
- if (schemas != null) {
- schemas.addAll(parser.getTypes().values());
- }
+ parser.getTypes().values().forEach(parseContext::put);
return schema;
}
}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/NameValidator.java b/lang/java/avro/src/main/java/org/apache/avro/NameValidator.java
new file mode 100644
index 00000000000..f1262d922cf
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/NameValidator.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+public interface NameValidator {
+
+ class Result {
+ private final String errors;
+
+ public Result(final String errors) {
+ this.errors = errors;
+ }
+
+ public boolean isOK() {
+ return this == NameValidator.OK;
+ }
+
+ public String getErrors() {
+ return errors;
+ }
+ }
+
+ Result OK = new Result(null);
+
+ default Result validate(String name) {
+ return OK;
+ }
+
+ NameValidator NO_VALIDATION = new NameValidator() {
+ };
+
+ NameValidator UTF_VALIDATOR = new NameValidator() {
+ @Override
+ public Result validate(final String name) {
+ if (name == null) {
+ return new Result("Null name");
+ }
+ int length = name.length();
+ if (length == 0) {
+ return new Result("Empty name");
+ }
+ char first = name.charAt(0);
+ if (!(Character.isLetter(first) || first == '_')) {
+ return new Result("Illegal initial character: " + name);
+ }
+ for (int i = 1; i < length; i++) {
+ char c = name.charAt(i);
+ if (!(Character.isLetterOrDigit(c) || c == '_')) {
+ return new Result("Illegal character in: " + name);
+ }
+ }
+ return OK;
+ }
+ };
+
+ NameValidator STRICT_VALIDATOR = new NameValidator() {
+ @Override
+ public Result validate(final String name) {
+ if (name == null) {
+ return new Result("Null name");
+ }
+ int length = name.length();
+ if (length == 0) {
+ return new Result("Empty name");
+ }
+ char first = name.charAt(0);
+ if (!(isLetter(first) || first == '_')) {
+ return new Result("Illegal initial character: " + name);
+ }
+ for (int i = 1; i < length; i++) {
+ char c = name.charAt(i);
+ if (!(isLetter(c) || isDigit(c) || c == '_')) {
+ return new Result("Illegal character in: " + name);
+ }
+ }
+ return OK;
+ }
+
+ private boolean isLetter(char c) {
+ return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z');
+ }
+
+ private boolean isDigit(char c) {
+ return c >= '0' && c <= '9';
+ }
+
+ };
+
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/ParseContext.java b/lang/java/avro/src/main/java/org/apache/avro/ParseContext.java
new file mode 100644
index 00000000000..02f4129bed2
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/ParseContext.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import org.apache.avro.util.SchemaResolver;
+
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Class to define a name context, useful to reference schemata with. This
+ * allows for the following:
+ *
+ *
+ *
Provide a default namespace for nested contexts, as found for example in
+ * JSON based schema definitions.
+ *
Find schemata by name, including primitives.
+ *
Collect new named schemata.
+ *
+ *
+ *
+ * Note: this class has no use for most Avro users, but is a key component when
+ * implementing a schema parser.
+ *
+ *
+ * @see JSON based
+ * schema definition
+ **/
+public class ParseContext {
+ private static final Map PRIMITIVES = new HashMap<>();
+
+ static {
+ PRIMITIVES.put("string", Schema.Type.STRING);
+ PRIMITIVES.put("bytes", Schema.Type.BYTES);
+ PRIMITIVES.put("int", Schema.Type.INT);
+ PRIMITIVES.put("long", Schema.Type.LONG);
+ PRIMITIVES.put("float", Schema.Type.FLOAT);
+ PRIMITIVES.put("double", Schema.Type.DOUBLE);
+ PRIMITIVES.put("boolean", Schema.Type.BOOLEAN);
+ PRIMITIVES.put("null", Schema.Type.NULL);
+ }
+
+ private static final Set NAMED_SCHEMA_TYPES = EnumSet.of(Schema.Type.RECORD, Schema.Type.ENUM,
+ Schema.Type.FIXED);
+ private final Map oldSchemas;
+ private final Map newSchemas;
+ // Visible for use in JsonSchemaParser
+ final NameValidator nameValidator;
+ private final String namespace;
+
+ /**
+ * Create a {@code ParseContext} for the default/{@code null} namespace, using
+ * default name validation for new schemata.
+ */
+ public ParseContext() {
+ this(NameValidator.UTF_VALIDATOR, null);
+ }
+
+ /**
+ * Create a {@code ParseContext} for the specified namespace, using default name
+ * validation for new schemata.
+ */
+ public ParseContext(String namespace) {
+ this(NameValidator.UTF_VALIDATOR, namespace);
+ }
+
+ /**
+ * Create a {@code ParseContext} for the default/{@code null} namespace, using
+ * the specified name validation for new schemata.
+ */
+ public ParseContext(NameValidator nameValidator) {
+ this(nameValidator, null);
+ }
+
+ /**
+ * Create a {@code ParseContext} for the specified namespace, using the
+ * specified name validation for new schemata.
+ */
+ public ParseContext(NameValidator nameValidator, String namespace) {
+ this(nameValidator, new LinkedHashMap<>(), new LinkedHashMap<>(), namespace);
+ }
+
+ private ParseContext(NameValidator nameValidator, Map oldSchemas, Map newSchemas,
+ String namespace) {
+ this.nameValidator = nameValidator;
+ this.oldSchemas = oldSchemas;
+ this.newSchemas = newSchemas;
+ this.namespace = notEmpty(namespace) ? namespace : null;
+ }
+
+ /**
+ * Create a derived context using a different fallback namespace.
+ *
+ * @param namespace the fallback namespace to resolve names with
+ * @return a new context
+ */
+ public ParseContext namespace(String namespace) {
+ return new ParseContext(nameValidator, oldSchemas, newSchemas, namespace);
+ }
+
+ /**
+ * Return the fallback namespace.
+ *
+ * @return the namespace
+ */
+ public String namespace() {
+ return namespace;
+ }
+
+ /**
+ * Tell whether this context contains the given schema.
+ *
+ * @param schema a schema
+ * @return {@code true} if the context contains the schema, {@code false}
+ * otherwise
+ */
+ @Deprecated
+ public boolean contains(Schema schema) {
+ String fullName = schema.getFullName();
+ return schema.equals(oldSchemas.get(fullName)) || schema.equals(newSchemas.get(fullName));
+ }
+
+ /**
+ * Tell whether this context contains a schema with the given name.
+ *
+ * @param name a schema name
+ * @return {@code true} if the context contains a schema with this name,
+ * {@code false} otherwise
+ */
+ public boolean contains(String name) {
+ return PRIMITIVES.containsKey(name) || oldSchemas.containsKey(name) || newSchemas.containsKey(name);
+ }
+
+ /**
+ * Resolve a schema by name. That is:
+ *
+ *
+ *
If {@code name} is a primitive name, return a (new) schema for it
+ *
If {@code name} contains a dot, resolve the schema by full name only
+ *
Otherwise: resolve the schema in the current and in the null namespace
+ * (the former takes precedence)
+ *
+ *
+ * Resolving means that the schema is returned if known, and otherwise an
+ * unresolved schema (a reference) is returned.
+ *
+ * @param name the schema name to resolve
+ * @return the schema
+ * @throws SchemaParseException when the schema does not exist
+ */
+ public Schema resolve(String name) {
+ Schema.Type type = PRIMITIVES.get(name);
+ if (type != null) {
+ return Schema.create(type);
+ }
+
+ String fullName = resolveName(name, namespace);
+ Schema schema = getSchema(fullName);
+ if (schema == null) {
+ schema = getSchema(name);
+ }
+
+ return schema != null ? schema : SchemaResolver.unresolvedSchema(fullName);
+ }
+
+ private Schema getSchema(String fullName) {
+ Schema schema = oldSchemas.get(fullName);
+ if (schema == null) {
+ schema = newSchemas.get(fullName);
+ }
+ return schema;
+ }
+
+ // Visible for testing
+ String resolveName(String name, String space) {
+ int lastDot = name.lastIndexOf('.');
+ if (lastDot < 0) { // short name
+ if (!notEmpty(space)) {
+ space = namespace;
+ }
+ if (notEmpty(space)) {
+ return space + "." + name;
+ }
+ }
+ return name;
+ }
+
+ /**
+ * Return the simplest name that references the same schema in the current
+ * namespace. Returns the name without any namespace if it is not a primitive,
+ * and the namespace is the current namespace.
+ *
+ * @param fullName the full schema name
+ * @return the simplest name within the current namespace
+ */
+ public String simpleName(String fullName) {
+ int lastDot = fullName.lastIndexOf('.');
+ if (lastDot >= 0) {
+ String name = fullName.substring(lastDot + 1);
+ String space = fullName.substring(0, lastDot);
+ if (!PRIMITIVES.containsKey(name) && space.equals(namespace)) {
+ // The name is a full name in the current namespace, and cannot be
+ // mistaken for a primitive type.
+ return name;
+ }
+ }
+ // The special case of the previous comment does not apply.
+ return fullName;
+ }
+
+ private boolean notEmpty(String str) {
+ return str != null && !str.isEmpty();
+ }
+
+ /**
+ * Put the schema into this context. This is an idempotent operation: it only
+ * fails if this context already has a different schema with the same name.
+ *
+ *
+ * Note that although this method works for all types except for arrays, maps
+ * and unions, all primitive types have already been defined upon construction.
+ * This means you cannot redefine a 'long' with a logical timestamp type.
+ *
+ *
+ * @param schema the schema to put into the context
+ */
+ public void put(Schema schema) {
+ if (!(NAMED_SCHEMA_TYPES.contains(schema.getType()))) {
+ throw new AvroTypeException("You can only put a named schema into the context");
+ }
+
+ String fullName = requireValidFullName(schema.getFullName());
+
+ Schema alreadyKnownSchema = oldSchemas.get(fullName);
+ if (alreadyKnownSchema != null) {
+ if (!schema.equals(alreadyKnownSchema)) {
+ throw new SchemaParseException("Can't redefine: " + fullName);
+ }
+ } else {
+ Schema previouslyAddedSchema = newSchemas.putIfAbsent(fullName, schema);
+ if (previouslyAddedSchema != null && !previouslyAddedSchema.equals(schema)) {
+ throw new SchemaParseException("Can't redefine: " + fullName);
+ }
+ }
+ }
+
+ private String requireValidFullName(String fullName) {
+ String[] names = fullName.split("\\.");
+ for (int i = 0; i < names.length - 1; i++) {
+ validateName(names[i], "Namespace part");
+ }
+ validateName(names[names.length - 1], "Name");
+ return fullName;
+ }
+
+ private void validateName(String name, String what) {
+ NameValidator.Result result = nameValidator.validate(name);
+ if (!result.isOK()) {
+ throw new SchemaParseException(what + " \"" + name + "\" is invalid: " + result.getErrors());
+ }
+ }
+
+ public boolean hasNewSchemas() {
+ return !newSchemas.isEmpty();
+ }
+
+ public void commit() {
+ oldSchemas.putAll(newSchemas);
+ newSchemas.clear();
+ }
+
+ public void rollback() {
+ newSchemas.clear();
+ }
+
+ /**
+ * Return all known types by their fullname.
+ *
+ * @return a map of all types by their name
+ */
+ public Map typesByName() {
+ LinkedHashMap result = new LinkedHashMap<>();
+ result.putAll(oldSchemas);
+ result.putAll(newSchemas);
+ return result;
+ }
+
+ public Protocol resolveSchemata(Protocol protocol) {
+ protocol.getTypes().forEach(this::put);
+ return SchemaResolver.resolve(this, protocol);
+ }
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/Schema.java b/lang/java/avro/src/main/java/org/apache/avro/Schema.java
index 38a6e4a9e42..f312ecfb6f2 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/Schema.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/Schema.java
@@ -25,6 +25,12 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.DoubleNode;
import com.fasterxml.jackson.databind.node.NullNode;
+import org.apache.avro.util.internal.Accessor;
+import org.apache.avro.util.internal.Accessor.FieldAccessor;
+import org.apache.avro.util.internal.JacksonUtils;
+import org.apache.avro.util.internal.ThreadLocalWithInitial;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.File;
@@ -51,13 +57,6 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
-import org.apache.avro.util.internal.Accessor;
-import org.apache.avro.util.internal.Accessor.FieldAccessor;
-import org.apache.avro.util.internal.JacksonUtils;
-import org.apache.avro.util.internal.ThreadLocalWithInitial;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import static org.apache.avro.LogicalType.LOGICAL_TYPE_PROP;
/**
@@ -1490,7 +1489,7 @@ public NullSchema() {
*/
public static class Parser {
private Names names = new Names();
- private final Schema.NameValidator validate;
+ private final NameValidator validate;
private boolean validateDefaults = true;
public Parser() {
@@ -1760,18 +1759,18 @@ public Schema put(Name name, Schema schema) {
}
}
- private static ThreadLocal validateNames = ThreadLocalWithInitial
+ private static ThreadLocal validateNames = ThreadLocalWithInitial
.of(() -> NameValidator.UTF_VALIDATOR);
private static String validateName(String name) {
NameValidator.Result result = validateNames.get().validate(name);
if (!result.isOK()) {
- throw new SchemaParseException(result.errors);
+ throw new SchemaParseException(result.getErrors());
}
return name;
}
- public static void setNameValidator(final Schema.NameValidator validator) {
+ public static void setNameValidator(final NameValidator validator) {
Schema.validateNames.set(validator);
}
@@ -2311,84 +2310,6 @@ private static String getFieldAlias(Name record, String field, Map= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z');
- }
-
- private boolean isDigit(char c) {
- return c >= '0' && c <= '9';
- }
-
- };
-
- }
-
/**
* No change is permitted on LockableArrayList once lock() has been called on
* it.
diff --git a/lang/java/avro/src/main/java/org/apache/avro/SchemaParser.java b/lang/java/avro/src/main/java/org/apache/avro/SchemaParser.java
index c100f724b8e..dfb3c01f353 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/SchemaParser.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/SchemaParser.java
@@ -29,10 +29,8 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.ServiceLoader;
-import java.util.Set;
/**
* Avro schema parser for text-based formats like JSON, IDL, etc.
@@ -55,7 +53,7 @@
* @see UtfTextUtils
*/
public class SchemaParser {
- private final Set knownSchemata;
+ private final ParseContext parseContext;
private final Collection formattedSchemaParsers;
/**
@@ -63,7 +61,7 @@ public class SchemaParser {
* empty.
*/
public SchemaParser() {
- this.knownSchemata = new LinkedHashSet<>();
+ this.parseContext = new ParseContext();
this.formattedSchemaParsers = new ArrayList<>();
for (FormattedSchemaParser formattedSchemaParser : ServiceLoader.load(FormattedSchemaParser.class)) {
formattedSchemaParsers.add(formattedSchemaParser);
@@ -226,14 +224,14 @@ private Schema parse(URI baseUri, CharSequence formattedSchema) throws IOExcepti
List parseExceptions = new ArrayList<>();
for (FormattedSchemaParser formattedSchemaParser : formattedSchemaParsers) {
try {
- // Ensure we're only changing (adding to) the known types when a parser succeeds
- Set schemaSet = new LinkedHashSet<>(knownSchemata);
- Schema schema = formattedSchemaParser.parse(schemaSet, baseUri, formattedSchema);
- if (schema != null) {
- knownSchemata.addAll(schemaSet);
+ Schema schema = formattedSchemaParser.parse(parseContext, baseUri, formattedSchema);
+ if (parseContext.hasNewSchemas()) {
+ // Parsing succeeded: return the result.
+ parseContext.commit();
return schema;
}
} catch (SchemaParseException e) {
+ parseContext.rollback();
parseExceptions.add(e);
}
}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
index 150d2ace9ba..e9b5ed38852 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
@@ -34,6 +34,7 @@
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.InvalidAvroMagicException;
+import org.apache.avro.NameValidator;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
@@ -139,7 +140,7 @@ void initialize(InputStream in, byte[] magic) throws IOException {
// finalize the header
header.metaKeyList = Collections.unmodifiableList(header.metaKeyList);
- header.schema = new Schema.Parser(Schema.NameValidator.NO_VALIDATION).setValidateDefaults(false)
+ header.schema = new Schema.Parser(NameValidator.NO_VALIDATION).setValidateDefaults(false)
.parse(getMetaString(DataFileConstants.SCHEMA));
this.codec = resolveCodec();
reader.setSchema(header.schema);
diff --git a/lang/java/avro/src/main/java/org/apache/avro/util/SchemaResolver.java b/lang/java/avro/src/main/java/org/apache/avro/util/SchemaResolver.java
new file mode 100644
index 00000000000..c3a25a5e577
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/util/SchemaResolver.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.util;
+
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.JsonProperties;
+import org.apache.avro.ParseContext;
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.avro.Schema.Type.ARRAY;
+import static org.apache.avro.Schema.Type.ENUM;
+import static org.apache.avro.Schema.Type.FIXED;
+import static org.apache.avro.Schema.Type.MAP;
+import static org.apache.avro.Schema.Type.RECORD;
+import static org.apache.avro.Schema.Type.UNION;
+
+/**
+ * Utility class to resolve schemas that are unavailable at the point they are
+ * referenced in the IDL.
+ */
+public final class SchemaResolver {
+
+ private SchemaResolver() {
+ }
+
+ private static final String UR_SCHEMA_ATTR = "org.apache.avro.idl.unresolved.name";
+
+ private static final String UR_SCHEMA_NAME = "UnresolvedSchema";
+
+ private static final String UR_SCHEMA_NS = "org.apache.avro.compiler";
+
+ private static final AtomicInteger COUNTER = new AtomicInteger();
+
+ /**
+ * Create a schema to represent an "unresolved" schema. (used to represent a
+ * schema whose definition does not exist, yet).
+ *
+ * @param name a schema name
+ * @return an unresolved schema for the given name
+ */
+ public static Schema unresolvedSchema(final String name) {
+ Schema schema = Schema.createRecord(UR_SCHEMA_NAME + '_' + COUNTER.getAndIncrement(), "unresolved schema",
+ UR_SCHEMA_NS, false, Collections.emptyList());
+ schema.addProp(UR_SCHEMA_ATTR, name);
+ return schema;
+ }
+
+ /**
+ * Is this an unresolved schema.
+ *
+ * @param schema a schema
+ * @return whether the schema is an unresolved schema
+ */
+ public static boolean isUnresolvedSchema(final Schema schema) {
+ return (schema.getType() == Schema.Type.RECORD && schema.getProp(UR_SCHEMA_ATTR) != null && schema.getName() != null
+ && schema.getName().startsWith(UR_SCHEMA_NAME) && UR_SCHEMA_NS.equals(schema.getNamespace()));
+ }
+
+ /**
+ * Get the unresolved schema name.
+ *
+ * @param schema an unresolved schema
+ * @return the name of the unresolved schema
+ */
+ public static String getUnresolvedSchemaName(final Schema schema) {
+ if (!isUnresolvedSchema(schema)) {
+ throw new IllegalArgumentException("Not a unresolved schema: " + schema);
+ }
+ return schema.getProp(UR_SCHEMA_ATTR);
+ }
+
+ /**
+ * Is this an unresolved schema?
+ */
+ public static boolean isFullyResolvedSchema(final Schema schema) {
+ if (isUnresolvedSchema(schema)) {
+ return false;
+ } else {
+ return Schemas.visit(schema, new IsResolvedSchemaVisitor());
+ }
+ }
+
+ /**
+ * Clone the provided schema while resolving all unreferenced schemas.
+ *
+ * @param parseContext the parse context with known names
+ * @param schema the schema to resolve
+ * @return a copy of the schema with all schemas resolved
+ */
+ public static Schema resolve(final ParseContext parseContext, Schema schema) {
+ if (schema == null) {
+ return null;
+ }
+ ResolvingVisitor visitor = new ResolvingVisitor(schema, parseContext::resolve);
+ return Schemas.visit(schema, visitor);
+ }
+
+ /**
+ * Clone all provided schemas while resolving all unreferenced schemas.
+ *
+ * @param parseContext the parse context with known names
+ * @param schemas the schemas to resolve
+ * @return a copy of all schemas with all schemas resolved
+ */
+ public static Collection resolve(final ParseContext parseContext, Collection schemas) {
+ ResolvingVisitor visitor = new ResolvingVisitor(null, parseContext::resolve);
+ return schemas.stream().map(schema -> Schemas.visit(schema, visitor.withRoot(schema))).collect(Collectors.toList());
+ }
+
+ /**
+ * Will clone the provided protocol while resolving all unreferenced schemas
+ *
+ * @param parseContext the parse context with known names
+ * @param protocol the protocol to resolve
+ * @return a copy of the protocol with all schemas resolved
+ */
+ public static Protocol resolve(ParseContext parseContext, final Protocol protocol) {
+ // Create an empty copy of the protocol
+ Protocol result = new Protocol(protocol.getName(), protocol.getDoc(), protocol.getNamespace());
+ protocol.getObjectProps().forEach(((JsonProperties) result)::addProp);
+
+ ResolvingVisitor visitor = new ResolvingVisitor(null, parseContext::resolve);
+ Function resolver = schema -> Schemas.visit(schema, visitor.withRoot(schema));
+
+ // Resolve all schemata in the protocol.
+ result.setTypes(protocol.getTypes().stream().map(resolver).collect(Collectors.toList()));
+ Map resultMessages = result.getMessages();
+ protocol.getMessages().forEach((name, oldValue) -> {
+ Protocol.Message newValue;
+ if (oldValue.isOneWay()) {
+ newValue = result.createMessage(oldValue.getName(), oldValue.getDoc(), oldValue,
+ resolver.apply(oldValue.getRequest()));
+ } else {
+ Schema request = resolver.apply(oldValue.getRequest());
+ Schema response = resolver.apply(oldValue.getResponse());
+ Schema errors = resolver.apply(oldValue.getErrors());
+ newValue = result.createMessage(oldValue.getName(), oldValue.getDoc(), oldValue, request, response, errors);
+ }
+ resultMessages.put(name, newValue);
+ });
+ return result;
+ }
+
+ /**
+ * This visitor checks if the current schema is fully resolved.
+ */
+ public static final class IsResolvedSchemaVisitor implements SchemaVisitor {
+ boolean hasUnresolvedParts;
+
+ IsResolvedSchemaVisitor() {
+ hasUnresolvedParts = false;
+ }
+
+ @Override
+ public SchemaVisitorAction visitTerminal(Schema terminal) {
+ hasUnresolvedParts = isUnresolvedSchema(terminal);
+ return hasUnresolvedParts ? SchemaVisitorAction.TERMINATE : SchemaVisitorAction.CONTINUE;
+ }
+
+ @Override
+ public SchemaVisitorAction visitNonTerminal(Schema nonTerminal) {
+ hasUnresolvedParts = isUnresolvedSchema(nonTerminal);
+ if (hasUnresolvedParts) {
+ return SchemaVisitorAction.TERMINATE;
+ }
+ if (nonTerminal.getType() == Schema.Type.RECORD && !nonTerminal.hasFields()) {
+ // We're still initializing the type...
+ return SchemaVisitorAction.SKIP_SUBTREE;
+ }
+ return SchemaVisitorAction.CONTINUE;
+ }
+
+ @Override
+ public SchemaVisitorAction afterVisitNonTerminal(Schema nonTerminal) {
+ return SchemaVisitorAction.CONTINUE;
+ }
+
+ @Override
+ public Boolean get() {
+ return !hasUnresolvedParts;
+ }
+ }
+
+ /**
+ * This visitor creates clone of the visited Schemata, minus the specified
+ * schema properties, and resolves all unresolved schemas.
+ */
+ public static final class ResolvingVisitor implements SchemaVisitor {
+ private static final Set CONTAINER_SCHEMA_TYPES = EnumSet.of(RECORD, ARRAY, MAP, UNION);
+ private static final Set NAMED_SCHEMA_TYPES = EnumSet.of(RECORD, ENUM, FIXED);
+
+ private final Function symbolTable;
+ private final Set schemaPropertiesToRemove;
+ private final IdentityHashMap replace;
+
+ private final Schema root;
+
+ public ResolvingVisitor(final Schema root, final Function symbolTable,
+ String... schemaPropertiesToRemove) {
+ this(root, symbolTable, new HashSet<>(Arrays.asList(schemaPropertiesToRemove)));
+ }
+
+ public ResolvingVisitor(final Schema root, final Function symbolTable,
+ Set schemaPropertiesToRemove) {
+ this.replace = new IdentityHashMap<>();
+ this.symbolTable = symbolTable;
+ this.schemaPropertiesToRemove = schemaPropertiesToRemove;
+
+ this.root = root;
+ }
+
+ public ResolvingVisitor withRoot(Schema root) {
+ return new ResolvingVisitor(root, symbolTable, schemaPropertiesToRemove);
+ }
+
+ @Override
+ public SchemaVisitorAction visitTerminal(final Schema terminal) {
+ Schema.Type type = terminal.getType();
+ Schema newSchema;
+ if (CONTAINER_SCHEMA_TYPES.contains(type)) {
+ if (!replace.containsKey(terminal)) {
+ throw new IllegalStateException("Schema " + terminal + " must be already processed");
+ }
+ return SchemaVisitorAction.CONTINUE;
+ } else if (type == ENUM) {
+ newSchema = Schema.createEnum(terminal.getName(), terminal.getDoc(), terminal.getNamespace(),
+ terminal.getEnumSymbols(), terminal.getEnumDefault());
+ } else if (type == FIXED) {
+ newSchema = Schema.createFixed(terminal.getName(), terminal.getDoc(), terminal.getNamespace(),
+ terminal.getFixedSize());
+ } else {
+ newSchema = Schema.create(type);
+ }
+ copyProperties(terminal, newSchema);
+ replace.put(terminal, newSchema);
+ return SchemaVisitorAction.CONTINUE;
+ }
+
+ public void copyProperties(final Schema first, final Schema second) {
+ // Logical type
+ Optional.ofNullable(first.getLogicalType()).ifPresent(logicalType -> logicalType.addToSchema(second));
+
+ // Aliases (if applicable)
+ if (NAMED_SCHEMA_TYPES.contains(first.getType())) {
+ first.getAliases().forEach(second::addAlias);
+ }
+
+ // Other properties
+ first.getObjectProps().forEach((name, value) -> {
+ if (!schemaPropertiesToRemove.contains(name)) {
+ second.addProp(name, value);
+ }
+ });
+ }
+
+ @Override
+ public SchemaVisitorAction visitNonTerminal(final Schema nt) {
+ Schema.Type type = nt.getType();
+ if (type == RECORD) {
+ if (isUnresolvedSchema(nt)) {
+ // unresolved schema will get a replacement that we already encountered,
+ // or we will attempt to resolve.
+ final String unresolvedSchemaName = getUnresolvedSchemaName(nt);
+ Schema resSchema = symbolTable.apply(unresolvedSchemaName);
+ if (resSchema == null) {
+ throw new AvroTypeException("Undefined schema: " + unresolvedSchemaName);
+ }
+ Schema replacement = replace.computeIfAbsent(resSchema, schema -> {
+ Schemas.visit(schema, this);
+ return replace.get(schema);
+ });
+ replace.put(nt, replacement);
+ } else {
+ // Create a clone without fields. Fields will be added in afterVisitNonTerminal.
+ Schema newSchema = Schema.createRecord(nt.getName(), nt.getDoc(), nt.getNamespace(), nt.isError());
+ copyProperties(nt, newSchema);
+ replace.put(nt, newSchema);
+ }
+ }
+ return SchemaVisitorAction.CONTINUE;
+ }
+
+ @Override
+ public SchemaVisitorAction afterVisitNonTerminal(final Schema nt) {
+ Schema.Type type = nt.getType();
+ Schema newSchema;
+ switch (type) {
+ case RECORD:
+ if (!isUnresolvedSchema(nt)) {
+ newSchema = replace.get(nt);
+ // Check if we've already handled the replacement schema with a
+ // reentrant call to visit(...) from within the visitor.
+ if (!newSchema.hasFields()) {
+ List fields = nt.getFields();
+ List newFields = new ArrayList<>(fields.size());
+ for (Schema.Field field : fields) {
+ newFields.add(new Schema.Field(field, replace.get(field.schema())));
+ }
+ newSchema.setFields(newFields);
+ }
+ }
+ return SchemaVisitorAction.CONTINUE;
+ case UNION:
+ List types = nt.getTypes();
+ List newTypes = new ArrayList<>(types.size());
+ for (Schema sch : types) {
+ newTypes.add(replace.get(sch));
+ }
+ newSchema = Schema.createUnion(newTypes);
+ break;
+ case ARRAY:
+ newSchema = Schema.createArray(replace.get(nt.getElementType()));
+ break;
+ case MAP:
+ newSchema = Schema.createMap(replace.get(nt.getValueType()));
+ break;
+ default:
+ throw new IllegalStateException("Illegal type " + type + ", schema " + nt);
+ }
+ copyProperties(nt, newSchema);
+ replace.put(nt, newSchema);
+ return SchemaVisitorAction.CONTINUE;
+ }
+
+ @Override
+ public Schema get() {
+ return replace.get(root);
+ }
+
+ @Override
+ public String toString() {
+ return "ResolvingVisitor{symbolTable=" + symbolTable + ", schemaPropertiesToRemove=" + schemaPropertiesToRemove
+ + ", replace=" + replace + '}';
+ }
+ }
+}
diff --git a/lang/java/idl/src/main/java/org/apache/avro/idl/SchemaVisitor.java b/lang/java/avro/src/main/java/org/apache/avro/util/SchemaVisitor.java
similarity index 77%
rename from lang/java/idl/src/main/java/org/apache/avro/idl/SchemaVisitor.java
rename to lang/java/avro/src/main/java/org/apache/avro/util/SchemaVisitor.java
index 0f9fcae5b68..1ac35baeda7 100644
--- a/lang/java/idl/src/main/java/org/apache/avro/idl/SchemaVisitor.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/util/SchemaVisitor.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.avro.idl;
+package org.apache.avro.util;
import org.apache.avro.Schema;
@@ -44,4 +44,26 @@ public interface SchemaVisitor {
* @return a value that will be returned by the visit method.
*/
T get();
+
+ enum SchemaVisitorAction {
+
+ /**
+ * continue visit.
+ */
+ CONTINUE,
+ /**
+ * terminate visit.
+ */
+ TERMINATE,
+ /**
+ * when returned from pre non terminal visit method the children of the non
+ * terminal are skipped. afterVisitNonTerminal for the current schema will not
+ * be invoked.
+ */
+ SKIP_SUBTREE,
+ /**
+ * Skip visiting the siblings of this schema.
+ */
+ SKIP_SIBLINGS
+ }
}
diff --git a/lang/java/idl/src/main/java/org/apache/avro/idl/Schemas.java b/lang/java/avro/src/main/java/org/apache/avro/util/Schemas.java
similarity index 89%
rename from lang/java/idl/src/main/java/org/apache/avro/idl/Schemas.java
rename to lang/java/avro/src/main/java/org/apache/avro/util/Schemas.java
index da4b949d2bc..927a0c37b43 100644
--- a/lang/java/idl/src/main/java/org/apache/avro/idl/Schemas.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/util/Schemas.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.avro.idl;
+package org.apache.avro.util;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
@@ -51,8 +51,7 @@ public static T visit(final Schema start, final SchemaVisitor visitor) {
while ((current = dq.poll()) != null) {
if (current instanceof Supplier) {
// We are executing a non-terminal post visit.
- @SuppressWarnings("unchecked")
- SchemaVisitorAction action = ((Supplier) current).get();
+ SchemaVisitor.SchemaVisitorAction action = ((Supplier) current).get();
switch (action) {
case CONTINUE:
break;
@@ -107,14 +106,14 @@ public static T visit(final Schema start, final SchemaVisitor visitor) {
private static boolean visitNonTerminal(final SchemaVisitor> visitor, final Schema schema, final Deque