Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AVRO-3666 [Java] Use the new schema parser #2642

Merged
merged 27 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
9b50389
AVRO-3666: Redo schema parsing code
opwvhk Dec 15, 2023
810657c
AVRO-3666: Resolve references after parsing
opwvhk Dec 18, 2023
9881669
AVRO-3666: Remove wrong test
opwvhk Dec 19, 2023
1243cec
AVRO-1535: Fix aliases as well
opwvhk Dec 19, 2023
30c51ab
AVRO-3666: Re-enable disabled test
opwvhk Dec 19, 2023
20fa338
AVRO-3666: Fix RAT exclusion
opwvhk Dec 19, 2023
294fdd9
AVRO-3666: Remove unused field
opwvhk Dec 19, 2023
f2dbd26
AVRO-3666: Introduce SchemaParser.ParseResult
opwvhk Jan 15, 2024
841be1b
AVRO-3666: Use SchemaParser for documentation
opwvhk Jan 19, 2024
87968dd
AVRO-3666: Refactor after review
opwvhk Jan 19, 2024
da1935a
AVRO-3666: Fix javadoc
opwvhk Jan 19, 2024
2707900
AVRO-3666: Merge branch 'main' into AVRO-3666-use-schema-parser
opwvhk Jan 19, 2024
3b05640
AVRO-3666: Fix merge bug
opwvhk Jan 19, 2024
fe35f95
AVRO-3666: Fix CodeQL warnings
opwvhk Jan 20, 2024
6223f33
AVRO-3666: Increase test coverage
opwvhk Jan 22, 2024
2cdf9e9
AVRO-3666: Fix tests
opwvhk Jan 22, 2024
838eb0d
AVRO-3666: Refactor schema parsing for readability
opwvhk Jan 23, 2024
41806c6
AVRO-3666: Merge branch 'main' into AVRO-3666-use-schema-parser
opwvhk Jan 23, 2024
081e3ed
AVRO-3666: rename method to avoid confusion
opwvhk Jan 23, 2024
7c36b1c
AVRO-3666: Merge branch 'main' into AVRO-3666-use-schema-parser
opwvhk Feb 23, 2024
675064f
AVRO-3666: Reduce PR size
opwvhk Feb 27, 2024
e1b257a
AVRO-3666: Reduce PR size more
opwvhk Feb 27, 2024
ca2ad31
AVRO-3666: Reduce PR size again
opwvhk Feb 27, 2024
1ea4881
AVRO-3666: Merge branch 'main' into AVRO-3666-use-schema-parser
opwvhk Apr 4, 2024
2dd88f6
AVRO-3666: Spotless
opwvhk Apr 4, 2024
b3f6b00
Update lang/java/avro/src/main/java/org/apache/avro/Schema.java
opwvhk Apr 4, 2024
c5a7659
AVRO-3666: Spotless
opwvhk Apr 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ Let's go over the same example as in the previous section, but without using cod
First, we use a SchemaParser to read our schema definition and create a Schema object.

```java
Schema schema = new SchemaParser().parse(new File("user.avsc"));
Schema schema = new SchemaParser().parse(new File("user.avsc")).mainSchema();
```

Using this schema, let's create some users.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ public interface FormattedSchemaParser {
* the parsing process, so reserve that for rethrowing exceptions.</li>
* </ul>
*
* @param parseContext the current parse context: all parsed schemata should
* be added here to resolve names with; contains all
* previously known types
* @param parseContext the current parse context: all named schemata that are
* parsed should be added here, otherwise resolving
* schemata can fail; contains all previously known types
* @param baseUri the base location of the schema, or {@code null} if
* not known
* @param formattedSchema the text of the schema definition(s) to parse
Expand Down
28 changes: 17 additions & 11 deletions lang/java/avro/src/main/java/org/apache/avro/JsonSchemaParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,26 +57,32 @@ public static Schema parseInternal(String... fragments) {
for (String fragment : fragments) {
buffer.append(fragment);
}
return new JsonSchemaParser().parse(new ParseContext(NameValidator.NO_VALIDATION), buffer, null);

boolean saved = Schema.getValidateDefaults();
Fixed Show fixed Hide fixed
Dismissed Show dismissed Hide dismissed
try {
Schema.setValidateDefaults(false);
Dismissed Show dismissed Hide dismissed
ParseContext context = new ParseContext(NameValidator.NO_VALIDATION);
Schema schema = new JsonSchemaParser().parse(context, buffer, true);
context.commit();
context.resolveAllSchemas();
return context.resolve(schema);
} finally {
Schema.setValidateDefaults(saved);
Dismissed Show dismissed Hide dismissed
}
}

