Skip to content

Commit

Permalink
Cleanup dependencies (#4)
Browse files Browse the repository at this point in the history
* Switch to upstream disruptor and update to 4.0.0

* Cleanup dependencies

- Use official log4j instead of wso2 fork
- Remove org.eclipse.osgi.services dependency and SiddhiManagerComponent
  class
- Remove unused webjars dependency
- Remove wso2 Maven repositories

* Remove unused OSGi extension support
  • Loading branch information
bernd authored Jan 25, 2024
1 parent b9d5574 commit ac193e8
Show file tree
Hide file tree
Showing 14 changed files with 52 additions and 334 deletions.
12 changes: 2 additions & 10 deletions modules/siddhi-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@
<artifactId>siddhi-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.apache.log4j.wso2</groupId>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.orbit.com.lmax</groupId>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
</dependency>
<dependency>
Expand Down Expand Up @@ -77,14 +77,6 @@
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.osgi</groupId>
<artifactId>org.eclipse.osgi.services</artifactId>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
</dependency>
<dependency>
<groupId>org.atteo.classindex</groupId>
<artifactId>classindex</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

/**
* Holder object for context information of {@link org.wso2.siddhi.query.api.SiddhiApp}.
Expand All @@ -50,6 +51,7 @@ public class SiddhiAppContext {
private boolean enforceOrder;
private StatisticsManager statisticsManager = null;

private ThreadFactory executorThreadFactory;
private ExecutorService executorService;
private ScheduledExecutorService scheduledExecutorService;
private List<EternalReferencedHolder> eternalReferencedHolders;
Expand Down Expand Up @@ -139,6 +141,14 @@ public void setThreadBarrier(ThreadBarrier threadBarrier) {
this.threadBarrier = threadBarrier;
}

public ThreadFactory getExecutorThreadFactory() {
return this.executorThreadFactory;
}

public void setExecutorThreadFactory(ThreadFactory threadFactory) {
this.executorThreadFactory = threadFactory;
}

public ExecutorService getExecutorService() {
return executorService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public QueryRuntime addQuery(QueryRuntime metaQueryRuntime) {

if (outputStreamJunction == null) {
outputStreamJunction = new StreamJunction(streamDefinition,
siddhiAppContext.getExecutorService(),
siddhiAppContext.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(),
null, siddhiAppContext);
localStreamJunctionMap.putIfAbsent(id, outputStreamJunction);
Expand All @@ -148,7 +148,7 @@ public QueryRuntime addQuery(QueryRuntime metaQueryRuntime) {

if (outputStreamJunction == null) {
outputStreamJunction = new StreamJunction(streamDefinition,
siddhiAppContext.getExecutorService(),
siddhiAppContext.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(),
null, siddhiAppContext);
streamJunctionMap.putIfAbsent(id, outputStreamJunction);
Expand All @@ -166,7 +166,7 @@ public QueryRuntime addQuery(QueryRuntime metaQueryRuntime) {

if (outputStreamJunction == null) {
outputStreamJunction = new StreamJunction(streamDefinition,
siddhiAppContext.getExecutorService(),
siddhiAppContext.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(),
null, siddhiAppContext);
streamJunctionMap.putIfAbsent(id, outputStreamJunction);
Expand Down Expand Up @@ -291,7 +291,7 @@ private synchronized void clonePartition(String key) {
StreamJunction streamJunction = localStreamJunctionMap.get(streamId + key);
if (streamJunction == null) {
streamJunction = new StreamJunction(streamDefinition, siddhiAppContext
.getExecutorService(),
.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(),
null, siddhiAppContext);
localStreamJunctionMap.put(streamId + key, streamJunction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ public void addStreamJunction(String key, List<QueryRuntime> queryRuntimeList) {
}

private StreamJunction createStreamJunction() {
return new StreamJunction(streamDefinition, siddhiAppContext.getExecutorService(),
return new StreamJunction(streamDefinition, siddhiAppContext.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(), null, siddhiAppContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;

/**
* Stream Junction is the place where streams are collected and distributed. There will be an Stream Junction per
Expand All @@ -69,7 +69,7 @@ public class StreamJunction implements EventBufferHolder {
private int bufferSize;
private List<Receiver> receivers = new CopyOnWriteArrayList<Receiver>();
private List<Publisher> publishers = Collections.synchronizedList(new LinkedList<>());
private ExecutorService executorService;
private ThreadFactory threadFactory;
private boolean async = false;
private Disruptor<EventExchangeHolder> disruptor;
private RingBuffer<EventExchangeHolder> ringBuffer;
Expand All @@ -80,12 +80,12 @@ public class StreamJunction implements EventBufferHolder {
private OnErrorAction onErrorAction = OnErrorAction.LOG;
private ExceptionListener exceptionListener;

public StreamJunction(StreamDefinition streamDefinition, ExecutorService executorService, int bufferSize,
public StreamJunction(StreamDefinition streamDefinition, ThreadFactory threadFactory, int bufferSize,
StreamJunction faultStreamJunction, SiddhiAppContext siddhiAppContext) {
this.streamDefinition = streamDefinition;
this.bufferSize = bufferSize;
this.batchSize = bufferSize;
this.executorService = executorService;
this.threadFactory = threadFactory;
this.siddhiAppContext = siddhiAppContext;
if (siddhiAppContext.getStatisticsManager() != null) {
this.throughputTracker = QueryParserHelper.createThroughputTracker(siddhiAppContext,
Expand Down Expand Up @@ -285,7 +285,7 @@ public synchronized void startProcessing() {
ProducerType producerType = ProducerType.MULTI;
disruptor = new Disruptor<EventExchangeHolder>(
new EventExchangeHolderFactory(streamDefinition.getAttributeList().size()),
bufferSize, executorService, producerType,
bufferSize, threadFactory, producerType,
new BlockingWaitStrategy());
disruptor.handleExceptionsWith(siddhiAppContext.getDisruptorExceptionHandler());
break;
Expand All @@ -294,7 +294,7 @@ public synchronized void startProcessing() {
if (disruptor == null) {
disruptor = new Disruptor<EventExchangeHolder>(
new EventExchangeHolderFactory(streamDefinition.getAttributeList().size()),
bufferSize, executorService);
bufferSize, threadFactory);
disruptor.handleExceptionsWith(siddhiAppContext.getDisruptorExceptionHandler());
}
if (workers > 0) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public String addQuery(QueryRuntime queryRuntime) {

if (outputStreamJunction == null) {
outputStreamJunction = new StreamJunction(streamDefinition,
siddhiAppContext.getExecutorService(),
siddhiAppContext.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(), null, siddhiAppContext);
streamJunctionMap.putIfAbsent(streamDefinition.getId(), outputStreamJunction);
}
Expand All @@ -217,7 +217,7 @@ public String addQuery(QueryRuntime queryRuntime) {

if (outputStreamJunction == null) {
outputStreamJunction = new StreamJunction(streamDefinition,
siddhiAppContext.getExecutorService(),
siddhiAppContext.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(), null, siddhiAppContext);
streamJunctionMap.putIfAbsent(streamDefinition.getId(), outputStreamJunction);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,13 @@

import org.apache.log4j.Logger;
import org.atteo.classindex.ClassIndex;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.BundleEvent;
import org.osgi.framework.BundleListener;
import org.osgi.framework.wiring.BundleWiring;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.core.executor.incremental.IncrementalAggregateBaseTimeFunctionExecutor;
import org.wso2.siddhi.core.executor.incremental.IncrementalShouldUpdateFunctionExecutor;
import org.wso2.siddhi.core.executor.incremental.IncrementalStartTimeEndTimeFunctionExecutor;
import org.wso2.siddhi.core.executor.incremental.IncrementalTimeGetTimeZone;
import org.wso2.siddhi.core.executor.incremental.IncrementalUnixTimeFunctionExecutor;

import java.util.HashMap;
import java.util.Map;

/**
Expand All @@ -49,22 +43,6 @@ public class SiddhiExtensionLoader {
*/
public static void loadSiddhiExtensions(Map<String, Class> siddhiExtensionsMap) {
loadLocalExtensions(siddhiExtensionsMap);
BundleContext bundleContext = ReferenceHolder.getInstance().getBundleContext();
if (bundleContext != null) {
loadExtensionOSGI(bundleContext, siddhiExtensionsMap);
}
}

/**
* Load Extensions in OSGi environment.
*
* @param bundleContext OSGi bundleContext
* @param siddhiExtensionsMap reference map for the Siddhi extension
*/
private static void loadExtensionOSGI(BundleContext bundleContext, Map<String, Class> siddhiExtensionsMap) {
ExtensionBundleListener extensionBundleListener = new ExtensionBundleListener(siddhiExtensionsMap);
bundleContext.addBundleListener(extensionBundleListener);
extensionBundleListener.loadAllExtensions(bundleContext);
}

/**
Expand Down Expand Up @@ -151,52 +129,4 @@ private static void addExtensionToMap(String fqExtensionName, Class extensionCla
"loaded with the same namespace and name '" + fqExtensionName + "'");
}
}

/**
* Class to listen to Bundle changes to update available extensions.
*/
private static class ExtensionBundleListener implements BundleListener {

private Map<Class, Integer> bundleExtensions = new HashMap<Class, Integer>();
private Map<String, Class> siddhiExtensionsMap;

ExtensionBundleListener(Map<String, Class> siddhiExtensionsMap) {
this.siddhiExtensionsMap = siddhiExtensionsMap;
}

@Override
public void bundleChanged(BundleEvent bundleEvent) {
if (bundleEvent.getType() == BundleEvent.STARTED) {
addExtensions(bundleEvent.getBundle());
} else {
removeExtensions(bundleEvent.getBundle());
}
}

private void addExtensions(Bundle bundle) {
ClassLoader classLoader = bundle.adapt(BundleWiring.class).getClassLoader();
Iterable<Class<?>> extensions = ClassIndex.getAnnotated(Extension.class, classLoader);
for (Class extension : extensions) {
addExtensionToMap(extension, siddhiExtensionsMap);
bundleExtensions.put(extension, (int) bundle.getBundleId());
}
}

private void removeExtensions(Bundle bundle) {
bundleExtensions.entrySet().stream().filter(entry -> entry.getValue() ==
bundle.getBundleId()).forEachOrdered(entry -> {
siddhiExtensionsMap.remove(entry.getKey());
});
bundleExtensions.entrySet().removeIf(entry -> entry.getValue() ==
bundle.getBundleId());
}

void loadAllExtensions(BundleContext bundleContext) {
for (Bundle b : bundleContext.getBundles()) {
if (b.getState() == Bundle.ACTIVE) {
addExtensions(b);
}
}
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public static OutputCallback constructOutputCallback(OutputStream outStream, Str
StreamJunction outputStreamJunction = streamJunctionMap.get(id + key);
if (outputStreamJunction == null) {
outputStreamJunction = new StreamJunction(outputStreamDefinition,
siddhiAppContext.getExecutorService(),
siddhiAppContext.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(), null, siddhiAppContext);
streamJunctionMap.putIfAbsent(id + key, outputStreamJunction);
}
Expand Down
Loading

0 comments on commit ac193e8

Please sign in to comment.