Skip to content

Commit

Permalink
Merge pull request #20 from mohanvive/master
Browse files Browse the repository at this point in the history
Update Siddhi dependency to 5.x.x
  • Loading branch information
AnuGayan authored Apr 11, 2019
2 parents f1030aa + 1224532 commit d35d9bb
Show file tree
Hide file tree
Showing 27 changed files with 425 additions and 643 deletions.
10 changes: 5 additions & 5 deletions component/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

<dependencies>
<dependency>
<groupId>org.wso2.siddhi</groupId>
<groupId>io.siddhi</groupId>
<artifactId>siddhi-core</artifactId>
</dependency>
<dependency>
Expand Down Expand Up @@ -96,8 +96,8 @@
net.minidev.*
</Private-Package>
<Import-Package>
org.wso2.siddhi.core.*;version="${siddhi.version.range}",
org.wso2.siddhi.query.api.*;version="${siddhi.version.range}",
io.siddhi.core.*;version="${siddhi.version.range}",
io.siddhi.query.api.*;version="${siddhi.version.range}",
*;resolution:=optional
</Import-Package>
<DynamicImport-Package>*</DynamicImport-Package>
Expand All @@ -108,7 +108,7 @@
</configuration>
</plugin>
<plugin>
<groupId>org.wso2.siddhi</groupId>
<groupId>io.siddhi</groupId>
<artifactId>siddhi-doc-gen</artifactId>
<version>${siddhi.version}</version>
<executions>
Expand All @@ -128,7 +128,7 @@
<build>
<plugins>
<plugin>
<groupId>org.wso2.siddhi</groupId>
<groupId>io.siddhi</groupId>
<artifactId>siddhi-doc-gen</artifactId>
<version>${siddhi.version}</version>
<executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,31 @@
import com.jayway.jsonpath.InvalidJsonException;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ReturnAttribute;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.event.stream.populater.ComplexEventPopulater;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.stream.StreamProcessor;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import org.apache.log4j.Logger;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.ReturnAttribute;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
import org.wso2.siddhi.core.exception.SiddhiAppRuntimeException;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -95,13 +100,14 @@
"attribute named 'jsonElement' into the stream \n`\n e.g.,\n jsonInput - " +
"{name:\"John\",age:25}, \n path - \"$.age\"\n`\n")
)
public class JsonTokenizerAsObjectStreamProcessorFunction extends StreamProcessor {
public class JsonTokenizerAsObjectStreamProcessorFunction extends StreamProcessor<State> {
private static final Logger log = Logger.getLogger(JsonTokenizerAsObjectStreamProcessorFunction.class);
private boolean failOnMissingAttribute = true;

@Override
protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor,
StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater,
State state) {
while (streamEventChunk.hasNext()) {
StreamEvent streamEvent = streamEventChunk.next();
Object jsonInput = attributeExpressionExecutors[0].execute(streamEvent);
Expand Down Expand Up @@ -145,14 +151,21 @@ protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processo
* The initialization method for {@link StreamProcessor}, which will be called before other methods and validate
* the all configuration and getting the initial values.
*
* @param attributeExpressionExecutors are the executors of each attributes in the Function
* @param configReader this hold the {@link StreamProcessor} extensions configuration reader.
* @param siddhiAppContext Siddhi app runtime context
* @param metaStreamEvent the stream event meta
* @param abstractDefinition the incoming stream definition
* @param expressionExecutors the executors for the function parameters
* @param configReader this hold the Stream Processor configuration reader.
* @param streamEventClonerHolder streamEventCloner Holder
* @param outputExpectsExpiredEvents whether output can be expired events
* @param findToBeExecuted find will be executed
* @param siddhiQueryContext current siddhi query context
*/
@Override
protected List<Attribute> init(AbstractDefinition inputDefinition,
ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader,
SiddhiAppContext siddhiAppContext) {
protected StateFactory<State> init(MetaStreamEvent metaStreamEvent, AbstractDefinition abstractDefinition,
ExpressionExecutor[] expressionExecutors, ConfigReader configReader,
StreamEventClonerHolder streamEventClonerHolder,
boolean outputExpectsExpiredEvents, boolean findToBeExecuted,
SiddhiQueryContext siddhiQueryContext) {
if (attributeExpressionExecutors.length == 2 || attributeExpressionExecutors.length == 3) {
if (attributeExpressionExecutors[0] == null) {
throw new SiddhiAppValidationException("Invalid input given to first argument 'json' of " +
Expand Down Expand Up @@ -188,9 +201,8 @@ protected List<Attribute> init(AbstractDefinition inputDefinition,
throw new SiddhiAppValidationException("Invalid no of arguments passed to json:tokenizeAsObject() function,"
+ "required 2, but found " + attributeExpressionExecutors.length);
}
List<Attribute> attributes = new ArrayList<>();
attributes.add(new Attribute("jsonElement", Attribute.Type.OBJECT));
return attributes;

return null;
}

/**
Expand All @@ -212,30 +224,21 @@ public void start() {
public void stop() {
}

/**
* Used to collect the serializable state of the processing element, that need to be
* persisted for reconstructing the element to the same state on a different point of time
*
* @return stateful objects of the processing element as an map
*/
@Override
public Map<String, Object> currentState() {
return null;

private void sendEvents(StreamEvent streamEvent, Object[] data, ComplexEventChunk<StreamEvent> streamEventChunk) {
complexEventPopulater.populateComplexEvent(streamEvent, data);
nextProcessor.process(streamEventChunk);
}

/**
* Used to restore serialized state of the processing element, for reconstructing
* the element to the same state as if was on a previous point of time.
*
* @param state the stateful objects of the processing element as a map.
* This is the same map that is created upon calling currentState() method.
*/
@Override
public void restoreState(Map<String, Object> state) {
public List<Attribute> getReturnAttributes() {
List<Attribute> attributes = new ArrayList<>();
attributes.add(new Attribute("jsonElement", Attribute.Type.OBJECT));
return attributes;
}

private void sendEvents(StreamEvent streamEvent, Object[] data, ComplexEventChunk<StreamEvent> streamEventChunk) {
complexEventPopulater.populateComplexEvent(streamEvent, data);
nextProcessor.process(streamEventChunk);
@Override
public ProcessingMode getProcessingMode() {
return ProcessingMode.BATCH;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,30 @@
import com.google.gson.GsonBuilder;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ReturnAttribute;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.event.stream.populater.ComplexEventPopulater;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.stream.StreamProcessor;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import org.apache.log4j.Logger;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.ReturnAttribute;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -94,14 +99,15 @@
"named 'jsonElement' into the stream. \n`\n e.g.,\n jsonInput - {name:\"John\",age:25}, \n " +
"path - \"$.age\"\n`\n")
)
public class JsonTokenizerStreamProcessorFunction extends StreamProcessor {
public class JsonTokenizerStreamProcessorFunction extends StreamProcessor<State> {
private static final Logger log = Logger.getLogger(JsonTokenizerStreamProcessorFunction.class);
private static final Gson gson = new GsonBuilder().serializeNulls().create();
private boolean failOnMissingAttribute = true;

@Override
protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor,
StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater,
State state) {
while (streamEventChunk.hasNext()) {
StreamEvent streamEvent = streamEventChunk.next();
Object jsonInput = attributeExpressionExecutors[0].execute(streamEvent);
Expand Down Expand Up @@ -143,14 +149,21 @@ protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processo
* The initialization method for {@link StreamProcessor}, which will be called before other methods and validate
* the all configuration and getting the initial values.
*
* @param attributeExpressionExecutors are the executors of each attributes in the Function
* @param configReader this hold the {@link StreamProcessor} extensions configuration reader.
* @param siddhiAppContext Siddhi app runtime context
* @param metaStreamEvent the stream event meta
* @param abstractDefinition the incoming stream definition
* @param expressionExecutors the executors for the function parameters
* @param configReader this hold the Stream Processor configuration reader.
* @param streamEventClonerHolder streamEventCloner Holder
* @param outputExpectsExpiredEvents whether output can be expired events
* @param findToBeExecuted find will be executed
* @param siddhiQueryContext current siddhi query context
*/
@Override
protected List<Attribute> init(AbstractDefinition inputDefinition,
ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader,
SiddhiAppContext siddhiAppContext) {
protected StateFactory<State> init(MetaStreamEvent metaStreamEvent, AbstractDefinition abstractDefinition,
ExpressionExecutor[] expressionExecutors, ConfigReader configReader,
StreamEventClonerHolder streamEventClonerHolder,
boolean outputExpectsExpiredEvents, boolean findToBeExecuted,
SiddhiQueryContext siddhiQueryContext) {
if (attributeExpressionExecutors.length == 2 || attributeExpressionExecutors.length == 3) {
if (attributeExpressionExecutors[0] == null) {
throw new SiddhiAppValidationException("Invalid input given to first argument 'json' of json:tokenize" +
Expand Down Expand Up @@ -188,9 +201,8 @@ protected List<Attribute> init(AbstractDefinition inputDefinition,
throw new SiddhiAppValidationException("Invalid no of arguments passed to json:tokenize() function, "
+ "required 2, but found " + attributeExpressionExecutors.length);
}
List<Attribute> attributes = new ArrayList<>();
attributes.add(new Attribute("jsonElement", Attribute.Type.STRING));
return attributes;

return null;
}

/**
Expand All @@ -214,31 +226,20 @@ public void stop() {

}

/**
* Used to collect the serializable state of the processing element, that need to be
* persisted for reconstructing the element to the same state on a different point of time
*
* @return stateful objects of the processing element as an map
*/
@Override
public Map<String, Object> currentState() {
return null;
}

/**
* Used to restore serialized state of the processing element, for reconstructing
* the element to the same state as if was on a previous point of time.
*
* @param state the stateful objects of the processing element as a map.
* This is the same map that is created upon calling currentState() method.
*/
@Override
public void restoreState(Map<String, Object> state) {

public List<Attribute> getReturnAttributes() {
List<Attribute> attributes = new ArrayList<>();
attributes.add(new Attribute("jsonElement", Attribute.Type.STRING));
return attributes;
}

private void sendEvents(StreamEvent streamEvent, Object[] data, ComplexEventChunk<StreamEvent> streamEventChunk) {
complexEventPopulater.populateComplexEvent(streamEvent, data);
nextProcessor.process(streamEventChunk);
}

@Override
public ProcessingMode getProcessingMode() {
return ProcessingMode.BATCH;
}
}
Loading

0 comments on commit d35d9bb

Please sign in to comment.