pluginPaths = new ArrayList<>();
+ for (int i = 4; i < args.length; ++i) {
+ pluginPaths.add(Paths.get(args[i]));
+ }
PipesServer server =
new PipesServer(tikaConfig, System.in, System.out, maxForEmitBatchBytes,
- serverParseTimeoutMillis, serverWaitTimeoutMillis);
+ serverParseTimeoutMillis, serverWaitTimeoutMillis, pluginPaths);
System.setIn(UnsynchronizedByteArrayInputStream.builder().setByteArray(new byte[0]).get());
System.setOut(System.err);
Thread watchdog = new Thread(server, "Tika Watchdog");
@@ -455,10 +463,6 @@ private Fetcher getFetcher(FetchEmitTuple t) {
LOG.warn(noFetcherMsg);
write(STATUS.FETCHER_NOT_FOUND, noFetcherMsg);
return null;
- } catch (IOException | TikaException e) {
- LOG.warn("Couldn't initialize fetcher for fetch id '" + t.getId() + "'", e);
- write(STATUS.FETCHER_INITIALIZATION_EXCEPTION, ExceptionUtils.getStackTrace(e));
- return null;
}
}
@@ -743,8 +747,11 @@ private FetchEmitTuple readFetchEmitTuple() {
protected void initializeResources() throws TikaException, IOException, SAXException {
//TODO allowed named configurations in tika config
+ pluginManager = pluginDirs == null || pluginDirs.isEmpty() ? new TikaPluginManager() : new TikaPluginManager(pluginDirs);
+ pluginManager.loadPlugins();
+ pluginManager.startPlugins();
this.tikaConfig = new TikaConfig(tikaConfigPath);
- this.fetcherManager = FetcherManager.load(tikaConfigPath);
+ this.fetcherManager = FetcherManager.load(pluginManager);
//skip initialization of the emitters if emitting
//from the pipesserver is turned off.
if (maxForEmitBatchBytes > -1) {
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
similarity index 98%
rename from tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
index bc55cca5db..b13ba64d96 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
@@ -23,7 +23,7 @@
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.pipes.PipesConfigBase;
-import org.apache.tika.pipes.PipesReporter;
+import org.apache.tika.pipes.reporter.PipesReporter;
public class AsyncConfig extends PipesConfigBase {
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
similarity index 100%
rename from tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
similarity index 99%
rename from tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index 3a6751f4ff..e0939f6a2b 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -37,13 +37,13 @@
import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.PipesClient;
import org.apache.tika.pipes.PipesException;
-import org.apache.tika.pipes.PipesReporter;
import org.apache.tika.pipes.PipesResult;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.EmitterManager;
import org.apache.tika.pipes.pipesiterator.PipesIterator;
import org.apache.tika.pipes.pipesiterator.TotalCountResult;
import org.apache.tika.pipes.pipesiterator.TotalCounter;
+import org.apache.tika.pipes.reporter.PipesReporter;
/**
* This is the main class for handling async requests. This manages
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncStatus.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/async/AsyncStatus.java
similarity index 100%
rename from tika-core/src/main/java/org/apache/tika/pipes/async/AsyncStatus.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/async/AsyncStatus.java
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/OfferLargerThanQueueSize.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/async/OfferLargerThanQueueSize.java
similarity index 100%
rename from tika-core/src/main/java/org/apache/tika/pipes/async/OfferLargerThanQueueSize.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/async/OfferLargerThanQueueSize.java
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
similarity index 100%
rename from tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
similarity index 100%
rename from tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
similarity index 100%
rename from tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
similarity index 94%
rename from tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
index c748541afb..c8b98de177 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
@@ -19,10 +19,12 @@
import java.io.IOException;
import java.util.List;
+import org.pf4j.ExtensionPoint;
+
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.ParseContext;
-public interface Emitter {
+public interface Emitter extends ExtensionPoint {
String getName();
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitterManager.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/EmitterManager.java
similarity index 100%
rename from tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitterManager.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/EmitterManager.java
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
similarity index 100%
rename from tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/StreamEmitter.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/StreamEmitter.java
similarity index 100%
rename from tika-core/src/main/java/org/apache/tika/pipes/emitter/StreamEmitter.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/StreamEmitter.java
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/TikaEmitterException.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/TikaEmitterException.java
similarity index 100%
rename from tika-core/src/main/java/org/apache/tika/pipes/emitter/TikaEmitterException.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/TikaEmitterException.java
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/exception/PipesRuntimeException.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/exception/PipesRuntimeException.java
new file mode 100644
index 0000000000..45f4982b86
--- /dev/null
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/exception/PipesRuntimeException.java
@@ -0,0 +1,22 @@
+package org.apache.tika.pipes.exception;
+
+public class PipesRuntimeException extends RuntimeException {
+ public PipesRuntimeException() {
+ }
+
+ public PipesRuntimeException(String message) {
+ super(message);
+ }
+
+ public PipesRuntimeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public PipesRuntimeException(Throwable cause) {
+ super(cause);
+ }
+
+ public PipesRuntimeException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmittingEmbeddedDocumentBytesHandler.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/extractor/EmittingEmbeddedDocumentBytesHandler.java
similarity index 98%
rename from tika-core/src/main/java/org/apache/tika/pipes/extractor/EmittingEmbeddedDocumentBytesHandler.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/extractor/EmittingEmbeddedDocumentBytesHandler.java
index 07c9f7507f..7577da8879 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmittingEmbeddedDocumentBytesHandler.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/extractor/EmittingEmbeddedDocumentBytesHandler.java
@@ -22,6 +22,7 @@
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.extractor.AbstractEmbeddedDocumentBytesHandler;
+import org.apache.tika.extractor.EmbeddedDocumentBytesConfig;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.FetchEmitTuple;
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/AbstractFetcher.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/AbstractFetcher.java
similarity index 77%
rename from tika-core/src/main/java/org/apache/tika/pipes/fetcher/AbstractFetcher.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/AbstractFetcher.java
index 0b417e3fb1..71a1cd394a 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/AbstractFetcher.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/AbstractFetcher.java
@@ -16,29 +16,23 @@
*/
package org.apache.tika.pipes.fetcher;
-import org.apache.tika.config.Field;
-
-
public abstract class AbstractFetcher implements Fetcher {
- private String name;
+ private String pluginId;
public AbstractFetcher() {
}
- public AbstractFetcher(String name) {
- this.name = name;
+ public AbstractFetcher(String pluginId) {
+ this.pluginId = pluginId;
}
- @Override
- public String getName() {
- return name;
+ public String getPluginId() {
+ return pluginId;
}
- @Field
- public void setName(String name) {
- this.name = name;
+ public void setPluginId(String pluginId) {
+ this.pluginId = pluginId;
}
-
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java
similarity index 91%
rename from tika-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java
index d64f815244..60bb81d6a0 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java
@@ -25,9 +25,11 @@
public class EmptyFetcher implements Fetcher {
+ public static final String PLUGIN_ID = "empty-fetcher";
+
@Override
- public String getName() {
- return "empty";
+ public String getPluginId() {
+ return PLUGIN_ID;
}
@Override
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
similarity index 100%
rename from tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java
similarity index 92%
rename from tika-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java
index 8f7a186fd5..dd8ee695a7 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java
@@ -19,6 +19,8 @@
import java.io.IOException;
import java.io.InputStream;
+import org.pf4j.ExtensionPoint;
+
import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.ParseContext;
@@ -30,9 +32,9 @@
*
* Implementations of Fetcher must be thread safe.
*/
-public interface Fetcher {
+public interface Fetcher extends ExtensionPoint {
- String getName();
+ String getPluginId();
InputStream fetch(String fetchKey, Metadata metadata, ParseContext parseContext) throws TikaException, IOException;
}
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/FetcherManager.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/FetcherManager.java
new file mode 100644
index 0000000000..d0a532b499
--- /dev/null
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/FetcherManager.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.fetcher;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.pf4j.PluginManager;
+
+import org.apache.tika.config.ConfigBase;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.pipes.exception.PipesRuntimeException;
+import org.apache.tika.pipes.plugin.TikaPluginManager;
+
+/**
+ * Utility class to hold multiple fetchers.
+ *
+ * This forbids multiple fetchers supporting the same name.
+ */
+public class FetcherManager extends ConfigBase {
+ private final PluginManager pluginManager;
+
+ public FetcherManager() throws TikaConfigException {
+ pluginManager = new TikaPluginManager();
+ }
+
+ public FetcherManager(PluginManager pluginManager) {
+ this.pluginManager = pluginManager;
+ }
+
+ public static FetcherManager load(PluginManager pluginManager) {
+ return new FetcherManager(pluginManager);
+ }
+
+ public Fetcher getFetcher(String pluginId) {
+ return pluginManager.getExtensions(Fetcher.class, pluginId)
+ .stream()
+ .findFirst()
+ .orElseThrow(() -> new PipesRuntimeException("Could not find Fetcher extension for plugin " + pluginId));
+ }
+
+ public Set getSupported() {
+ return pluginManager.getExtensions(Fetcher.class)
+ .stream()
+ .map(Fetcher::getPluginId)
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * Convenience method that returns a fetcher if only one fetcher
+ * is specified in the tika-config file. If 0 or > 1 fetchers
+ * are specified, this throws an IllegalArgumentException.
+ * @return
+ */
+ public Fetcher getFetcher() {
+ return pluginManager.getExtensions(Fetcher.class)
+ .stream()
+ .findFirst()
+ .orElseThrow(() -> new PipesRuntimeException("Could not find any instances of the Fetcher extension"));
+ }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetcherStringException.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/FetcherStringException.java
similarity index 100%
rename from tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetcherStringException.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/FetcherStringException.java
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/RangeFetcher.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/RangeFetcher.java
similarity index 100%
rename from tika-core/src/main/java/org/apache/tika/pipes/fetcher/RangeFetcher.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/RangeFetcher.java
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/config/FetcherConfig.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/config/FetcherConfig.java
new file mode 100644
index 0000000000..400b644005
--- /dev/null
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/config/FetcherConfig.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.fetcher.config;
+
+public abstract class FetcherConfig {
+ private String fetcherId;
+
+ abstract public String getFetcherPluginId();
+
+ public void setFetcherPluginId(String fetcherPluginId) {
+ // no op - we put this here to appease the ConfigBase
+ }
+
+ public String getFetcherId() {
+ return fetcherId;
+ }
+
+ public FetcherConfig setFetcherId(String fetcherId) {
+ this.fetcherId = fetcherId;
+ return this;
+ }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/config/FetcherConfigContainer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/config/FetcherConfigContainer.java
similarity index 100%
rename from tika-core/src/main/java/org/apache/tika/pipes/fetcher/config/FetcherConfigContainer.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/fetcher/config/FetcherConfigContainer.java
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/CallablePipesIterator.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/CallablePipesIterator.java
similarity index 100%
rename from tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/CallablePipesIterator.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/CallablePipesIterator.java
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/PipesIterator.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/PipesIterator.java
similarity index 98%
rename from tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/PipesIterator.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/PipesIterator.java
index 34706f7e88..e142ffc2b0 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/PipesIterator.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/PipesIterator.java
@@ -41,6 +41,7 @@
import org.apache.tika.exception.TikaTimeoutException;
import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.HandlerConfig;
+import org.apache.tika.pipes.pipesiterator.fs.IPipesIterator;
import org.apache.tika.sax.BasicContentHandlerFactory;
/**
@@ -51,7 +52,7 @@
* next() is called after hasNext() has returned false.
*/
public abstract class PipesIterator extends ConfigBase
- implements Callable, Iterable, Initializable {
+ implements IPipesIterator, Callable, Iterable, Initializable {
public static final long DEFAULT_MAX_WAIT_MS = 300_000;
public static final int DEFAULT_QUEUE_SIZE = 1000;
@@ -177,9 +178,7 @@ protected HandlerConfig getHandlerConfig() {
return new HandlerConfig(handlerType, parseMode, writeLimit, maxEmbeddedResources,
throwOnWriteLimitReached);
}
-
- protected abstract void enqueue() throws IOException, TimeoutException, InterruptedException;
-
+
protected void tryToAdd(FetchEmitTuple p) throws InterruptedException, TimeoutException {
added++;
boolean offered = queue.offer(p, maxWaitMs, TimeUnit.MILLISECONDS);
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/TotalCountResult.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/TotalCountResult.java
similarity index 100%
rename from tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/TotalCountResult.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/TotalCountResult.java
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/TotalCounter.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/TotalCounter.java
similarity index 100%
rename from tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/TotalCounter.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/TotalCounter.java
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIterator.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIterator.java
similarity index 97%
rename from tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIterator.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIterator.java
index 75cb8390cc..c28db99890 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIterator.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIterator.java
@@ -59,7 +59,7 @@ public class FileListPipesIterator extends PipesIterator implements Initializabl
private Path fileListPath;
@Override
- protected void enqueue() throws IOException, TimeoutException, InterruptedException {
+ public void enqueue() throws IOException, TimeoutException, InterruptedException {
try (BufferedReader reader = Files.newBufferedReader(fileListPath, StandardCharsets.UTF_8)) {
if (hasHeader) {
reader.readLine();
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java
similarity index 98%
rename from tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java
index 967df73b99..34a61b428a 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java
@@ -72,7 +72,7 @@ public void setBasePath(String basePath) {
}
@Override
- protected void enqueue() throws InterruptedException, IOException, TimeoutException {
+ public void enqueue() throws InterruptedException, IOException, TimeoutException {
if (!Files.isDirectory(basePath)) {
throw new IllegalArgumentException(
"\"basePath\" directory does not exist: " + basePath.toAbsolutePath());
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/config/AbstractConfig.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/IPipesIterator.java
similarity index 73%
rename from tika-core/src/main/java/org/apache/tika/pipes/fetcher/config/AbstractConfig.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/IPipesIterator.java
index a1c7e48734..acd34f23c4 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/config/AbstractConfig.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/IPipesIterator.java
@@ -14,8 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.pipes.fetcher.config;
+package org.apache.tika.pipes.pipesiterator.fs;
-public abstract class AbstractConfig {
- // Nothing to do here yet.
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import org.pf4j.ExtensionPoint;
+
+public interface IPipesIterator extends ExtensionPoint {
+ void enqueue() throws IOException, TimeoutException, InterruptedException;
}
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/plugin/ClasspathPluginPropertiesFinder.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/plugin/ClasspathPluginPropertiesFinder.java
new file mode 100644
index 0000000000..2d1a0a3fd6
--- /dev/null
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/plugin/ClasspathPluginPropertiesFinder.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.plugin;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.pf4j.PropertiesPluginDescriptorFinder;
+
+public class ClasspathPluginPropertiesFinder extends PropertiesPluginDescriptorFinder {
+ @Override
+ protected Path getPropertiesPath(Path pluginPath, String propertiesFileName) {
+ Path propertiesPath = super.getPropertiesPath(pluginPath, propertiesFileName);
+ if (!propertiesPath.toFile().exists()) {
+ // If in development mode, we can also pull the plugin.properties from $pluginDir/src/main/resources/plugin.properties
+ propertiesPath = Paths.get(propertiesPath.getParent().toAbsolutePath().toString(), "src", "main", "resources", "plugin.properties");
+ }
+ return propertiesPath;
+ }
+}
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/plugin/TikaPluginManager.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/plugin/TikaPluginManager.java
new file mode 100644
index 0000000000..b71bae1dd0
--- /dev/null
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/plugin/TikaPluginManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.plugin;
+
+import java.nio.file.Path;
+import java.util.List;
+
+import org.pf4j.DefaultPluginManager;
+import org.pf4j.PluginDescriptorFinder;
+import org.pf4j.PluginLoader;
+import org.pf4j.PluginWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.pipes.exception.PipesRuntimeException;
+import org.apache.tika.pipes.fetcher.Fetcher;
+
+public class TikaPluginManager extends DefaultPluginManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TikaPluginManager.class);
+ public TikaPluginManager() {
+ }
+
+ public TikaPluginManager(Path... pluginsRoots) {
+ super(pluginsRoots);
+ }
+
+ public TikaPluginManager(List pluginsRoots) {
+ super(pluginsRoots);
+ }
+
+ @Override
+ protected PluginDescriptorFinder createPluginDescriptorFinder() {
+ return new ClasspathPluginPropertiesFinder();
+ }
+
+ @Override
+ protected PluginLoader createPluginLoader() {
+ return super.createPluginLoader();
+ }
+
+ @Override
+ public void loadPlugins() {
+ super.loadPlugins();
+ LOGGER.info("Loaded {} plugins", getPlugins().size());
+ }
+
+ @Override
+ public void startPlugins() {
+ super.startPlugins();
+ for (PluginWrapper plugin : getStartedPlugins()) {
+ LOGGER.info("Add-in " + plugin.getPluginId() + " : " + plugin.getDescriptor() + " has started.");
+ checkFetcherExtensions(plugin);
+ }
+ }
+
+ private void checkFetcherExtensions(PluginWrapper plugin) {
+ for (Class> extensionClass : getExtensionClasses(Fetcher.class, plugin.getPluginId())) {
+ if (!Fetcher.class.isAssignableFrom(extensionClass)) {
+ throw new PipesRuntimeException("Something is wrong with the classpath. " + Fetcher.class.getName() +
+ " should be assignable from " + extensionClass.getName() +
+ ". Did tika-core accidentally get in your plugin lib?");
+ }
+ LOGGER.info(" Extension " + extensionClass + " has been registered to plugin " + plugin.getPluginId());
+ }
+ }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/reporter/PipesReporter.java
similarity index 95%
rename from tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/reporter/PipesReporter.java
index 3978039b40..1a9c165d03 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/reporter/PipesReporter.java
@@ -14,11 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.pipes;
+package org.apache.tika.pipes.reporter;
import java.io.Closeable;
import java.io.IOException;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.PipesResult;
import org.apache.tika.pipes.pipesiterator.TotalCountResult;
/**
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesReporterBase.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/reporter/PipesReporterBase.java
similarity index 98%
rename from tika-core/src/main/java/org/apache/tika/pipes/PipesReporterBase.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/reporter/PipesReporterBase.java
index 3dcddfa71e..6cdf89767f 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesReporterBase.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/reporter/PipesReporterBase.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.pipes;
+package org.apache.tika.pipes.reporter;
import java.util.HashSet;
import java.util.List;
@@ -26,6 +26,7 @@
import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.config.Param;
import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.pipes.PipesResult;
/**
* Base class that includes filtering by {@link PipesResult.STATUS}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/LoggingPipesReporter.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/reporter/logging/LoggingPipesReporter.java
similarity index 88%
rename from tika-core/src/main/java/org/apache/tika/pipes/LoggingPipesReporter.java
rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/reporter/logging/LoggingPipesReporter.java
index 5f00880ba0..fe61d86ddc 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/LoggingPipesReporter.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/reporter/logging/LoggingPipesReporter.java
@@ -14,12 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.pipes;
+package org.apache.tika.pipes.reporter.logging;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.PipesResult;
+import org.apache.tika.pipes.reporter.PipesReporter;
+
/**
* Simple PipesReporter that logs everything at the debug level.
*/
diff --git a/tika-core/src/main/resources/pipes-fork-server-default-log4j2.xml b/tika-pipes/tika-pipes-core/src/main/resources/pipes-fork-server-default-log4j2.xml
similarity index 100%
rename from tika-core/src/main/resources/pipes-fork-server-default-log4j2.xml
rename to tika-pipes/tika-pipes-core/src/main/resources/pipes-fork-server-default-log4j2.xml
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/PipesClientTest.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/PipesClientTest.java
similarity index 100%
rename from tika-core/src/test/java/org/apache/tika/pipes/PipesClientTest.java
rename to tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/PipesClientTest.java
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java
similarity index 80%
rename from tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java
rename to tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java
index ff80bb9160..6fd2d39c89 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java
+++ b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java
@@ -29,18 +29,22 @@
import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.pf4j.PluginManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.tika.TikaTest;
import org.apache.tika.extractor.BasicEmbeddedDocumentBytesHandler;
+import org.apache.tika.extractor.EmbeddedDocumentBytesConfig;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetcher.Fetcher;
import org.apache.tika.pipes.fetcher.FetcherManager;
+import org.apache.tika.pipes.plugin.TikaPluginManager;
public class PipesServerTest extends TikaTest {
+ private static final Logger LOG = LoggerFactory.getLogger(PipesServerTest.class);
/**
* This test is useful for stepping through the debugger on PipesServer
@@ -64,16 +68,24 @@ public void testBasic(@TempDir Path tmp) throws Exception {
UnsynchronizedByteArrayInputStream.builder().setByteArray(new byte[0]).get(),
new PrintStream(UnsynchronizedByteArrayOutputStream.builder().get(), true,
StandardCharsets.UTF_8.name()),
- -1, 30000, 30000);
+ -1, 30000, 30000, null);
pipesServer.initializeResources();
FetchEmitTuple fetchEmitTuple = new FetchEmitTuple("id",
new FetchKey("fs", "mock.xml"),
new EmitKey("", ""));
- Fetcher fetcher = FetcherManager.load(tikaConfig).getFetcher();
+
+ System.setProperty("pf4j.mode", "development"); // Development mode lets you work from source dir easier.
+ Path fetchersPath = Path.of("..", "tika-fetchers");
+ LOG.info("Using pf4j in development mode using plugins dir: {}", fetchersPath.toFile().getCanonicalPath());
+ PluginManager pluginManager = new TikaPluginManager(fetchersPath);
+ pluginManager.loadPlugins();
+ pluginManager.startPlugins();
+ FetcherManager fetcherManager = FetcherManager.load(pluginManager);
+
PipesServer.MetadataListAndEmbeddedBytes
- parseData = pipesServer.parseFromTuple(fetchEmitTuple, fetcher);
+ parseData = pipesServer.parseFromTuple(fetchEmitTuple, fetcherManager.getFetcher("file-system-fetcher"));
assertEquals("5f3b924303e960ce35d7f705e91d3018dd110a9c3cef0546a91fe013d6dad6fd",
parseData.metadataList.get(0).get("X-TIKA:digest:SHA-256"));
}
@@ -99,7 +111,7 @@ public void testEmbeddedStreamEmitter(@TempDir Path tmp) throws Exception {
UnsynchronizedByteArrayInputStream.builder().setByteArray(new byte[0]).get(),
new PrintStream(UnsynchronizedByteArrayOutputStream.builder().get(), true,
StandardCharsets.UTF_8.name()),
- -1, 30000, 30000);
+ -1, 30000, 30000, null);
pipesServer.initializeResources();
EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig =
@@ -111,9 +123,16 @@ public void testEmbeddedStreamEmitter(@TempDir Path tmp) throws Exception {
FetchEmitTuple fetchEmitTuple = new FetchEmitTuple("id",
new FetchKey("fs", "mock.xml"),
new EmitKey("", ""), new Metadata(), parseContext);
- Fetcher fetcher = FetcherManager.load(tikaConfig).getFetcher();
+ System.setProperty("pf4j.mode", "development"); // Development mode lets you work from source dir easier.
+ Path fetchersPath = Path.of("..", "..");
+ LOG.info("Using pf4j in development mode using plugins dir: {}", fetchersPath.toFile().getCanonicalPath());
+ PluginManager pluginManager = new TikaPluginManager(fetchersPath);
+ pluginManager.loadPlugins();
+ pluginManager.startPlugins();
+ FetcherManager fetcherManager = FetcherManager.load(pluginManager);
+
PipesServer.MetadataListAndEmbeddedBytes
- parseData = pipesServer.parseFromTuple(fetchEmitTuple, fetcher);
+ parseData = pipesServer.parseFromTuple(fetchEmitTuple, fetcherManager.getFetcher());
assertEquals(2, parseData.metadataList.size());
byte[] bytes0 =
@@ -155,7 +174,7 @@ public void testEmbeddedStreamEmitterLimitBytes(@TempDir Path tmp) throws Except
UnsynchronizedByteArrayInputStream.builder().setByteArray(new byte[0]).get(),
new PrintStream(UnsynchronizedByteArrayOutputStream.builder().get(), true,
StandardCharsets.UTF_8.name()),
- -1, 30000, 30000);
+ -1, 30000, 30000, null);
pipesServer.initializeResources();
EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig =
@@ -168,9 +187,16 @@ public void testEmbeddedStreamEmitterLimitBytes(@TempDir Path tmp) throws Except
new FetchKey("fs", "mock.xml"),
new EmitKey("", ""), new Metadata(), parseContext);
- Fetcher fetcher = FetcherManager.load(tikaConfig).getFetcher();
+ System.setProperty("pf4j.mode", "development"); // Development mode lets you work from source dir easier.
+ Path fetchersPath = Path.of("..", "..");
+ LOG.info("Using pf4j in development mode using plugins dir: {}", fetchersPath.toFile().getCanonicalPath());
+ PluginManager pluginManager = new TikaPluginManager(fetchersPath);
+ pluginManager.loadPlugins();
+ pluginManager.startPlugins();
+ FetcherManager fetcherManager = FetcherManager.load(pluginManager);
+
PipesServer.MetadataListAndEmbeddedBytes
- parseData = pipesServer.parseFromTuple(fetchEmitTuple, fetcher);
+ parseData = pipesServer.parseFromTuple(fetchEmitTuple, fetcherManager.getFetcher());
assertEquals(2, parseData.metadataList.size());
byte[] bytes0 =
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncChaosMonkeyTest.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/async/AsyncChaosMonkeyTest.java
similarity index 100%
rename from tika-core/src/test/java/org/apache/tika/pipes/async/AsyncChaosMonkeyTest.java
rename to tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/async/AsyncChaosMonkeyTest.java
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/MockDigesterFactory.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/async/MockDigesterFactory.java
similarity index 100%
rename from tika-core/src/test/java/org/apache/tika/pipes/async/MockDigesterFactory.java
rename to tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/async/MockDigesterFactory.java
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/MockEmitter.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
similarity index 100%
rename from tika-core/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
rename to tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
similarity index 96%
rename from tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
rename to tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
index acb533ece4..ef825e81e0 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
+++ b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
@@ -27,14 +27,13 @@
import org.apache.tika.pipes.fetcher.Fetcher;
public class MockFetcher implements Fetcher {
-
private static final byte[] BYTES = ("" + "" +
"Nikolai Lobachevsky" +
"main_content" + "").getBytes(StandardCharsets.UTF_8);
@Override
- public String getName() {
- return "mock";
+ public String getPluginId() {
+ return "mock-fetcher";
}
@Override
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
similarity index 97%
rename from tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
rename to tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
index 6e8308c895..2a59859785 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
+++ b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
@@ -20,8 +20,8 @@
import org.apache.tika.config.Field;
import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.PipesReporter;
import org.apache.tika.pipes.PipesResult;
+import org.apache.tika.pipes.reporter.PipesReporter;
public class MockReporter extends PipesReporter {
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporterTest.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/async/MockReporterTest.java
similarity index 97%
rename from tika-core/src/test/java/org/apache/tika/pipes/async/MockReporterTest.java
rename to tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/async/MockReporterTest.java
index 9bfcd55918..99dd94edb9 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporterTest.java
+++ b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/async/MockReporterTest.java
@@ -26,7 +26,7 @@
import org.junit.jupiter.api.Test;
import org.apache.tika.pipes.CompositePipesReporter;
-import org.apache.tika.pipes.PipesReporter;
+import org.apache.tika.pipes.reporter.PipesReporter;
public class MockReporterTest {
diff --git a/tika-core/src/test/java/org/apache/tika/config/TikaPipesConfigTest.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/config/TikaPipesConfigTest.java
similarity index 69%
rename from tika-core/src/test/java/org/apache/tika/config/TikaPipesConfigTest.java
rename to tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/config/TikaPipesConfigTest.java
index 3ea1e538ce..392dbc611f 100644
--- a/tika-core/src/test/java/org/apache/tika/config/TikaPipesConfigTest.java
+++ b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/config/TikaPipesConfigTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.config;
+package org.apache.tika.pipes.config;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -22,61 +22,22 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.List;
import org.junit.jupiter.api.Test;
+import org.apache.tika.config.AbstractTikaConfigTest;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.pipes.CompositePipesReporter;
-import org.apache.tika.pipes.PipesReporter;
import org.apache.tika.pipes.async.AsyncConfig;
import org.apache.tika.pipes.async.MockReporter;
import org.apache.tika.pipes.emitter.Emitter;
import org.apache.tika.pipes.emitter.EmitterManager;
-import org.apache.tika.pipes.fetcher.Fetcher;
-import org.apache.tika.pipes.fetcher.FetcherManager;
-import org.apache.tika.pipes.fetcher.fs.FileSystemFetcher;
import org.apache.tika.pipes.pipesiterator.PipesIterator;
+import org.apache.tika.pipes.reporter.PipesReporter;
public class TikaPipesConfigTest extends AbstractTikaConfigTest {
//this handles tests for the newer pipes type configs.
-
- @Test
- public void testFetchers() throws Exception {
- FetcherManager m = FetcherManager.load(getConfigFilePath("fetchers-config.xml"));
- Fetcher f1 = m.getFetcher("fs1");
- assertEquals(Paths.get("/my/base/path1"), ((FileSystemFetcher) f1).getBasePath());
-
- Fetcher f2 = m.getFetcher("fs2");
- assertEquals(Paths.get("/my/base/path2"), ((FileSystemFetcher) f2).getBasePath());
- }
-
- @Test
- public void testDuplicateFetchers() throws Exception {
- //can't have two fetchers with the same name
- assertThrows(TikaConfigException.class, () -> {
- FetcherManager.load(getConfigFilePath("fetchers-duplicate-config.xml"));
- });
- }
-
- @Test
- public void testNoNameFetchers() throws Exception {
- //can't have two fetchers with an empty name
- assertThrows(TikaConfigException.class, () -> {
- FetcherManager.load(getConfigFilePath("fetchers-noname-config.xml"));
- });
- }
-
- @Test
- public void testNoBasePathFetchers() throws Exception {
- //no basepath is allowed as of > 2.3.0
- //test that this does not throw an exception.
-
- FetcherManager fetcherManager = FetcherManager.load(
- getConfigFilePath("fetchers-nobasepath-config.xml"));
- }
-
@Test
public void testEmitters() throws Exception {
EmitterManager emitterManager =
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/emitter/MockEmitter.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/emitter/MockEmitter.java
similarity index 100%
rename from tika-core/src/test/java/org/apache/tika/pipes/emitter/MockEmitter.java
rename to tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/emitter/MockEmitter.java
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/fetcher/MockFetcher.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/fetcher/MockFetcher.java
similarity index 100%
rename from tika-core/src/test/java/org/apache/tika/pipes/fetcher/MockFetcher.java
rename to tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/fetcher/MockFetcher.java
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIteratorTest.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIteratorTest.java
similarity index 100%
rename from tika-core/src/test/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIteratorTest.java
rename to tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIteratorTest.java
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIteratorTest.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIteratorTest.java
similarity index 100%
rename from tika-core/src/test/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIteratorTest.java
rename to tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIteratorTest.java
diff --git a/tika-pipes/tika-pipes-core/src/test/resources/log4j2.xml b/tika-pipes/tika-pipes-core/src/test/resources/log4j2.xml
new file mode 100644
index 0000000000..5f946e6e5c
--- /dev/null
+++ b/tika-pipes/tika-pipes-core/src/test/resources/log4j2.xml
@@ -0,0 +1,32 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tika-core/src/test/resources/org/apache/tika/pipes/TIKA-3941.xml b/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/pipes/TIKA-3941.xml
similarity index 100%
rename from tika-core/src/test/resources/org/apache/tika/pipes/TIKA-3941.xml
rename to tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/pipes/TIKA-3941.xml
diff --git a/tika-core/src/test/resources/org/apache/tika/pipes/TIKA-4207-limit-bytes.xml b/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/pipes/TIKA-4207-limit-bytes.xml
similarity index 100%
rename from tika-core/src/test/resources/org/apache/tika/pipes/TIKA-4207-limit-bytes.xml
rename to tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/pipes/TIKA-4207-limit-bytes.xml
diff --git a/tika-core/src/test/resources/org/apache/tika/pipes/TIKA-4207.xml b/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/pipes/TIKA-4207.xml
similarity index 100%
rename from tika-core/src/test/resources/org/apache/tika/pipes/TIKA-4207.xml
rename to tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/pipes/TIKA-4207.xml
diff --git a/tika-core/src/test/resources/org/apache/tika/pipes/async/TIKA-3507.xml b/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/pipes/async/TIKA-3507.xml
similarity index 100%
rename from tika-core/src/test/resources/org/apache/tika/pipes/async/TIKA-3507.xml
rename to tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/pipes/async/TIKA-3507.xml
diff --git a/tika-core/src/test/resources/org/apache/tika/pipes/async/TIKA-3865.xml b/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/pipes/async/TIKA-3865.xml
similarity index 100%
rename from tika-core/src/test/resources/org/apache/tika/pipes/async/TIKA-3865.xml
rename to tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/pipes/async/TIKA-3865.xml
diff --git a/tika-core/src/test/resources/org/apache/tika/pipes/tika-sample-config.xml b/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/pipes/tika-sample-config.xml
similarity index 100%
rename from tika-core/src/test/resources/org/apache/tika/pipes/tika-sample-config.xml
rename to tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/pipes/tika-sample-config.xml
diff --git a/tika-pipes/tika-pipes-iterators/pom.xml b/tika-pipes/tika-pipes-iterators/pom.xml
index 2106a1b045..44f46a27b9 100644
--- a/tika-pipes/tika-pipes-iterators/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/pom.xml
@@ -44,6 +44,20 @@
tika-pipes-iterator-az-blob
+
+
+ org.apache.tika
+ tika-pipes-core
+ ${project.version}
+
+
+ org.pf4j
+ pf4j
+
+ provided
+
+
+
3.0.0-BETA2-rc1
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/main/java/org/apache/tika/pipes/pipesiterator/azblob/AZBlobPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/main/java/org/apache/tika/pipes/pipesiterator/azblob/AZBlobPipesIterator.java
index 0c5d6840dc..f0bb0373fe 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/main/java/org/apache/tika/pipes/pipesiterator/azblob/AZBlobPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/main/java/org/apache/tika/pipes/pipesiterator/azblob/AZBlobPipesIterator.java
@@ -86,7 +86,7 @@ public void setPrefix(String prefix) {
}
@Override
- protected void enqueue() throws InterruptedException, IOException, TimeoutException {
+ public void enqueue() throws InterruptedException, IOException, TimeoutException {
String fetcherName = getFetcherName();
String emitterName = getEmitterName();
long start = System.currentTimeMillis();
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/main/java/org/apache/tika/pipes/pipesiterator/csv/CSVPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/main/java/org/apache/tika/pipes/pipesiterator/csv/CSVPipesIterator.java
index e9c0065700..77cd4da3e8 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/main/java/org/apache/tika/pipes/pipesiterator/csv/CSVPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/main/java/org/apache/tika/pipes/pipesiterator/csv/CSVPipesIterator.java
@@ -112,7 +112,7 @@ public void setCsvPath(Path csvPath) {
}
@Override
- protected void enqueue() throws InterruptedException, IOException, TimeoutException {
+ public void enqueue() throws InterruptedException, IOException, TimeoutException {
String fetcherName = getFetcherName();
String emitterName = getEmitterName();
try (Reader reader = Files.newBufferedReader(csvPath, charset)) {
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/src/main/java/org/apache/tika/pipes/pipesiterator/gcs/GCSPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/src/main/java/org/apache/tika/pipes/pipesiterator/gcs/GCSPipesIterator.java
index 248d2461e1..4a15a44b25 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/src/main/java/org/apache/tika/pipes/pipesiterator/gcs/GCSPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/src/main/java/org/apache/tika/pipes/pipesiterator/gcs/GCSPipesIterator.java
@@ -91,7 +91,7 @@ public void checkInitialization(InitializableProblemHandler problemHandler) thro
}
@Override
- protected void enqueue() throws InterruptedException, IOException, TimeoutException {
+ public void enqueue() throws InterruptedException, IOException, TimeoutException {
String fetcherName = getFetcherName();
String emitterName = getEmitterName();
long start = System.currentTimeMillis();
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/main/java/org/apache/tika/pipes/pipesiterator/jdbc/JDBCPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/main/java/org/apache/tika/pipes/pipesiterator/jdbc/JDBCPipesIterator.java
index 2c178e1475..cdc647b89e 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/main/java/org/apache/tika/pipes/pipesiterator/jdbc/JDBCPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/main/java/org/apache/tika/pipes/pipesiterator/jdbc/JDBCPipesIterator.java
@@ -139,7 +139,7 @@ public void setQueryTimeoutSeconds(int seconds) {
}
@Override
- protected void enqueue() throws InterruptedException, IOException, TimeoutException {
+ public void enqueue() throws InterruptedException, IOException, TimeoutException {
String fetcherName = getFetcherName();
String emitterName = getEmitterName();
FetchEmitKeyIndices fetchEmitKeyIndices = null;
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-json/src/main/java/org/apache/tika/pipes/pipesiterator/json/JsonPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-json/src/main/java/org/apache/tika/pipes/pipesiterator/json/JsonPipesIterator.java
index 6d3ceb6c28..3e190a091c 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-json/src/main/java/org/apache/tika/pipes/pipesiterator/json/JsonPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-json/src/main/java/org/apache/tika/pipes/pipesiterator/json/JsonPipesIterator.java
@@ -45,7 +45,7 @@ public class JsonPipesIterator extends PipesIterator implements Initializable {
private Path jsonPath;
@Override
- protected void enqueue() throws InterruptedException, IOException, TimeoutException {
+ public void enqueue() throws InterruptedException, IOException, TimeoutException {
try (BufferedReader reader = Files.newBufferedReader(jsonPath, StandardCharsets.UTF_8)) {
String line = reader.readLine();
while (line != null) {
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java
index 9fbebcfdaf..4589c97de0 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java
@@ -147,7 +147,7 @@ public void checkInitialization(InitializableProblemHandler problemHandler) thro
}
@Override
- protected void enqueue() throws InterruptedException, TimeoutException {
+ public void enqueue() throws InterruptedException, TimeoutException {
String fetcherName = getFetcherName();
String emitterName = getEmitterName();
long start = System.currentTimeMillis();
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/main/java/org/apache/tika/pipes/pipesiterator/s3/S3PipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/main/java/org/apache/tika/pipes/pipesiterator/s3/S3PipesIterator.java
index 38fc1889cf..e11ff614de 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/main/java/org/apache/tika/pipes/pipesiterator/s3/S3PipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/main/java/org/apache/tika/pipes/pipesiterator/s3/S3PipesIterator.java
@@ -181,7 +181,7 @@ public void checkInitialization(InitializableProblemHandler problemHandler) thro
}
@Override
- protected void enqueue() throws InterruptedException, IOException, TimeoutException {
+ public void enqueue() throws InterruptedException, IOException, TimeoutException {
String fetcherName = getFetcherName();
String emitterName = getEmitterName();
long start = System.currentTimeMillis();
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/pipesiterator/solr/SolrPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/pipesiterator/solr/SolrPipesIterator.java
index 9ecead289b..a02a1d4e0d 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/pipesiterator/solr/SolrPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/pipesiterator/solr/SolrPipesIterator.java
@@ -170,7 +170,7 @@ public void setProxyPort(int proxyPort) {
}
@Override
- protected void enqueue() throws InterruptedException, IOException, TimeoutException {
+ public void enqueue() throws InterruptedException, IOException, TimeoutException {
String fetcherName = getFetcherName();
String emitterName = getEmitterName();
diff --git a/tika-pipes/tika-pipes-reporters/pom.xml b/tika-pipes/tika-pipes-reporters/pom.xml
index 13ea50a4db..9d0e7d2e94 100644
--- a/tika-pipes/tika-pipes-reporters/pom.xml
+++ b/tika-pipes/tika-pipes-reporters/pom.xml
@@ -37,6 +37,14 @@
tika-pipes-reporter-jdbc
+
+
+ org.apache.tika
+ tika-pipes-core
+ ${project.version}
+
+
+
3.0.0-BETA2-rc1
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/main/java/org/apache/tika/pipes/reporters/fs/FileSystemStatusReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/main/java/org/apache/tika/pipes/reporters/fs/FileSystemStatusReporter.java
index b48745a6c6..6b456182ce 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/main/java/org/apache/tika/pipes/reporters/fs/FileSystemStatusReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/main/java/org/apache/tika/pipes/reporters/fs/FileSystemStatusReporter.java
@@ -40,10 +40,10 @@
import org.apache.tika.config.Param;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.PipesReporter;
import org.apache.tika.pipes.PipesResult;
import org.apache.tika.pipes.async.AsyncStatus;
import org.apache.tika.pipes.pipesiterator.TotalCountResult;
+import org.apache.tika.pipes.reporter.PipesReporter;
import org.apache.tika.utils.ExceptionUtils;
/**
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/test/java/org/apache/tika/pipes/reporters/fs/TestFileSystemStatusReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/test/java/org/apache/tika/pipes/reporters/fs/TestFileSystemStatusReporter.java
index 16296fa1cf..42684e5775 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/test/java/org/apache/tika/pipes/reporters/fs/TestFileSystemStatusReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/test/java/org/apache/tika/pipes/reporters/fs/TestFileSystemStatusReporter.java
@@ -40,11 +40,11 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import org.apache.tika.pipes.PipesReporter;
import org.apache.tika.pipes.PipesResult;
import org.apache.tika.pipes.async.AsyncStatus;
import org.apache.tika.pipes.pipesiterator.PipesIterator;
import org.apache.tika.pipes.pipesiterator.TotalCountResult;
+import org.apache.tika.pipes.reporter.PipesReporter;
public class TestFileSystemStatusReporter {
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
index e31c0dc5a2..0c4f671db8 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
@@ -43,8 +43,8 @@
import org.apache.tika.config.Param;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.PipesReporterBase;
import org.apache.tika.pipes.PipesResult;
+import org.apache.tika.pipes.reporter.PipesReporterBase;
import org.apache.tika.utils.StringUtils;
/**
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
index 01d903c5ef..54dab5cfe4 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
@@ -49,12 +49,12 @@
import org.junit.jupiter.api.io.TempDir;
import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.PipesReporter;
import org.apache.tika.pipes.PipesResult;
import org.apache.tika.pipes.async.AsyncConfig;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.fetcher.FetchKey;
import org.apache.tika.pipes.pipesiterator.TotalCountResult;
+import org.apache.tika.pipes.reporter.PipesReporter;
public class TestJDBCPipesReporter {
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java
index 7dbe136218..fb2aa279cb 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java
@@ -37,8 +37,8 @@
import org.apache.tika.metadata.ExternalProcess;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.PipesReporter;
import org.apache.tika.pipes.PipesResult;
+import org.apache.tika.pipes.reporter.PipesReporter;
import org.apache.tika.utils.StringUtils;
/**
diff --git a/tika-serialization/pom.xml b/tika-pipes/tika-serialization/pom.xml
similarity index 93%
rename from tika-serialization/pom.xml
rename to tika-pipes/tika-serialization/pom.xml
index bfc12cb12f..de6e9f51d2 100644
--- a/tika-serialization/pom.xml
+++ b/tika-pipes/tika-serialization/pom.xml
@@ -24,9 +24,9 @@
org.apache.tika
- tika-parent
+ tika-pipes
3.0.0-SNAPSHOT
- ../tika-parent/pom.xml
+ ../pom.xml
tika-serialization
@@ -47,6 +47,12 @@
${project.version}
provided