Merge branch 'development'
# Conflicts:
#	src/main/scala/io/rml/framework/core/function/model/Function.scala
#	src/main/scala/io/rml/framework/core/function/model/std/StdRandomFunction.scala
#	src/main/scala/io/rml/framework/core/function/model/std/StdUpperCaseFunction.scala
#	src/main/scala/io/rml/framework/core/item/xml/XMLItem.scala
#	src/main/scala/io/rml/framework/core/util/ParameterUtil.scala
#	src/main/scala/io/rml/framework/engine/statement/GraphGeneratorAssembler.scala
#	src/main/scala/io/rml/framework/engine/statement/PredicateGeneratorAssembler.scala
#	src/main/scala/io/rml/framework/flink/function/FnOEnvironmentLoader.scala
#	src/main/scala/io/rml/framework/flink/source/CSVStream.scala
#	src/test/scala/io/rml/framework/TCPStreamTestSyncFnO.scala
ghsnd committed May 19, 2021
2 parents b8bf2dc + c2bc9c1 commit d97e560
Showing 163 changed files with 3,039 additions and 922 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.md
@@ -10,10 +10,27 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
* Support for stream partitioning in windows
* Joins of data streams

## [2.1.1] - 2021-05-19

### Added
* Support for using Web of Things descriptions in the logical source and logical target, as described in [Van Assche et al.](https://link.springer.com/chapter/10.1007/978-3-030-74296-6_26)
and the [Target in RML specification](https://rml.io/specs/rml-target).
The current implementation is a proof of concept: as a WoT data source, RMLStreamer supports MQTT streams;
as a logical target, a file dump is supported.
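  A minimal sketch of feeding such an MQTT logical source, using the Eclipse Paho client that this commit adds as a dependency; the broker URL, topic name, and JSON payload are placeholder assumptions, not values prescribed by RMLStreamer:

  ```
  import org.eclipse.paho.client.mqttv3.{MqttClient, MqttConnectOptions, MqttMessage}

  object MqttSourceDemo {
    def main(args: Array[String]): Unit = {
      // Placeholder broker URL; align it with the WoT description in the mapping file.
      val client = new MqttClient("tcp://localhost:1883", MqttClient.generateClientId())
      val options = new MqttConnectOptions()
      options.setCleanSession(true)
      client.connect(options)

      // Publish one JSON record for the MQTT logical source to pick up (topic name is a placeholder).
      val payload = """{"id": 1, "name": "example"}"""
      client.publish("demo/records", new MqttMessage(payload.getBytes("UTF-8")))

      client.disconnect()
      client.close()
    }
  }
  ```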

### Changed
* Updated JsonSurfer from version 1.5.1 to 1.6.0
* Updated Flink from version 1.11.3 to 1.12.3

### Fixed
* Function loading didn't always work because the `toString` method was called on the `Uri` of a function instead of
the `value` method. (Internal [issue #132](https://gitlab.ilabt.imec.be/rml/proc/rml-streamer/-/issues/132)).
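  A simplified, hypothetical sketch of the difference (the real `Uri` model class lives in the RMLStreamer code base and may differ):

  ```
  // Hypothetical stand-in for the framework's Uri model class.
  final case class Uri(value: String)

  val fnUri = Uri("http://example.com/functions#toUpperCase")

  // Case-class toString yields "Uri(http://...)", which is not a usable lookup key,
  // so function resolution could fail.
  val wrongKey = fnUri.toString // "Uri(http://example.com/functions#toUpperCase)"

  // The wrapped value is the plain IRI string that function loading expects.
  val rightKey = fnUri.value    // "http://example.com/functions#toUpperCase"
  ```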

## [2.1.0] - 2020-03-18

### Added
* Support for functions on a per-record basis using the [Function Ontology](https://fno.io/).
* Web of Things source (MQTT)

### Changed
* Updated Flink from version 1.10.0 to 1.11.3
@@ -124,3 +141,4 @@ can be set with the program argument `--baseIRI`.
[1.2.3]: https://github.com/RMLio/RMLStreamer/compare/v1.2.2...v1.2.3
[2.0.0]: https://github.com/RMLio/RMLStreamer/compare/v1.2.3...v2.0.0
[2.1.0]: https://github.com/RMLio/RMLStreamer/compare/v2.0.0...v2.1.0
[2.1.1]: https://github.com/RMLio/RMLStreamer/compare/v2.1.0...v2.1.1
6 changes: 3 additions & 3 deletions README.md
@@ -15,9 +15,9 @@ If you want to deploy it yourself, read on.

### Installing Flink
RMLStreamer runs its jobs on Flink clusters.
More information on how to install Flink and getting started can be found [here](https://ci.apache.org/projects/flink/flink-docs-release-1.11/try-flink/local_installation.html).
More information on how to install Flink and getting started can be found [here](https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/local_installation.html).
At least a local cluster must be running in order to start executing RML Mappings with RMLStreamer.
Please note that this version works with Flink 1.11.3 with Scala 2.11 support, which can be downloaded [here](https://archive.apache.org/dist/flink/flink-1.11.3/flink-1.11.3-bin-scala_2.11.tgz).
Please note that this version works with Flink 1.12.3 with Scala 2.11 support, which can be downloaded [here](https://archive.apache.org/dist/flink/flink-1.12.3/flink-1.12.3-bin-scala_2.11.tgz).

### Building RMLStreamer

@@ -46,7 +46,7 @@ The resulting `RMLStreamer-<version>.jar`, found in the `target` folder, can be
### Executing RML Mappings

Here we give examples for running RMLStreamer from the command line. We use `FLINK_BIN` to denote the Flink CLI tool,
usually found in the `bin` directory of the Flink installation. E.g. `/home/myuser/flink-1.11.3/bin/flink`.
usually found in the `bin` directory of the Flink installation. E.g. `/home/myuser/flink-1.12.3/bin/flink`.
For Windows a `flink.bat` script is provided.

The general usage is:
4 changes: 2 additions & 2 deletions docker/docker-compose.yml
@@ -2,7 +2,7 @@ version: '3'
services:

jobmanager:
image: flink:1.11.3-scala_2.11
image: flink:1.12.3-scala_2.11
expose:
- "6123"
ports:
@@ -14,7 +14,7 @@ services:
- data:/mnt/data

taskmanager:
image: flink:1.11.3-scala_2.11
image: flink:1.12.3-scala_2.11
expose:
- "6121"
- "6122"
10 changes: 5 additions & 5 deletions documentation/README_Functions.md
@@ -15,13 +15,13 @@ These files can be obtained from `src/main/resources`:
## Example: RML Streamer + Flink
Flink's `lib` directory should contain the jar files with the custom functions. In this example, these are marked with `*`.
```
flink-1.11.2-scala_2.11
flink-1.12.3-scala_2.11
└── lib
├── GrelFunctions.jar *
├── IDLabFunctions.jar *
├── flink-dist_2.11-1.11.2.jar
├── flink-table-blink_2.11-1.11.2.jar
├── flink-table_2.11-1.11.2.jar
├── flink-dist_2.11-1.12.3.jar
├── flink-table-blink_2.11-1.12.3.jar
├── flink-table_2.11-1.12.3.jar
├── log4j-1.2.17.jar
└── slf4j-log4j12-1.7.15.jar
```
@@ -40,7 +40,7 @@ Note that the function descriptions and function mappings are present.

The command for running the RML Streamer on Flink should look like
```
~/flink/flink-1.11.2-scala_2.11/bin/flink run -c io.rml.framework.Main RMLStreamer-2.0.1-SNAPSHOT.jar toFile --output-path $(pwd)'/out.ttl' -m mapping.ttl
~/flink/flink-1.12.3-scala_2.11/bin/flink run -c io.rml.framework.Main RMLStreamer-2.1.1-SNAPSHOT.jar toFile --output-path $(pwd)'/out.ttl' -m mapping.ttl
```
## Test Cases
36 changes: 32 additions & 4 deletions pom.xml
@@ -28,15 +28,15 @@ SOFTWARE.

<groupId>io.rml</groupId>
<artifactId>RMLStreamer</artifactId>
<version>2.1.0-RELEASE</version>
<version>2.1.1</version>
<packaging>jar</packaging>

<name>RMLStreamer</name>
<url>https://rml.io/</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.11.3</flink.version>
<flink.version>1.12.3</flink.version>
<slf4j.version>1.7.26</slf4j.version>
<log4j.version>2.13.3</log4j.version>
<jena.version>3.11.0</jena.version>
@@ -53,6 +53,20 @@ SOFTWARE.
</properties>



<repositories>
<repository>
<id>bintray</id>
<url>https://jcenter.bintray.com</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>

<dependencies>

<!-- Framework dependencies -->
@@ -99,7 +113,7 @@ SOFTWARE.
<dependency>
<groupId>com.github.jsurfer</groupId>
<artifactId>jsurfer-jackson</artifactId>
<version>1.5.1</version>
<version>1.6.0</version>
<exclusions>
<!-- provided by Jena -->
<exclusion>
@@ -223,6 +237,15 @@ SOFTWARE.
<version>2.12.0</version>
<scope>test</scope>
</dependency>

<!-- embeddable MQTT broker -->
<dependency>
<groupId>io.moquette</groupId>
<artifactId>moquette-broker</artifactId>
<version>0.13</version>
<scope>test</scope>
</dependency>

<!-- explicitly add a standard logging framework, as Flink does not have
a hard dependency on one specific framework by default -->
<dependency>
@@ -238,7 +261,12 @@ SOFTWARE.
<type>pom</type>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>

</dependencies>

<!-- This profile helps to make things run out of the box in IntelliJ -->
99 changes: 58 additions & 41 deletions src/main/scala/io/rml/framework/Main.scala
@@ -27,18 +27,18 @@ package io.rml.framework


import io.rml.framework.api.{FnOEnvironment, RMLEnvironment}
import io.rml.framework.core.extractors.TriplesMapsCache
import io.rml.framework.core.function.flink.{FnOEnvironmentLoader, FnOEnvironmentStreamLoader, RichItemIdentityFunction, RichStreamItemIdentityFunction}
import io.rml.framework.core.extractors.NodeCache
import io.rml.framework.core.internal.Logging
import io.rml.framework.core.item.{EmptyItem, Item, JoinedItem}
import io.rml.framework.core.model._
import io.rml.framework.core.util.{StreamerConfig, Util}
import io.rml.framework.core.util.ParameterUtil.{OutputSinkOption, PostProcessorOption}
import io.rml.framework.core.util.{ParameterUtil, StreamerConfig, Util}
import io.rml.framework.engine._
import io.rml.framework.engine.statement.StatementEngine
import io.rml.framework.flink.connector.kafka.{RMLPartitioner, UniversalKafkaConnectorFactory}
import io.rml.framework.flink.item.{Item, JoinedItem}
import io.rml.framework.flink.source.{EmptyItem, FileDataSet, Source}
import io.rml.framework.flink.util.ParameterUtil
import io.rml.framework.flink.util.ParameterUtil.{OutputSinkOption, PostProcessorOption}
import io.rml.framework.flink.function.{FnOEnvironmentLoader, FnOEnvironmentStreamLoader, RichItemIdentityFunction, RichStreamItemIdentityFunction}
import io.rml.framework.flink.sink.{RichMQTTSink, TargetSinkFactory}
import io.rml.framework.flink.source.{FileDataSet, Source}
import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
@@ -177,6 +177,10 @@ object Main extends Logging {
.build()
stream.addSink(sink).name("Streaming file sink")
}
else if (config.outputSink.equals(OutputSinkOption.MQTT)) {
val sink = new RichMQTTSink(config.broker.get, config.topic.get)
stream.addSink(sink)
}
// discard output if the parameter is given
else if (config.outputSink.equals(OutputSinkOption.None)) {
stream.addSink(output => {}).name("No output sink")
@@ -236,31 +240,26 @@
new RichStreamItemIdentityFunction()
}

// Create sinks for every logical target
val logicalTargetId2Sinks = TargetSinkFactory.createStreamSinksFromLogicalTargetCache()

// This is the collection of all data streams that are created by the current mapping
val processedStreams: immutable.Iterable[DataStream[String]] =
val processedStreams: immutable.Iterable[DataStream[String]] = {
sourceEngineMap.map(entry => {
val source = entry._1.asInstanceOf[io.rml.framework.flink.source.Stream]
val engine = entry._2
// link the different steps in each pipeline
source.stream // this will generate a stream of items
val dataStream = source.stream // this will generate a stream of items
// process every item by a processor with a loaded engine

.map(preProcessingFunction)
.map(new StdStreamProcessor(engine))
.name("Execute mapping statements on items")

// format every list of triples (as strings)
.flatMap(
list => {
if (list.nonEmpty) {
Some(list.reduce((a, b) => a + "\n" + b) + "\n\n")
} else {
None
}
}
)
.name("Convert triples to strings")
// add sinks to the data stream
TargetSinkFactory.appendSinksToStream(logicalTargetId2Sinks, dataStream)
})
}

// union all streams to one final stream
unionStreams(processedStreams)
@@ -350,17 +349,13 @@

// the "normal" scenario.
val engine = StatementEngine.fromTriplesMaps(List(triplesMap))
stream
val dataStream = stream
.map(new StdStreamProcessor(engine))
.name("Execute mapping statements on items")

// format every list of triples (as strings)
.flatMap(list =>
if (list.nonEmpty) {
Some(list.reduce((a, b) => a + "\n" + b) + "\n\n")
} else None
)
.name("Convert triples to strings")
val logicalTargetId2Sinks = TargetSinkFactory.createStreamSinksFromLogicalTargetCache()

TargetSinkFactory.appendSinksToStream(logicalTargetId2Sinks, dataStream)
}

})
@@ -375,7 +370,7 @@

formattedMapping.joinedSteamTriplesMaps.foreach(joinedTm => {
// identify the parent triples map
val parentTm = TriplesMapsCache.get(joinedTm.parentTriplesMap).get;
val parentTm = NodeCache.getTriplesMap(joinedTm.parentTriplesMap).get;

// find the parent source of the join condition
val joinParentSource = joinedTm.joinCondition.get.parent.identifier
@@ -504,9 +499,16 @@
.map(preProcessingFunction)
.map(new StdStaticProcessor(engine))
.name("Execute mapping statements on items")

.map(outputStringToLogicalTargetIDs => {
outputStringToLogicalTargetIDs.map(outputStringToLogicalTargetID => outputStringToLogicalTargetID._2)
// TODO: integrate logical target for data set.
})
.name("Ignoring the logical target for now.")
.flatMap(list => {
list.seq
})
// format every list of triples (as strings)
.flatMap(list => if (list.nonEmpty) Some(list.reduce((a, b) => a + "\n" + b) + "\n\n") else None)
.reduce((a, b) => a + "\n" + b + "\n\n")
.name("Convert triples to strings")
})

@@ -537,7 +539,7 @@
// filter out all items that do not contain the childs join condition
.filter(item => {
if (tm.joinCondition.isDefined) {
item.refer(tm.joinCondition.get.child.toString).isDefined
item.refer(tm.joinCondition.get.child.value).isDefined
} else true // if there are no join conditions all items can pass

// filter out all empty items (some iterators can emit empty items)
@@ -546,7 +548,7 @@
})


val parentTriplesMap = TriplesMapsCache.get(tm.parentTriplesMap).get;
val parentTriplesMap = NodeCache.getTriplesMap(tm.parentTriplesMap).get;
val parentDataset =
// Create a Source from the parents logical source
Source(parentTriplesMap.logicalSource).asInstanceOf[FileDataSet]
@@ -555,7 +557,7 @@
// filter out all items that do not contain the parents join condition
.filter(item => {
if (tm.joinCondition.isDefined) {
item.refer(tm.joinCondition.get.parent.toString).isDefined
item.refer(tm.joinCondition.get.parent.value).isDefined
} else true // if there are no join conditions all items can pass

// filter out all empty items
@@ -570,10 +572,10 @@
val joined: JoinDataSet[Item, Item] =
childDataset.join(parentDataset)
.where(item => {
item.refer(tm.joinCondition.get.child.toString).get.head
item.refer(tm.joinCondition.get.child.value).get.head
}) // empty fields are already filtered
.equalTo(item => {
item.refer(tm.joinCondition.get.parent.toString).get.head
item.refer(tm.joinCondition.get.parent.value).get.head
}) // empty fields are already filtered

joined.name("Join child and parent.")
@@ -588,7 +590,16 @@
.map(new JoinedStaticProcessor(engine)).name("Execute mapping statements on joined items")

// format the list of triples as strings
.flatMap(list => if (list.nonEmpty) Some(list.reduce((a, b) => a + "\n" + b)) else None)
.map(outputStringToLogicalTargetIDs => {
outputStringToLogicalTargetIDs.map(outputStringToLogicalTargetID => outputStringToLogicalTargetID._2)
// TODO: integrate logical target for data set.
})
.name("Ignoring the logical target for now.")
.flatMap(list => {
list.seq
})
// format every list of triples (as strings)
.reduce((a, b) => a + "\n" + b + "\n\n")
.name("Convert triples to strings")

} else { // if there are no join conditions a cross join will be executed
@@ -600,7 +611,16 @@
JoinedItem(items._1, items._2)
) // create a JoinedItem from the crossed items
.map(new JoinedStaticProcessor(engine)).name("Execute mapping statements on joined items") // process the joined items
.flatMap(list => if (list.nonEmpty) Some(list.reduce((a, b) => a + "\n" + b)) else None) // format the triples
.map(outputStringToLogicalTargetIDs => {
outputStringToLogicalTargetIDs.map(outputStringToLogicalTargetID => outputStringToLogicalTargetID._2)
// TODO: integrate logical target for data set.
})
.name("Ignoring the logical target for now.")
.flatMap(list => {
list.seq
})
// format every list of triples (as strings)
.reduce((a, b) => a + "\n" + b + "\n\n")
.name("Convert joined triples to strings")
}

@@ -642,7 +662,4 @@ object Main extends Logging {
} else head
}




}
@@ -40,8 +40,7 @@ object DataSourceExtractor {
* @return
*/
def apply(): DataSourceExtractor = {
lazy val extractor = new StdDataSourceExtractor()
extractor
new StdDataSourceExtractor()
}

}