@Override
public Schema parse(ParseContext parseContext, URI baseUri, CharSequence formattedSchema)
throws IOException, SchemaParseException {
return parse(parseContext, formattedSchema, parseContext.nameValidator);
return parse(parseContext, formattedSchema, false);
}

private Schema parse(ParseContext parseContext, CharSequence formattedSchema, NameValidator nameValidator)
private Schema parse(ParseContext parseContext, CharSequence formattedSchema, boolean allowInvalidDefaults)
throws SchemaParseException {
Schema.Parser parser = new Schema.Parser(nameValidator);
if (nameValidator == NameValidator.NO_VALIDATION) {
Schema.Parser parser = new Schema.Parser(parseContext);
if (allowInvalidDefaults) {
parser.setValidateDefaults(false);
} else {
parser = new Schema.Parser(nameValidator);
}
parser.addTypes(parseContext.typesByName().values());
Schema schema = parser.parse(formattedSchema.toString());
parser.getTypes().values().forEach(parseContext::put);
return schema;
return parser.parse(formattedSchema.toString());
}
}
198 changes: 158 additions & 40 deletions lang/java/avro/src/main/java/org/apache/avro/ParseContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,36 @@
package org.apache.avro;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a new file, so it is okay to break the APIs here


import org.apache.avro.util.SchemaResolver;
import org.apache.avro.util.Schemas;

import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.Objects.requireNonNull;

/**
* Class to define a name context, useful to reference schemata with. This
* allows for the following:
*
* <ul>
* <li>Provide a default namespace for nested contexts, as found for example in
* JSON based schema definitions.</li>
* <li>Find schemata by name, including primitives.</li>
* <li>Collect new named schemata.</li>
* <li>Find schemata by name, including primitives.</li>
* <li>Find schemas that do not exist yet.</li>
* <li>Resolve references to schemas that didn't exist yet when first used.</li>
* </ul>
*
* <p>
* This class is NOT thread-safe.
* </p>
*
* <p>
* Note: this class has no use for most Avro users, but is a key component when
* implementing a schema parser.
* </p>
Expand All @@ -60,10 +71,27 @@ public class ParseContext {

private static final Set<Schema.Type> NAMED_SCHEMA_TYPES = EnumSet.of(Schema.Type.RECORD, Schema.Type.ENUM,
Schema.Type.FIXED);
/**
* Collection of old schemata. Can contain unresolved references if !isResolved.
*/
private final Map<String, Schema> oldSchemas;
/**
* Collection of new schemata. Can contain unresolved references.
*/
private final Map<String, Schema> newSchemas;
/**
* The name validator to use.
*/
// Visible for use in JsonSchemaParser
final NameValidator nameValidator;
/**
* Visitor that was used to resolve schemata with. If not available, some
* schemata in {@code oldSchemas} may not be fully resolved. If available, all
* schemata in {@code oldSchemas} are resolved, and {@code newSchemas} is empty.
* After visiting a schema, it can return the corresponding resolved schema for
* a schema that possibly contains unresolved references.
*/
private SchemaResolver.ResolvingVisitor resolvingVisitor;

/**
* Create a {@code ParseContext} for the default/{@code null} namespace, using
Expand All @@ -78,22 +106,14 @@ public ParseContext() {
* schemata.
*/
public ParseContext(NameValidator nameValidator) {
this(nameValidator, new LinkedHashMap<>(), new LinkedHashMap<>());
this(requireNonNull(nameValidator), new LinkedHashMap<>(), new LinkedHashMap<>());
}

