Support NiFi 2.x
The latest versions of NiFi change or remove API functions that are
used by the Daffodil processors. Trying to use the current version of
these processors in newer versions of NiFi leads to
NoClassDefFoundErrors and breaks NiFi.

To fix this, this commit replaces those API functions/classes with
variants that exist in both older and newer versions of NiFi. This
allows the processors to maintain compatibility with older versions of
NiFi while still working with newer versions.

Specific changes include:

- NiFi 2.x removes support for EventDriven processors. This annotation
  is likely rarely used in NiFi 1.x flows, so EventDriven support is
  removed from the DaffodilParse/Unparse processors
- NiFi 2.x replaces the allowableValues(AllowableValue...) function
  with allowableValues(DescribedValue...). The DescribedValue variant
  does not exist in older versions of NiFi, so we instead use the
  allowableValues(String...) variant, which exists in all versions of
  NiFi. This loses the human-readable description of each value, but
  the allowable values are mostly self-descriptive. In case there is
  still confusion, descriptions of the possible values are added to
  additionalDetails.html (a minimal sketch of this change follows the
  list)
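
For illustration only (an editor's sketch, not part of the commit's
diff; the class and constant names are hypothetical), the builder-call
change amounts to roughly the following:

    import org.apache.nifi.components.AllowableValue;
    import org.apache.nifi.components.PropertyDescriptor;

    class AllowableValuesSketch {

        // NiFi 1.x style: AllowableValue carries a per-value description,
        // but NiFi 2.x replaces the allowableValues(AllowableValue...)
        // overload with allowableValues(DescribedValue...), so the current
        // processors hit NoClassDefFoundErrors when loaded into NiFi 2.x.
        static final AllowableValue OFF_1X =
                new AllowableValue("off", "off", "Disable infoset validation");

        static final PropertyDescriptor MODE_1X = new PropertyDescriptor.Builder()
                .name("validation-mode")
                .displayName("Validation Mode")
                .required(true)
                .allowableValues(OFF_1X)
                .defaultValue("off")
                .build();

        // Portable style used by this commit: plain strings go through the
        // allowableValues(String...) overload present in all NiFi versions,
        // at the cost of the per-value description.
        static final PropertyDescriptor MODE_PORTABLE = new PropertyDescriptor.Builder()
                .name("validation-mode")
                .displayName("Validation Mode")
                .required(true)
                .allowableValues("off", "limited", "full")
                .defaultValue("off")
                .build();
    }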

Closes #18
stevedlawrence committed Oct 9, 2024
1 parent 4fa4120 commit 9afafd0
Showing 6 changed files with 65 additions and 46 deletions.
@@ -38,7 +38,6 @@
import java.util.concurrent.TimeUnit;

import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.resource.ResourceCardinality;
@@ -118,13 +117,9 @@ public abstract class AbstractDaffodilProcessor extends AbstractProcessor {
static final String XML_MIME_TYPE = "application/xml";
static final String JSON_MIME_TYPE = "application/json";

static final String XML_VALUE = "xml";
static final String JSON_VALUE = "json";
static final String ATTRIBUTE_VALUE = "use mime.type attribute";

static final AllowableValue INFOSET_TYPE_XML = new AllowableValue(XML_VALUE, XML_VALUE, "The FlowFile representation is XML");
static final AllowableValue INFOSET_TYPE_JSON = new AllowableValue(JSON_VALUE, JSON_VALUE, "The FlowFile representation is JSON");
static final AllowableValue INFOSET_TYPE_ATTRIBUTE = new AllowableValue(ATTRIBUTE_VALUE, ATTRIBUTE_VALUE, "The FlowFile representation is determined based on the mime.type attribute");
static final String INFOSET_TYPE_XML = "xml";
static final String INFOSET_TYPE_JSON = "json";
static final String INFOSET_TYPE_ATTRIBUTE = "use mime.type attribute";

public static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder()
.name("cache-size")
@@ -144,20 +139,16 @@ public abstract class AbstractDaffodilProcessor extends AbstractProcessor {
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();

static final String OFF_VALUE = "off";
static final String LIMITED_VALUE = "limited";
static final String FULL_VALUE = "full";

static final AllowableValue VALIDATION_MODE_OFF = new AllowableValue(OFF_VALUE, OFF_VALUE, "Disable infoset validation");
static final AllowableValue VALIDATION_MODE_LIMITED= new AllowableValue(LIMITED_VALUE, LIMITED_VALUE, "Facet/restriction validation using Daffodil");
static final AllowableValue VALIDATION_MODE_FULL = new AllowableValue(FULL_VALUE, FULL_VALUE, "Full schema validation using Xerces");
static final String VALIDATION_MODE_OFF = "off";
static final String VALIDATION_MODE_LIMITED = "limited";
static final String VALIDATION_MODE_FULL = "full";

public static final PropertyDescriptor VALIDATION_MODE = new PropertyDescriptor.Builder()
.name("validation-mode")
.displayName("Validation Mode")
.description("The type of validation to be performed on the infoset.")
.required(true)
.defaultValue(OFF_VALUE)
.defaultValue(VALIDATION_MODE_OFF)
.allowableValues(VALIDATION_MODE_OFF, VALIDATION_MODE_LIMITED, VALIDATION_MODE_FULL)
.build();

@@ -182,7 +173,7 @@ public abstract class AbstractDaffodilProcessor extends AbstractProcessor {

@Override
protected void init(final ProcessorInitializationContext context) {
List<AllowableValue> allowableInfosetTypeValues = new ArrayList(Arrays.asList(INFOSET_TYPE_XML, INFOSET_TYPE_JSON));
Set<String> allowableInfosetTypeValues = new HashSet(Arrays.asList(INFOSET_TYPE_XML, INFOSET_TYPE_JSON));
if (isUnparse()) {
// using the mime type for infoset type only applies to unparse
allowableInfosetTypeValues.add(INFOSET_TYPE_ATTRIBUTE);
@@ -193,8 +184,8 @@ protected void init(ProcessorInitializationContext context) {
.displayName("Infoset Type")
.description("The format of the FlowFile to output (for parsing) or input (for unparsing).")
.required(true)
.defaultValue(INFOSET_TYPE_XML.getValue())
.allowableValues(allowableInfosetTypeValues.toArray(new AllowableValue[allowableInfosetTypeValues.size()]))
.defaultValue(INFOSET_TYPE_XML)
.allowableValues(allowableInfosetTypeValues)
.build();

final List<PropertyDescriptor> properties = new ArrayList<>();
@@ -417,21 +408,21 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final ValidationMode validationMode;

switch (context.getProperty(VALIDATION_MODE).getValue()) {
case OFF_VALUE: validationMode = ValidationMode.Off; break;
case LIMITED_VALUE: validationMode = ValidationMode.Limited; break;
case FULL_VALUE: validationMode = ValidationMode.Full; break;
case VALIDATION_MODE_OFF: validationMode = ValidationMode.Off; break;
case VALIDATION_MODE_LIMITED: validationMode = ValidationMode.Limited; break;
case VALIDATION_MODE_FULL: validationMode = ValidationMode.Full; break;
default: throw new AssertionError("validation mode was not one of 'off', 'limited', or 'full'");
}

CompilationParams params = new CompilationParams(dfdlSchema, preCompiled, validationMode);

if (infosetTypeValue.equals(ATTRIBUTE_VALUE)) {
if (infosetTypeValue.equals(INFOSET_TYPE_ATTRIBUTE)) {
if (!isUnparse()) { throw new AssertionError("infoset type 'attribute' should only occur with Daffodil unparse"); }

String inputMimeType = original.getAttribute(CoreAttributes.MIME_TYPE.key());
switch (inputMimeType == null ? "" : inputMimeType) {
case XML_MIME_TYPE: infosetType = XML_VALUE; break;
case JSON_MIME_TYPE: infosetType = JSON_VALUE; break;
case XML_MIME_TYPE: infosetType = INFOSET_TYPE_XML; break;
case JSON_MIME_TYPE: infosetType = INFOSET_TYPE_JSON; break;
default:
logger.error("Infoset Type is 'attribute', but the mime.type attribute is not set or not recognized for {}.", new Object[]{original});
session.transfer(original, REL_FAILURE);
@@ -24,7 +24,6 @@
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -46,7 +45,6 @@
import org.apache.daffodil.japi.infoset.XMLTextInfosetOutputter;


@EventDriven
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -64,8 +62,8 @@ public class DaffodilParse extends AbstractDaffodilProcessor {

private InfosetOutputter getInfosetOutputter(String infosetType, OutputStream os) {
switch (infosetType) {
case XML_VALUE: return new XMLTextInfosetOutputter(os, false);
case JSON_VALUE: return new JsonInfosetOutputter(os, false);
case INFOSET_TYPE_XML: return new XMLTextInfosetOutputter(os, false);
case INFOSET_TYPE_JSON: return new JsonInfosetOutputter(os, false);
default: throw new AssertionError("Unhandled infoset type: " + infosetType);
}
}
@@ -76,8 +74,8 @@ private InfosetOutputter getInfosetOutputter(String infosetType, OutputStream os
@Override
protected String getOutputMimeType(String infosetType) {
switch (infosetType) {
case XML_VALUE: return XML_MIME_TYPE;
case JSON_VALUE: return JSON_MIME_TYPE;
case INFOSET_TYPE_XML: return XML_MIME_TYPE;
case INFOSET_TYPE_JSON: return JSON_MIME_TYPE;
default: throw new AssertionError("Unhandled infoset type: " + infosetType);
}
}
@@ -24,7 +24,6 @@
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -43,7 +42,6 @@
import org.apache.daffodil.japi.infoset.JsonInfosetInputter;
import org.apache.daffodil.japi.infoset.XMLTextInfosetInputter;

@EventDriven
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -61,8 +59,8 @@ public class DaffodilUnparse extends AbstractDaffodilProcessor {

private InfosetInputter getInfosetInputter(String infosetType, InputStream is) {
switch (infosetType) {
case XML_VALUE: return new XMLTextInfosetInputter(is);
case JSON_VALUE: return new JsonInfosetInputter(is);
case INFOSET_TYPE_XML: return new XMLTextInfosetInputter(is);
case INFOSET_TYPE_JSON: return new JsonInfosetInputter(is);
default: throw new AssertionError("Unhandled infoset type: " + infosetType);
}
}
@@ -137,7 +137,7 @@ <h2>Compiled DFDL Schema Cache</h2>
<dd>
<p>
Cached compiled DFDL schemas that go unused for a specified amount of time are removed from the cache to
save memory. This time is defind by the <tt>Cache TTL after last access</tt> property, with the format of
save memory. This time is defined by the <tt>Cache TTL after last access</tt> property, with the format of
<tt>&lt;duration&gt; &lt;time_unit&gt;</tt>, where <tt>&lt;duration&gt;</tt> is a non-negative integer and
<tt>&lt;time_unit&gt;</tt> is a supported unit of time, such as nanos, millis, secs, mins, hrs, days. The
default is "0 seconds", which means compiled DFDL schemas are never removed from the cache.
@@ -153,5 +153,23 @@ <h2>Compiled DFDL Schema Cache</h2>
Restart the processor to manually empty the cache and recompile/reload schemas as needed.
</p>

<h2>Infoset Types</h2>
<p>
This processor supports parsing to XML and JSON infosets as specified by the <tt>Infoset Type</tt>
property, which can be either <tt>xml</tt> or <tt>json</tt>. If the parse is successful, the
<tt>mime.type</tt> attribute is set to <tt>application/xml</tt> or <tt>application/json</tt>
accordingly.
</p>

<h2>Validation Mode</h2>
<p>
The DaffodilParse processor can optionally enable validation of the infoset. A value of
<tt>limited</tt> uses Daffodil's built-in validation mechanism. A value of <tt>full</tt> enables XSD
validation using Apache Xerces. A value of <tt>off</tt> disables validation.
</p>
<p>If validation is enabled (either <tt>limited</tt> or <tt>full</tt>) and any validation errors are
found, the FlowFile is transferred to the <i>failure</i> relationship.
</p>

</body>
</html>
@@ -138,7 +138,7 @@ <h2>Compiled DFDL Schema Cache</h2>
<dd>
<p>
Cached compiled DFDL schemas that go unused for a specified amount of time are removed from the cache to
save memory. This time is defind by the <tt>Cache TTL after last access</tt> property, with the format of
save memory. This time is defined by the <tt>Cache TTL after last access</tt> property, with the format of
<tt>&lt;duration&gt; &lt;time_unit&gt;</tt>, where <tt>&lt;duration&gt;</tt> is a non-negative integer and
<tt>&lt;time_unit&gt;</tt> is a supported unit of time, such as nanos, millis, secs, mins, hrs, days. The
default is "0 seconds", which means compiled DFDL schemas are never removed from the cache.
@@ -154,5 +154,19 @@ <h2>Compiled DFDL Schema Cache</h2>
Restart the processor to manually empty the cache and recompile/reload schemas as needed.
</p>

<h2>Infoset Types</h2>
<p>
This processor supports unparsing XML and JSON infosets as specified by the <tt>Infoset Type</tt>
property. Alternatively, the property can be set so that the processor uses the <tt>mime.type</tt>
attribute. In this case, an attribute value of <tt>application/xml</tt> or <tt>application/json</tt>
will cause the processor to expect an XML or JSON infoset, respectively. Note that the DaffodilParse
processor sets the <tt>mime.type</tt> attribute based on its <tt>Infoset Type</tt> property.
</p>

<h2>Validation Mode</h2>
<p>
The DaffodilUnparse processor currently ignores the validation mode setting; a future version may add support for it.
</p>

</body>
</html>
@@ -197,7 +197,7 @@ public void testNoCache() throws IOException {
public void testParseCSVJson() throws IOException {
final TestRunner testRunner = TestRunners.newTestRunner(DaffodilParse.class);
testRunner.setProperty(DaffodilParse.DFDL_SCHEMA_FILE, "src/test/resources/TestDaffodilProcessor/csv.dfdl.xsd");
testRunner.setProperty("infoset-type", DaffodilParse.JSON_VALUE);
testRunner.setProperty("infoset-type", DaffodilParse.INFOSET_TYPE_JSON);
testRunner.enqueue(Paths.get("src/test/resources/TestDaffodilProcessor/tokens.csv"));
testRunner.run();
testRunner.assertAllFlowFilesTransferred(DaffodilParse.REL_SUCCESS);
@@ -211,7 +211,7 @@ public void testParseCSVJson() throws IOException {
public void testUnparseCSVJson() throws IOException {
final TestRunner testRunner = TestRunners.newTestRunner(DaffodilUnparse.class);
testRunner.setProperty(DaffodilUnparse.DFDL_SCHEMA_FILE, "src/test/resources/TestDaffodilProcessor/csv.dfdl.xsd");
testRunner.setProperty("infoset-type", DaffodilUnparse.JSON_VALUE);
testRunner.setProperty("infoset-type", DaffodilUnparse.INFOSET_TYPE_JSON);
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), DaffodilUnparse.JSON_MIME_TYPE);
testRunner.enqueue(Paths.get("src/test/resources/TestDaffodilProcessor/tokens.csv.json"), attributes);
@@ -227,15 +227,15 @@ public void testUnparseCSVJson() throws IOException {
public void testParseCSVAttributeInvalid() throws IOException {
final TestRunner testRunner = TestRunners.newTestRunner(DaffodilParse.class);
testRunner.setProperty(DaffodilParse.DFDL_SCHEMA_FILE, "src/test/resources/TestDaffodilProcessor/csv.dfdl.xsd");
testRunner.setProperty("infoset-type", DaffodilParse.ATTRIBUTE_VALUE);
testRunner.setProperty("infoset-type", DaffodilParse.INFOSET_TYPE_ATTRIBUTE);
testRunner.assertNotValid();
}

@Test
public void testUnparseCSVAttributeJSON() throws IOException {
final TestRunner testRunner = TestRunners.newTestRunner(DaffodilUnparse.class);
testRunner.setProperty(DaffodilUnparse.DFDL_SCHEMA_FILE, "src/test/resources/TestDaffodilProcessor/csv.dfdl.xsd");
testRunner.setProperty("infoset-type", DaffodilUnparse.ATTRIBUTE_VALUE);
testRunner.setProperty("infoset-type", DaffodilUnparse.INFOSET_TYPE_ATTRIBUTE);
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), DaffodilUnparse.JSON_MIME_TYPE);
testRunner.enqueue(Paths.get("src/test/resources/TestDaffodilProcessor/tokens.csv.json"), attributes);
@@ -251,7 +251,7 @@ public void testUnparseCSVAttributeJSON() throws IOException {
public void testUnparseCSVAttributeXML() throws IOException {
final TestRunner testRunner = TestRunners.newTestRunner(DaffodilUnparse.class);
testRunner.setProperty(DaffodilUnparse.DFDL_SCHEMA_FILE, "src/test/resources/TestDaffodilProcessor/csv.dfdl.xsd");
testRunner.setProperty("infoset-type", DaffodilUnparse.ATTRIBUTE_VALUE);
testRunner.setProperty("infoset-type", DaffodilUnparse.INFOSET_TYPE_ATTRIBUTE);
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), DaffodilUnparse.XML_MIME_TYPE);
testRunner.enqueue(Paths.get("src/test/resources/TestDaffodilProcessor/tokens.csv.xml"), attributes);
@@ -267,7 +267,7 @@ public void testUnparseCSVAttributeXML() throws IOException {
public void testUnparseCSVAttributeUndefined() throws IOException {
final TestRunner testRunner = TestRunners.newTestRunner(DaffodilUnparse.class);
testRunner.setProperty(DaffodilUnparse.DFDL_SCHEMA_FILE, "src/test/resources/TestDaffodilProcessor/csv.dfdl.xsd");
testRunner.setProperty("infoset-type", DaffodilUnparse.ATTRIBUTE_VALUE);
testRunner.setProperty("infoset-type", DaffodilUnparse.INFOSET_TYPE_ATTRIBUTE);
final Map<String, String> attributes = new HashMap<>();
testRunner.enqueue(Paths.get("src/test/resources/TestDaffodilProcessor/tokens.csv.xml"), attributes);
testRunner.run();
@@ -281,7 +281,7 @@ public void testUnparseCSVAttributeUndefined() throws IOException {
public void testUnparseCSVAttributeUnknown() throws IOException {
final TestRunner testRunner = TestRunners.newTestRunner(DaffodilUnparse.class);
testRunner.setProperty(DaffodilUnparse.DFDL_SCHEMA_FILE, "src/test/resources/TestDaffodilProcessor/csv.dfdl.xsd");
testRunner.setProperty("infoset-type", DaffodilUnparse.ATTRIBUTE_VALUE);
testRunner.setProperty("infoset-type", DaffodilUnparse.INFOSET_TYPE_ATTRIBUTE);
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/unknown");
testRunner.enqueue(Paths.get("src/test/resources/TestDaffodilProcessor/tokens.csv.xml"), attributes);
@@ -321,7 +321,7 @@ public void testParseNoLeftOverData() throws IOException {
public void testParseCSVValidationLimited() throws IOException {
final TestRunner testRunner = TestRunners.newTestRunner(DaffodilParse.class);
testRunner.setProperty(DaffodilParse.DFDL_SCHEMA_FILE, "src/test/resources/TestDaffodilProcessor/csv.dfdl.xsd");
testRunner.setProperty(DaffodilParse.VALIDATION_MODE, DaffodilParse.LIMITED_VALUE);
testRunner.setProperty(DaffodilParse.VALIDATION_MODE, DaffodilParse.VALIDATION_MODE_LIMITED);
testRunner.enqueue(Paths.get("src/test/resources/TestDaffodilProcessor/tokens.csv"));
testRunner.run();
testRunner.assertAllFlowFilesTransferred(DaffodilParse.REL_FAILURE);
@@ -334,7 +334,7 @@ public void testParseCSVValidationLimited() throws IOException {
public void testParseCSVValidationFull() throws IOException {
final TestRunner testRunner = TestRunners.newTestRunner(DaffodilParse.class);
testRunner.setProperty(DaffodilParse.DFDL_SCHEMA_FILE, "src/test/resources/TestDaffodilProcessor/csv.dfdl.xsd");
testRunner.setProperty(DaffodilParse.VALIDATION_MODE, DaffodilParse.FULL_VALUE);
testRunner.setProperty(DaffodilParse.VALIDATION_MODE, DaffodilParse.VALIDATION_MODE_FULL);
testRunner.enqueue(Paths.get("src/test/resources/TestDaffodilProcessor/tokens.csv"));
testRunner.run();
testRunner.assertAllFlowFilesTransferred(DaffodilParse.REL_FAILURE);
