From ac193e8c53c33ec11765fbf914d845f22f8d52c0 Mon Sep 17 00:00:00 2001 From: Bernd Ahlers Date: Thu, 25 Jan 2024 12:05:55 +0100 Subject: [PATCH] Cleanup dependencies (#4) * Switch to upstream disruptor and update to 4.0.0 * Cleanup dependencies - Use official log4j instead of wso2 fork - Remove org.eclipse.osgi.services dependency and SiddhiManagerComponent class - Remove unused webjars dependency - Remove wso2 Maven repositories * Remove unused OSGi extension support --- modules/siddhi-core/pom.xml | 12 +- .../siddhi/core/config/SiddhiAppContext.java | 10 ++ .../core/partition/PartitionRuntime.java | 8 +- .../partition/PartitionStreamReceiver.java | 2 +- .../siddhi/core/stream/StreamJunction.java | 12 +- .../siddhi/core/util/ReferenceHolder.java | 44 ------- .../core/util/SiddhiAppRuntimeBuilder.java | 4 +- .../core/util/SiddhiExtensionLoader.java | 70 ---------- .../core/util/SiddhiManagerComponent.java | 61 --------- .../siddhi/core/util/parser/OutputParser.java | 2 +- .../core/util/parser/SiddhiAppParser.java | 10 +- .../parser/helper/DefinitionParserHelper.java | 2 +- .../siddhi/core/stream/JunctionTestCase.java | 28 ++-- pom.xml | 121 +----------------- 14 files changed, 52 insertions(+), 334 deletions(-) delete mode 100644 modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/ReferenceHolder.java delete mode 100644 modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiManagerComponent.java diff --git a/modules/siddhi-core/pom.xml b/modules/siddhi-core/pom.xml index b467fdf915..1ec315ca1e 100644 --- a/modules/siddhi-core/pom.xml +++ b/modules/siddhi-core/pom.xml @@ -44,11 +44,11 @@ siddhi-annotations - org.apache.log4j.wso2 + log4j log4j - org.wso2.orbit.com.lmax + com.lmax disruptor @@ -77,14 +77,6 @@ com.google.code.gson gson - - org.eclipse.osgi - org.eclipse.osgi.services - - - org.osgi - org.osgi.core - org.atteo.classindex classindex diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/config/SiddhiAppContext.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/config/SiddhiAppContext.java index f1976f4952..4a38270195 100644 --- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/config/SiddhiAppContext.java +++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/config/SiddhiAppContext.java @@ -38,6 +38,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; /** * Holder object for context information of {@link org.wso2.siddhi.query.api.SiddhiApp}. @@ -50,6 +51,7 @@ public class SiddhiAppContext { private boolean enforceOrder; private StatisticsManager statisticsManager = null; + private ThreadFactory executorThreadFactory; private ExecutorService executorService; private ScheduledExecutorService scheduledExecutorService; private List eternalReferencedHolders; @@ -139,6 +141,14 @@ public void setThreadBarrier(ThreadBarrier threadBarrier) { this.threadBarrier = threadBarrier; } + public ThreadFactory getExecutorThreadFactory() { + return this.executorThreadFactory; + } + + public void setExecutorThreadFactory(ThreadFactory threadFactory) { + this.executorThreadFactory = threadFactory; + } + public ExecutorService getExecutorService() { return executorService; } diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/partition/PartitionRuntime.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/partition/PartitionRuntime.java index 75515d5ddc..447ca48aea 100644 --- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/partition/PartitionRuntime.java +++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/partition/PartitionRuntime.java @@ -135,7 +135,7 @@ public QueryRuntime addQuery(QueryRuntime metaQueryRuntime) { if (outputStreamJunction == null) { outputStreamJunction = new StreamJunction(streamDefinition, - siddhiAppContext.getExecutorService(), + siddhiAppContext.getExecutorThreadFactory(), siddhiAppContext.getBufferSize(), null, siddhiAppContext); localStreamJunctionMap.putIfAbsent(id, outputStreamJunction); @@ -148,7 +148,7 @@ public QueryRuntime addQuery(QueryRuntime metaQueryRuntime) { if (outputStreamJunction == null) { outputStreamJunction = new StreamJunction(streamDefinition, - siddhiAppContext.getExecutorService(), + siddhiAppContext.getExecutorThreadFactory(), siddhiAppContext.getBufferSize(), null, siddhiAppContext); streamJunctionMap.putIfAbsent(id, outputStreamJunction); @@ -166,7 +166,7 @@ public QueryRuntime addQuery(QueryRuntime metaQueryRuntime) { if (outputStreamJunction == null) { outputStreamJunction = new StreamJunction(streamDefinition, - siddhiAppContext.getExecutorService(), + siddhiAppContext.getExecutorThreadFactory(), siddhiAppContext.getBufferSize(), null, siddhiAppContext); streamJunctionMap.putIfAbsent(id, outputStreamJunction); @@ -291,7 +291,7 @@ private synchronized void clonePartition(String key) { StreamJunction streamJunction = localStreamJunctionMap.get(streamId + key); if (streamJunction == null) { streamJunction = new StreamJunction(streamDefinition, siddhiAppContext - .getExecutorService(), + .getExecutorThreadFactory(), siddhiAppContext.getBufferSize(), null, siddhiAppContext); localStreamJunctionMap.put(streamId + key, streamJunction); diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/partition/PartitionStreamReceiver.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/partition/PartitionStreamReceiver.java index 586aff5f31..6ea43065ab 100644 --- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/partition/PartitionStreamReceiver.java +++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/partition/PartitionStreamReceiver.java @@ -310,7 +310,7 @@ public void addStreamJunction(String key, List queryRuntimeList) { } private StreamJunction createStreamJunction() { - return new StreamJunction(streamDefinition, siddhiAppContext.getExecutorService(), + return new StreamJunction(streamDefinition, siddhiAppContext.getExecutorThreadFactory(), siddhiAppContext.getBufferSize(), null, siddhiAppContext); } diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/stream/StreamJunction.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/stream/StreamJunction.java index 494a19a163..693f720835 100644 --- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/stream/StreamJunction.java +++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/stream/StreamJunction.java @@ -52,7 +52,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; /** * Stream Junction is the place where streams are collected and distributed. There will be an Stream Junction per @@ -69,7 +69,7 @@ public class StreamJunction implements EventBufferHolder { private int bufferSize; private List receivers = new CopyOnWriteArrayList(); private List publishers = Collections.synchronizedList(new LinkedList<>()); - private ExecutorService executorService; + private ThreadFactory threadFactory; private boolean async = false; private Disruptor disruptor; private RingBuffer ringBuffer; @@ -80,12 +80,12 @@ public class StreamJunction implements EventBufferHolder { private OnErrorAction onErrorAction = OnErrorAction.LOG; private ExceptionListener exceptionListener; - public StreamJunction(StreamDefinition streamDefinition, ExecutorService executorService, int bufferSize, + public StreamJunction(StreamDefinition streamDefinition, ThreadFactory threadFactory, int bufferSize, StreamJunction faultStreamJunction, SiddhiAppContext siddhiAppContext) { this.streamDefinition = streamDefinition; this.bufferSize = bufferSize; this.batchSize = bufferSize; - this.executorService = executorService; + this.threadFactory = threadFactory; this.siddhiAppContext = siddhiAppContext; if (siddhiAppContext.getStatisticsManager() != null) { this.throughputTracker = QueryParserHelper.createThroughputTracker(siddhiAppContext, @@ -285,7 +285,7 @@ public synchronized void startProcessing() { ProducerType producerType = ProducerType.MULTI; disruptor = new Disruptor( new EventExchangeHolderFactory(streamDefinition.getAttributeList().size()), - bufferSize, executorService, producerType, + bufferSize, threadFactory, producerType, new BlockingWaitStrategy()); disruptor.handleExceptionsWith(siddhiAppContext.getDisruptorExceptionHandler()); break; @@ -294,7 +294,7 @@ public synchronized void startProcessing() { if (disruptor == null) { disruptor = new Disruptor( new EventExchangeHolderFactory(streamDefinition.getAttributeList().size()), - bufferSize, executorService); + bufferSize, threadFactory); disruptor.handleExceptionsWith(siddhiAppContext.getDisruptorExceptionHandler()); } if (workers > 0) { diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/ReferenceHolder.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/ReferenceHolder.java deleted file mode 100644 index 0b62dbd031..0000000000 --- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/ReferenceHolder.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. - * - * WSO2 Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.wso2.siddhi.core.util; - -import org.osgi.framework.BundleContext; - -/** - * Class to hold references need by siddhi bundle. - */ -public class ReferenceHolder { - private BundleContext bundleContext; //bundle context for siddhi bundle. - private static final ReferenceHolder instance = new ReferenceHolder(); - - private ReferenceHolder() { - //empty constructor to facilitate singleton. - } - - public static ReferenceHolder getInstance() { - return instance; - } - - public BundleContext getBundleContext() { - return bundleContext; - } - - public void setBundleContext(BundleContext bundleContext) { - this.bundleContext = bundleContext; - } -} diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiAppRuntimeBuilder.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiAppRuntimeBuilder.java index 9fc47d4de3..9f5e2dc152 100644 --- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiAppRuntimeBuilder.java +++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiAppRuntimeBuilder.java @@ -201,7 +201,7 @@ public String addQuery(QueryRuntime queryRuntime) { if (outputStreamJunction == null) { outputStreamJunction = new StreamJunction(streamDefinition, - siddhiAppContext.getExecutorService(), + siddhiAppContext.getExecutorThreadFactory(), siddhiAppContext.getBufferSize(), null, siddhiAppContext); streamJunctionMap.putIfAbsent(streamDefinition.getId(), outputStreamJunction); } @@ -217,7 +217,7 @@ public String addQuery(QueryRuntime queryRuntime) { if (outputStreamJunction == null) { outputStreamJunction = new StreamJunction(streamDefinition, - siddhiAppContext.getExecutorService(), + siddhiAppContext.getExecutorThreadFactory(), siddhiAppContext.getBufferSize(), null, siddhiAppContext); streamJunctionMap.putIfAbsent(streamDefinition.getId(), outputStreamJunction); } diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiExtensionLoader.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiExtensionLoader.java index 1c02262243..208315fd3f 100644 --- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiExtensionLoader.java +++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiExtensionLoader.java @@ -20,11 +20,6 @@ import org.apache.log4j.Logger; import org.atteo.classindex.ClassIndex; -import org.osgi.framework.Bundle; -import org.osgi.framework.BundleContext; -import org.osgi.framework.BundleEvent; -import org.osgi.framework.BundleListener; -import org.osgi.framework.wiring.BundleWiring; import org.wso2.siddhi.annotation.Extension; import org.wso2.siddhi.core.executor.incremental.IncrementalAggregateBaseTimeFunctionExecutor; import org.wso2.siddhi.core.executor.incremental.IncrementalShouldUpdateFunctionExecutor; @@ -32,7 +27,6 @@ import org.wso2.siddhi.core.executor.incremental.IncrementalTimeGetTimeZone; import org.wso2.siddhi.core.executor.incremental.IncrementalUnixTimeFunctionExecutor; -import java.util.HashMap; import java.util.Map; /** @@ -49,22 +43,6 @@ public class SiddhiExtensionLoader { */ public static void loadSiddhiExtensions(Map siddhiExtensionsMap) { loadLocalExtensions(siddhiExtensionsMap); - BundleContext bundleContext = ReferenceHolder.getInstance().getBundleContext(); - if (bundleContext != null) { - loadExtensionOSGI(bundleContext, siddhiExtensionsMap); - } - } - - /** - * Load Extensions in OSGi environment. - * - * @param bundleContext OSGi bundleContext - * @param siddhiExtensionsMap reference map for the Siddhi extension - */ - private static void loadExtensionOSGI(BundleContext bundleContext, Map siddhiExtensionsMap) { - ExtensionBundleListener extensionBundleListener = new ExtensionBundleListener(siddhiExtensionsMap); - bundleContext.addBundleListener(extensionBundleListener); - extensionBundleListener.loadAllExtensions(bundleContext); } /** @@ -151,52 +129,4 @@ private static void addExtensionToMap(String fqExtensionName, Class extensionCla "loaded with the same namespace and name '" + fqExtensionName + "'"); } } - - /** - * Class to listen to Bundle changes to update available extensions. - */ - private static class ExtensionBundleListener implements BundleListener { - - private Map bundleExtensions = new HashMap(); - private Map siddhiExtensionsMap; - - ExtensionBundleListener(Map siddhiExtensionsMap) { - this.siddhiExtensionsMap = siddhiExtensionsMap; - } - - @Override - public void bundleChanged(BundleEvent bundleEvent) { - if (bundleEvent.getType() == BundleEvent.STARTED) { - addExtensions(bundleEvent.getBundle()); - } else { - removeExtensions(bundleEvent.getBundle()); - } - } - - private void addExtensions(Bundle bundle) { - ClassLoader classLoader = bundle.adapt(BundleWiring.class).getClassLoader(); - Iterable> extensions = ClassIndex.getAnnotated(Extension.class, classLoader); - for (Class extension : extensions) { - addExtensionToMap(extension, siddhiExtensionsMap); - bundleExtensions.put(extension, (int) bundle.getBundleId()); - } - } - - private void removeExtensions(Bundle bundle) { - bundleExtensions.entrySet().stream().filter(entry -> entry.getValue() == - bundle.getBundleId()).forEachOrdered(entry -> { - siddhiExtensionsMap.remove(entry.getKey()); - }); - bundleExtensions.entrySet().removeIf(entry -> entry.getValue() == - bundle.getBundleId()); - } - - void loadAllExtensions(BundleContext bundleContext) { - for (Bundle b : bundleContext.getBundles()) { - if (b.getState() == Bundle.ACTIVE) { - addExtensions(b); - } - } - } - } } diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiManagerComponent.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiManagerComponent.java deleted file mode 100644 index dee5756456..0000000000 --- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiManagerComponent.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. - * - * WSO2 Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.wso2.siddhi.core.util; - -import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceRegistration; -import org.osgi.service.component.annotations.Activate; -import org.osgi.service.component.annotations.Component; - -import java.util.HashMap; - -/** - * Siddhi Manager Service which is - * - * @since 4.0.0-M3-SNAPSHOT - */ -@Component( - immediate = true -) -public class SiddhiManagerComponent { - private ServiceRegistration serviceRegistration; - - /** - * This is the activation method of SiddhiManagerService. This will be initilize the Siddhi Manager and register the - * ManagerService. - * - * @param bundleContext the bundle context instance of this bundle. - * @throws Exception this will be thrown if an issue occurs while executing the activate method - */ - @Activate - protected void start(BundleContext bundleContext) throws Exception { - ReferenceHolder.getInstance().setBundleContext(bundleContext); - SiddhiExtensionLoader.loadSiddhiExtensions(new HashMap<>()); - serviceRegistration = bundleContext.registerService(SiddhiComponentActivator.class.getName(), - new SiddhiComponentActivator(), null); - } - - protected void stop() throws Exception { - ReferenceHolder.getInstance().setBundleContext(null); - serviceRegistration.unregister(); - } -} - - - diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/OutputParser.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/OutputParser.java index b581a73382..efaaa2b678 100644 --- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/OutputParser.java +++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/OutputParser.java @@ -250,7 +250,7 @@ public static OutputCallback constructOutputCallback(OutputStream outStream, Str StreamJunction outputStreamJunction = streamJunctionMap.get(id + key); if (outputStreamJunction == null) { outputStreamJunction = new StreamJunction(outputStreamDefinition, - siddhiAppContext.getExecutorService(), + siddhiAppContext.getExecutorThreadFactory(), siddhiAppContext.getBufferSize(), null, siddhiAppContext); streamJunctionMap.putIfAbsent(id + key, outputStreamJunction); } diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/SiddhiAppParser.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/SiddhiAppParser.java index 47ea4f0192..86031254b5 100644 --- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/SiddhiAppParser.java +++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/SiddhiAppParser.java @@ -59,6 +59,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import static org.wso2.siddhi.core.util.parser.helper.AnnotationHelper.generateIncludedMetrics; @@ -156,9 +157,12 @@ public static SiddhiAppRuntimeBuilder parse(SiddhiApp siddhiApp, String siddhiAp siddhiAppContext.setThreadBarrier(new ThreadBarrier()); - siddhiAppContext.setExecutorService(Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setNameFormat("Siddhi-" + siddhiAppContext.getName() + - "-executor-thread-%d").build())); + final ThreadFactory executorThreadFactory = new ThreadFactoryBuilder() + .setNameFormat("Siddhi-" + siddhiAppContext.getName() + "-executor-thread-%d") + .build(); + + siddhiAppContext.setExecutorThreadFactory(executorThreadFactory); + siddhiAppContext.setExecutorService(Executors.newCachedThreadPool(executorThreadFactory)); siddhiAppContext.setScheduledExecutorService(Executors.newScheduledThreadPool(5, new ThreadFactoryBuilder().setNameFormat("Siddhi-" + diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/helper/DefinitionParserHelper.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/helper/DefinitionParserHelper.java index 3b9caef55d..8a85c6d8bc 100644 --- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/helper/DefinitionParserHelper.java +++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/helper/DefinitionParserHelper.java @@ -140,7 +140,7 @@ public static void addStreamJunction(StreamDefinition streamDefinition, StreamJunction faultStreamJunction = streamJunctionMap.get(SiddhiConstants.FAULT_STREAM_PREFIX. concat(streamDefinition.getId())); StreamJunction streamJunction = new StreamJunction(streamDefinition, - siddhiAppContext.getExecutorService(), + siddhiAppContext.getExecutorThreadFactory(), siddhiAppContext.getBufferSize(), faultStreamJunction, siddhiAppContext); streamJunctionMap.putIfAbsent(streamDefinition.getId(), streamJunction); } diff --git a/modules/siddhi-core/src/test/java/org/wso2/siddhi/core/stream/JunctionTestCase.java b/modules/siddhi-core/src/test/java/org/wso2/siddhi/core/stream/JunctionTestCase.java index d880f4cca4..77237dfd2d 100644 --- a/modules/siddhi-core/src/test/java/org/wso2/siddhi/core/stream/JunctionTestCase.java +++ b/modules/siddhi-core/src/test/java/org/wso2/siddhi/core/stream/JunctionTestCase.java @@ -31,21 +31,21 @@ import org.wso2.siddhi.query.api.definition.Attribute; import org.wso2.siddhi.query.api.definition.StreamDefinition; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; public class JunctionTestCase { private static final Logger log = Logger.getLogger(JunctionTestCase.class); private int count; private boolean eventArrived; - private ExecutorService executorService; + private ThreadFactory threadFactory; private SiddhiAppContext siddhiAppContext; @BeforeMethod public void init() { count = 0; eventArrived = false; - executorService = Executors.newCachedThreadPool(); + threadFactory = Executors.defaultThreadFactory(); SiddhiContext siddhiContext = new SiddhiContext(); siddhiAppContext = new SiddhiAppContext(); siddhiAppContext.setSiddhiContext(siddhiContext); @@ -59,7 +59,7 @@ public void junctionToReceiverTest() throws InterruptedException { StreamDefinition streamA = StreamDefinition.id("streamA").attribute("symbol", Attribute.Type.STRING) .attribute("price", Attribute.Type.INT). annotation(Annotation.annotation("parallel")); - StreamJunction streamJunctionA = new StreamJunction(streamA, executorService, 1024, + StreamJunction streamJunctionA = new StreamJunction(streamA, threadFactory, 1024, null, siddhiAppContext); StreamJunction.Publisher streamPublisherA = streamJunctionA.constructPublisher(); @@ -91,7 +91,7 @@ public void oneToOneTest() throws InterruptedException { .attribute("price", Attribute.Type.INT). annotation(Annotation.annotation("parallel")); - StreamJunction streamJunctionA = new StreamJunction(streamA, executorService, 1024, + StreamJunction streamJunctionA = new StreamJunction(streamA, threadFactory, 1024, null, siddhiAppContext); StreamJunction.Publisher streamPublisherA = streamJunctionA.constructPublisher(); @@ -99,7 +99,7 @@ public void oneToOneTest() throws InterruptedException { .attribute("price", Attribute.Type.INT). annotation(Annotation.annotation("parallel")); - StreamJunction streamJunctionB = new StreamJunction(streamB, executorService, 1024, + StreamJunction streamJunctionB = new StreamJunction(streamB, threadFactory, 1024, null, siddhiAppContext); final StreamJunction.Publisher streamPublisherB = streamJunctionB.constructPublisher(); @@ -162,14 +162,14 @@ public void multiThreadedTest1() throws InterruptedException { StreamDefinition streamA = StreamDefinition.id("streamA").attribute("symbol", Attribute.Type.STRING) .attribute("price", Attribute.Type.INT). annotation(Annotation.annotation("async")); - StreamJunction streamJunctionA = new StreamJunction(streamA, executorService, 1024, + StreamJunction streamJunctionA = new StreamJunction(streamA, threadFactory, 1024, null, siddhiAppContext); StreamJunction.Publisher streamPublisherA = streamJunctionA.constructPublisher(); StreamDefinition streamB = StreamDefinition.id("streamB").attribute("symbol", Attribute.Type.STRING) .attribute("price", Attribute.Type.INT). annotation(Annotation.annotation("async")); - StreamJunction streamJunctionB = new StreamJunction(streamB, executorService, 1024, + StreamJunction streamJunctionB = new StreamJunction(streamB, threadFactory, 1024, null, siddhiAppContext); final StreamJunction.Publisher streamPublisherB1 = streamJunctionB.constructPublisher(); final StreamJunction.Publisher streamPublisherB2 = streamJunctionB.constructPublisher(); @@ -259,14 +259,14 @@ public void multiThreadedTest2() throws InterruptedException { StreamDefinition streamA = StreamDefinition.id("streamA").attribute("symbol", Attribute.Type.STRING) .attribute("price", Attribute.Type.INT). annotation(Annotation.annotation("async")); - StreamJunction streamJunctionA = new StreamJunction(streamA, executorService, 1024, + StreamJunction streamJunctionA = new StreamJunction(streamA, threadFactory, 1024, null, siddhiAppContext); StreamJunction.Publisher streamPublisherA = streamJunctionA.constructPublisher(); StreamDefinition streamB = StreamDefinition.id("streamB").attribute("symbol", Attribute.Type.STRING) .attribute("price", Attribute.Type.INT). annotation(Annotation.annotation("async")); - StreamJunction streamJunctionB = new StreamJunction(streamB, executorService, 1024, + StreamJunction streamJunctionB = new StreamJunction(streamB, threadFactory, 1024, null, siddhiAppContext); final StreamJunction.Publisher streamPublisherB1 = streamJunctionB.constructPublisher(); final StreamJunction.Publisher streamPublisherB2 = streamJunctionB.constructPublisher(); @@ -275,7 +275,7 @@ public void multiThreadedTest2() throws InterruptedException { StreamDefinition streamC = StreamDefinition.id("streamC").attribute("symbol", Attribute.Type.STRING) .attribute("price", Attribute.Type.INT). annotation(Annotation.annotation("async")); - StreamJunction streamJunctionC = new StreamJunction(streamC, executorService, 1024, + StreamJunction streamJunctionC = new StreamJunction(streamC, threadFactory, 1024, null, siddhiAppContext); final StreamJunction.Publisher streamPublisherC1 = streamJunctionC.constructPublisher(); final StreamJunction.Publisher streamPublisherC2 = streamJunctionC.constructPublisher(); @@ -440,14 +440,14 @@ public void multiThreadedWithEventPoolTest() throws InterruptedException { StreamDefinition streamA = StreamDefinition.id("streamA").attribute("symbol", Attribute.Type.STRING) .attribute("price", Attribute.Type.INT). annotation(Annotation.annotation("async")); - StreamJunction streamJunctionA = new StreamJunction(streamA, executorService, 1024, + StreamJunction streamJunctionA = new StreamJunction(streamA, threadFactory, 1024, null, siddhiAppContext); StreamJunction.Publisher streamPublisherA = streamJunctionA.constructPublisher(); StreamDefinition streamB = StreamDefinition.id("streamB").attribute("symbol", Attribute.Type.STRING) .attribute("price", Attribute.Type.INT). annotation(Annotation.annotation("async")); - StreamJunction streamJunctionB = new StreamJunction(streamB, executorService, 1024, + StreamJunction streamJunctionB = new StreamJunction(streamB, threadFactory, 1024, null, siddhiAppContext); final StreamJunction.Publisher streamPublisherB1 = streamJunctionB.constructPublisher(); final StreamJunction.Publisher streamPublisherB2 = streamJunctionB.constructPublisher(); @@ -456,7 +456,7 @@ public void multiThreadedWithEventPoolTest() throws InterruptedException { StreamDefinition streamC = StreamDefinition.id("streamC").attribute("symbol", Attribute.Type.STRING) .attribute("price", Attribute.Type.INT). annotation(Annotation.annotation("async")); - StreamJunction streamJunctionC = new StreamJunction(streamC, executorService, 1024, + StreamJunction streamJunctionC = new StreamJunction(streamC, threadFactory, 1024, null, siddhiAppContext); final StreamJunction.Publisher streamPublisherC1 = streamJunctionC.constructPublisher(); final StreamJunction.Publisher streamPublisherC2 = streamJunctionC.constructPublisher(); diff --git a/pom.xml b/pom.xml index 3e8cfe9ace..c2501d306f 100644 --- a/pom.xml +++ b/pom.xml @@ -53,7 +53,7 @@ - org.apache.log4j.wso2 + log4j log4j ${log4j.version} @@ -81,12 +81,6 @@ ${testng.version} test - - org.webjars - jquery - 3.7.1 - test - junit junit @@ -99,7 +93,7 @@ ${antlr.runtime.version} - org.wso2.orbit.com.lmax + com.lmax disruptor ${disruptor.version} @@ -131,18 +125,6 @@ ${metrics.version} - - - org.osgi - org.osgi.core - ${org.osgi.core.version} - - - org.eclipse.osgi - org.eclipse.osgi.services - ${version.equinox.osgi.services} - - org.graylog.repackaged.siddhi @@ -378,14 +360,12 @@ 7.8.0 4.13.2 - 1.2.17.wso2v1 + 1.2.17 4.13.1 - 3.4.2.wso2v1 + 4.0.0 32.1.3-jre 2.10.1 3.13 - 6.0.0 - 3.3.100.v20130513-1956 4.2.22 @@ -396,60 +376,6 @@ v4.5.11 - - - wso2-nexus - WSO2 internal Repository - https://maven.wso2.org/nexus/content/groups/wso2-public/ - - true - daily - ignore - - - - - wso2.releases - WSO2 internal Repository - https://maven.wso2.org/nexus/content/repositories/releases/ - - true - daily - ignore - - - - - wso2.snapshots - Apache Snapshot Repository - https://maven.wso2.org/nexus/content/repositories/snapshots/ - - true - daily - - - false - - - - - central - Maven Repository Switchboard - default - https://repo1.maven.org/maven2 - - false - - - - - - sonatype.releases - https://oss.sonatype.org/content/repositories/releases/ - - - - sonatype-nexus-snapshots @@ -463,43 +389,4 @@ - - - - wso2.releases - WSO2 internal Repository - https://maven.wso2.org/nexus/content/repositories/releases/ - - true - daily - ignore - - - - - wso2.snapshots - Apache Snapshot Repository - https://maven.wso2.org/nexus/content/repositories/snapshots/ - - true - daily - - - false - - - - - wso2-nexus - WSO2 internal Repository - https://maven.wso2.org/nexus/content/groups/wso2-public/ - - true - daily - ignore - - - - -