private ParseContext(NameValidator nameValidator, Map<String, Schema> oldSchemas, Map<String, Schema> newSchemas) {
this.nameValidator = nameValidator;
this.oldSchemas = oldSchemas;
this.newSchemas = newSchemas;
}

/**
* Create a derived context using a different fallback namespace.
*
* @return a new context
*/
public ParseContext namespace() {
return new ParseContext(nameValidator, oldSchemas, newSchemas);
resolvingVisitor = null;
}

/**
Expand All @@ -109,56 +129,71 @@ public boolean contains(String name) {

/**
* <p>
* Resolve a schema by name.
* Find a schema by name and namespace.
* </p>
*
* <p>
* That is:
* </p>
*
* <ul>
* <li>If {@code fullName} is a primitive name, return a (new) schema for
* it</li>
* <li>Otherwise: resolve the schema in its own namespace and in the null
* namespace (the former takes precedence)</li>
* </ul>
* <ol>
* <li>If {@code name} is a primitive name, return a (new) schema for it</li>
* <li>Otherwise, determine the full schema name (using the given
* {@code namespace} if necessary), and find it</li>
* <li>If no schema was found and {@code name} is a simple name, find the schema
* in the default (null) namespace</li>
* <li>If still no schema was found, return an unresolved reference for the full
* schema name (see step 2)</li>
* </ol>
*
* Resolving means that the schema is returned if known, and otherwise an
* unresolved schema (a reference) is returned.
* <p>
* Note: as an unresolved reference might be returned, the schema is not
* directly usable. Please {@link #put(Schema)} the schema using it in the
* context. The {@link SchemaParser} and protocol parsers will ensure you'll
* only get a resolved schema that is usable.
* </p>
*
* @param fullName the full schema name to resolve
* @return the schema
* @throws SchemaParseException when the schema does not exist
* @param name the schema name to find
* @param namespace the namespace to find the schema against
* @return the schema, or an unresolved reference
*/
public Schema resolve(String fullName) {
Schema.Type type = PRIMITIVES.get(fullName);
public Schema find(String name, String namespace) {
Schema.Type type = PRIMITIVES.get(name);
if (type != null) {
return Schema.create(type);
}

Schema schema = getSchema(fullName);
String fullName = fullName(name, namespace);
Schema schema = getNamedSchema(fullName);
if (schema == null) {
// Not found; attempt to resolve in the default namespace
int lastDot = fullName.lastIndexOf('.');
String name = fullName.substring(lastDot + 1);
schema = getSchema(name);
schema = getNamedSchema(name);
}

return schema != null ? schema : SchemaResolver.unresolvedSchema(fullName);
}

private Schema getSchema(String fullName) {
private String fullName(String name, String namespace) {
if (namespace != null && name.lastIndexOf('.') < 0) {
return namespace + "." + name;
}
return name;
}

/**
* Get a schema by name. Note that the schema might not (yet) be resolved/usable
* until {@link #resolveAllSchemas()} has been called.
*
* @param fullName a full schema name
* @return the schema, if known
*/
public Schema getNamedSchema(String fullName) {
Schema schema = oldSchemas.get(fullName);
if (schema == null) {
schema = newSchemas.get(fullName);
}
return schema;
}

private boolean notEmpty(String str) {
return str != null && !str.isEmpty();
}

/**
* Put the schema into this context. This is an idempotent operation: it only
* fails if this context already has a different schema with the same name.
Expand All @@ -184,6 +219,7 @@ public void put(Schema schema) {
throw new SchemaParseException("Can't redefine: " + fullName);
}
} else {
resolvingVisitor = null;
Schema previouslyAddedSchema = newSchemas.putIfAbsent(fullName, schema);
if (previouslyAddedSchema != null && !previouslyAddedSchema.equals(schema)) {
throw new SchemaParseException("Can't redefine: " + fullName);
Expand All @@ -200,10 +236,10 @@ private String requireValidFullName(String fullName) {
return fullName;
}

private void validateName(String name, String what) {
private void validateName(String name, String typeOfName) {
NameValidator.Result result = nameValidator.validate(name);
if (!result.isOK()) {
throw new SchemaParseException(what + " \"" + name + "\" is invalid: " + result.getErrors());
throw new SchemaParseException(typeOfName + " \"" + name + "\" is invalid: " + result.getErrors());
}
}

Expand All @@ -216,12 +252,94 @@ public void commit() {
newSchemas.clear();
}

public SchemaParser.ParseResult commit(Schema mainSchema) {
Collection<Schema> parsedNamedSchemas = newSchemas.values();
SchemaParser.ParseResult parseResult = new SchemaParser.ParseResult() {
@Override
public Schema mainSchema() {
return mainSchema == null ? null : resolve(mainSchema);
}

@Override
public List<Schema> parsedNamedSchemas() {
return parsedNamedSchemas.stream().map(ParseContext.this::resolve).collect(Collectors.toList());
}
};
commit();
return parseResult;
}

public void rollback() {
newSchemas.clear();
}

/**
* Return all known types by their fullname.
* Resolve all (named) schemas that were parsed. This resolves all forward
* references, even if parsed from different files. Note: the context must be
* committed for this method to work.
*
* @return all parsed schemas, in the order they were parsed
* @throws AvroTypeException if a schema reference cannot be resolved
*/
public List<Schema> resolveAllSchemas() {
ensureSchemasAreResolved();

return new ArrayList<>(oldSchemas.values());
}

private void ensureSchemasAreResolved() {
if (hasNewSchemas()) {
throw new IllegalStateException("Schemas cannot be resolved unless the ParseContext is committed.");
}
if (resolvingVisitor == null) {
NameValidator saved = Schema.getNameValidator();
Fixed Show fixed Hide fixed
Dismissed Show dismissed Hide dismissed
try {
// Ensure we use the same validation when copying schemas as when they were
// defined.
Schema.setNameValidator(nameValidator);
Dismissed Show dismissed Hide dismissed
SchemaResolver.ResolvingVisitor visitor = new SchemaResolver.ResolvingVisitor(oldSchemas::get);
oldSchemas.values().forEach(schema -> Schemas.visit(schema, visitor));
// Before this point is where we can get exceptions due to resolving failures.
for (Map.Entry<String, Schema> entry : oldSchemas.entrySet()) {
entry.setValue(visitor.getResolved(entry.getValue()));
}
resolvingVisitor = visitor;
} finally {
Schema.setNameValidator(saved);
Fixed Show fixed Hide fixed
Dismissed Show dismissed Hide dismissed
}
}
}

/**
* Resolve unresolved references in a schema <em>that was parsed for this
* context</em> using the types known to this context. Note: this method will
* ensure all known schemas are resolved, or throw, and thus requires the
* context to be committed.
*
* @param schema the schema resolve
* @return the fully resolved schema
* @throws AvroTypeException if a schema reference cannot be resolved
*/
public Schema resolve(Schema schema) {
ensureSchemasAreResolved();

// As all (named) schemas are resolved now, we know:
// — All named types are either in oldSchemas or unknown.
// — All unnamed types can be visited&resolved without validation.

if (NAMED_SCHEMA_TYPES.contains(schema.getType()) && schema.getFullName() != null) {
return requireNonNull(oldSchemas.get(schema.getFullName()), () -> "Unknown schema: " + schema.getFullName());
} else {
// Unnamed or anonymous schema
// (protocol message request parameters are anonymous records)
Schemas.visit(schema, resolvingVisitor); // This field is set, as ensureSchemasAreResolved(); was called.
return resolvingVisitor.getResolved(schema);
}
}

/**
* Return all known types by their fullname. Warning: this returns all types,
* even uncommitted ones, including unresolved references!
*
* @return a map of all types by their name
*/
Expand Down
Loading
Loading