In June 2019, we announced the inclusion of Hazelcast Jet as a runner for Apache Beam.
Now it's time for an example showing how it's done. As a bonus, it's not "Word Count."
The data we will use is a series of 2,000 GPS points and time offsets:
# Latitude, Longitude, Time-Offset
45.417,8.179,1629
45.417,8.178,1630
45.416,8.178,1631
45.416,8.177,1632
45.415,8.176,1633
These points are real data. They come from a trip on Italy's famous Frecciarossa ("Red Arrow") train from Milan to Turin.
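For illustration, here is a minimal Java sketch of parsing one of these lines into a typed value. The record GpsPoint and its parse method are hypothetical names, not the demo's own code. The sketch assumes that comment lines start with "#" and that every data line has exactly three comma-separated fields.

```java
import java.util.Optional;

// Hypothetical type for illustration; not taken from the demo's source.
// Latitude and longitude are in degrees; the offset is seconds since leaving Milan.
record GpsPoint(double latitude, double longitude, int offsetSeconds) {

    // Parses "latitude,longitude,offset"; returns empty for blank lines,
    // "#" comment lines, and lines that are not three numeric fields.
    static Optional<GpsPoint> parse(String line) {
        String trimmed = line.trim();
        if (trimmed.isEmpty() || trimmed.startsWith("#")) {
            return Optional.empty();
        }
        String[] fields = trimmed.split(",");
        if (fields.length != 3) {
            return Optional.empty();
        }
        try {
            return Optional.of(new GpsPoint(
                    Double.parseDouble(fields[0].trim()),
                    Double.parseDouble(fields[1].trim()),
                    Integer.parseInt(fields[2].trim()))); // "0000" parses as 0
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }
}
```

For example, parsing "45.417,8.179,1629" yields latitude 45.417, longitude 8.179 and offset 1629, while the header line yields an empty result.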
Being real data, it gives us some real problems to solve. Since trains move, they are connected wirelessly. Delivering the position over wireless networks may mean some locations arrive twice or out of order. In some cases, such as when a train is in a tunnel, the positions may not arrive at all.
This example application mainly does two things. First, it uses a Beam pipeline to enrich the stream of GPS points: they are parsed and reformatted from CSV to JSON, and windowing is used to drop some out-of-sequence points. This is not the most sophisticated Beam example, but that's not the objective.
Second, these points are plotted dynamically on a map, using JavaScript and a WebSocket, to make things more visual. This happens as they arrive: stream processing, not batch processing.
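The Beam job uses windowing to drop out-of-sequence points. As a simpler illustration of the same goal, and explicitly not the technique the job uses, here is a plain-Java filter that accepts a point only if its time offset is strictly greater than the last accepted one, so duplicates and late arrivals are discarded. The class SequenceFilter is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; the demo itself uses Beam windowing instead.
// Drops points that arrive twice or out of order, judged by their time offset.
final class SequenceFilter {
    private int lastAcceptedOffset = Integer.MIN_VALUE;

    // Returns true, and remembers the offset, only if it is newer than anything accepted so far.
    boolean accept(int offsetSeconds) {
        if (offsetSeconds <= lastAcceptedOffset) {
            return false; // duplicate or out-of-sequence point
        }
        lastAcceptedOffset = offsetSeconds;
        return true;
    }

    // Convenience method: filters a whole sequence of offsets given in arrival order.
    static List<Integer> inOrder(List<Integer> arrivals) {
        SequenceFilter filter = new SequenceFilter();
        List<Integer> kept = new ArrayList<>();
        for (int offset : arrivals) {
            if (filter.accept(offset)) {
                kept.add(offset);
            }
        }
        return kept;
    }
}
```

A filter like this discards a late point rather than reordering it, which suits a map display where only the latest position matters.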
The basic concept of Apache Beam is that the definition of a processing pipeline is independent of the execution platform. We define a Beam pipeline and choose to run it on Hazelcast Jet.
We could choose to run it with something other than Hazelcast Jet, but we won't.
Be aware that although Beam is platform-independent, not all platforms implement all features. For more information on feature compatibility, please refer to the Apache Beam Capability Matrix.
This example consists of four processes running simultaneously. Because it uses file-watching logic, we also want to run them all from the same directory to keep things simple.
Note: Using the same directory is crucial to making this demo work. To keep the example simple, Step 4 writes to the directory that Steps 2 and 5 expect. You can change this in the code if you want; if you don't, be sure to run all commands from the same directory.
Step 1. Download hazelcast-jet-demos. From the top level of the "train-track" folder, run this command to build:
mvn clean install
Step 2. Run this command to start one Jet grid process, and leave it running:
java -jar train-track-grid/target/train-track-grid.jar
This will create a Jet grid running on port 8701 on your machine. The grid is named frecciarossa.
If you have access to either or both of Hazelcast IMDG Management Center and Hazelcast Jet Management Center, you have additional ways to monitor the activity.
The Jet grid is where the Beam job will run. Although we won't do so here, the Jet grid could contain several Jet processes, spread across multiple machines, making full use of all available CPUs.
Step 3. Start the web UI using this command, and leave it running:
java -jar train-track-web-ui/target/train-track-web-ui.jar
Bring up this page in a browser:
http://localhost:8085/
Assuming you have an internet connection, the browser will download a map of Milan to display on the screen.
This process connects to the Hazelcast Jet cluster to find the location of the train. We won't see the train's position yet, as we haven't started the data feed; it should appear once Step 5 is running.
Step 4. Next, run this command:
java -jar train-track-data-feed/target/train-track-data-feed.jar
The command will output four GPS points per second to the screen, and into a file named "beam-input" in the current directory.
There are over 2,000 points recorded, so this job will take more than fifteen minutes to complete. The actual train trip from Milan to Turin takes 65 minutes.
Don't wait for it to complete; as soon as output appears, proceed to Step 5.
Step 5. Finally, run this command:
java -jar train-track-beam-runner/target/train-track-beam-runner.jar
Be sure to select "train-track-beam-runner/target/train-track-beam-runner.jar", not "train-track-beam-runner/target/train-track-beam-runner-shaded.jar".
This step will submit a Beam job to run in the Hazelcast Jet process started in Step 2.
It will read the file "beam-input" being produced by the process started in Step 4.
It will produce files named "beam-output*" with the enriched GPS information. In addition to the latitude and longitude, we should also have the time in a more human-readable form.
If you look at the "beam-input" file, you should see a sequence of lines for the GPS points:
45.464,8.438,1364
45.464,8.437,1365
If you look at the "beam-output*" files, you should see the enriched output for these GPS points:
{ "input-point": "953", "latitude": "45.464", "longitude": "8.438", "timestamp": "1565644594246", "timeprint": "22:16:34" }
{ "input-point": "954", "latitude": "45.464", "longitude": "8.437", "timestamp": "1565644594496", "timeprint": "22:16:34" }
Looking at the browser, you should see it updating as the points are plotted on the map.
If you don't see all three of these (the input file, the output files and the map updating), something went wrong. It's worth finding out where and why.
How does this work?
The cleverness is in the "train-track-beam-runner" module, specifically this code:
public static void main(String[] args) {
    JetPipelineOptions jetPipelineOptions = PipelineOptionsFactory.create().as(JetPipelineOptions.class);
    jetPipelineOptions.setCodeJarPathname("train-track-beam-runner/target/train-track-beam-runner-shaded.jar");
    jetPipelineOptions.setJetDefaultParallelism(1);
    jetPipelineOptions.setJetGroupName("frecciarossa");
    jetPipelineOptions.setJetServers("127.0.0.1:8701");
    jetPipelineOptions.setJobName(MyBeamJob.class.getSimpleName());
    jetPipelineOptions.setRunner(JetRunner.class);

    Pipeline pipeline = MyBeamJob.build(jetPipelineOptions);
    pipeline.run();
}
This does three things.
It creates the object "jetPipelineOptions", which contains the specifics for running a Beam job in Hazelcast Jet. The essential parts are the Jar file that contains the code to run, and "JetRunner" as the class that runs it.
It then creates the Beam processing pipeline, passing in "jetPipelineOptions" as a parameter to be held within the processing job.
Lastly, it calls "pipeline.run()" to send the job to the execution engine for processing. We don't wait for the job to complete: fire and forget.
This next section gives a more detailed explanation for the curious; skip on to the summary if you prefer. If not, here is how the five modules in this demo interact.
The first module in the application is "train-track-data-feed".
This has nothing to do with Hazelcast Jet or Apache Beam.
All it does is write a series of data points, from the code file "FrecciaRossa.java", to a file named "beam-input" in the top-level directory.
The GPS points are already captured, but we're trying to simulate a real feed coming in.
The filename "beam-input" is meant to imply it is the input file for Beam processing. Of course, it's also the output file from this module.
In the instructions, this is Step 4, but it doesn't depend on Steps 2 or 3.
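What this module does can be sketched in a few lines of standard Java: append each recorded line to a file, pausing between lines to mimic a live feed. The class FeedSimulator, its delayMillis parameter, and the use of Files.writeString with APPEND are assumptions for illustration, not the module's actual code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical sketch for illustration; not the demo's train-track-data-feed source.
// Replays pre-recorded CSV lines into a file, one at a time, to simulate a live GPS feed.
final class FeedSimulator {

    // Appends each line plus a line separator to target (creating the file if needed),
    // echoes it to the screen, and sleeps delayMillis between lines.
    static void replay(List<String> lines, Path target, long delayMillis)
            throws IOException, InterruptedException {
        for (String line : lines) {
            Files.writeString(target, line + System.lineSeparator(),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            System.out.println(line);
            if (delayMillis > 0) {
                Thread.sleep(delayMillis);
            }
        }
    }
}
```

With delayMillis set to 250, this would write four points per second, the rate described in Step 4. Appending line by line, rather than writing the whole file at once, is what lets a file watcher downstream see the data arrive as a stream.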
The "train-track-beam-job" module is the whole point of the example; it defines processing logic using the Apache Beam Java SDK. There is no reference to Hazelcast Jet in this module. The processing reads from a file called "beam-input" and writes to "beam-output". Between this read and write, the data is enriched.
Its Beam dependency is the core Java SDK:
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
</dependency>
It has no reliance on Hazelcast Jet; everything is standard Apache Beam. The input is in CSV format: latitude, longitude and time offset.
The time offset is measured in seconds from when the train left Milan.
The first point is "45.487,9.205,0000". The latitude is 45.487 North and the longitude is 9.205 East, in Milan. The time offset is zero, as this is the first point.
The last point is "45.059,7.675,3762". The latitude is 45.059 North and the longitude is 7.675 East, in Turin. The time offset is 3762 seconds, about 63 minutes (62 minutes 42 seconds) after recording began.
The "train-track-beam-runner" module depends on the Beam pipeline that it submits, "train-track-beam-job":
<dependency>
    <groupId>${project.groupId}</groupId>
    <artifactId>train-track-beam-job</artifactId>
    <version>${project.version}</version>
</dependency>
In the "pom.xml" file, you'll see that we use "maven-shade-plugin".
This takes this module ("train-track-beam-runner") and all its dependencies (especially "train-track-beam-job"), expands them out, and then re-bundles them as a single unit.
The result, "train-track-beam-runner-shaded.jar", is a Jar that contains ".class" files rather than one that contains ".jar" files, making it easier for Beam to find the classes.
We take a simplistic approach here. One of the bundled dependencies is Hazelcast Jet itself. When we submit this bundled Jar to the Hazelcast Jet grid, it will already have these classes. As the versions are the same, it does no harm, but we could make the configuration more selective at the expense of simplicity.
In the main class, six options are used to configure the deployment:
- jetPipelineOptions.setCodeJarPathname("train-track-beam-runner/target/train-track-beam-runner-shaded.jar");
This is the Jar file we wish to send to the Jet cluster to execute; it has been assembled using the "maven-shade-plugin". It needs to include the pipeline job classes, plus some of the dependent classes used, like "JetRunner". Due to the basic configuration used for shading, this Jar also contains some unnecessary classes.
- jetPipelineOptions.setJetDefaultParallelism(1);
For Jet to achieve high performance, a job will generally run multiple instances in each JVM, and a Jet cluster will have multiple JVMs, so processing is highly parallel.
But we only have one input file to process, so we reduce the parallelism to make things simpler to understand.
- jetPipelineOptions.setJetGroupName("frecciarossa");
This is the name of the Jet cluster we wish to connect to. No password credentials are needed in this example, as security is not enabled.
- jetPipelineOptions.setJetServers("127.0.0.1:8701");
This option lists some of the Jet servers in the cluster to try connecting to. The first reply includes the addresses of all the others, and then connections are opened to them, too.
Generally, two or three addresses are enough, but since this example only uses one server, that's all we should specify.
- jetPipelineOptions.setJobName(MyBeamJob.class.getSimpleName());
For visualization in the Hazelcast Jet Management Center, it is useful if the job has a name.
- jetPipelineOptions.setRunner(JetRunner.class);
This is the class that will run the Beam pipeline against the Jet grid. For now, it will always be set to "JetRunner".
The "train-track-grid" module runs the Jet grid. When this grid starts, it doesn't know of the Beam job and therefore won't be running it. Indeed, it doesn't have any of the Beam classes in its classpath (check the "pom.xml") until "train-track-beam-runner" sends these classes to the grid.
The one non-standard piece of the grid is a Jet job (not a Beam job) configured to run at start-up, called FileWatcher.
The class "FileWatcher.java" defines a Jet job to run when the grid starts. It contains only two effective lines:
pipeline.drawFrom(FileWatcher.buildFileWatcherSource()).withoutTimestamps()
        .drainTo(FileWatcher.buildTopicSink());
What this job does is look for lines in files named "beam-output*" and publish those lines to a Hazelcast topic named "treno".
This is just a bit of plumbing.
The Jet job detects each line written by the Beam job and writes it to a Hazelcast topic for the Web UI. The Beam job can't do this itself, as it doesn't know it is running in Hazelcast Jet.
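The FileWatcher job relies on Jet's own file-watching source. The underlying idea, picking up only the lines that are new since the last look in every file whose name starts with "beam-output", can be sketched with the standard library alone. This polling approach and the class name OutputTailer are assumptions for illustration, not how Jet's file-watcher source is implemented.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical polling sketch for illustration; Jet's file-watcher source works differently.
// Returns the lines appended to matching files since the previous poll.
final class OutputTailer {
    private final Path directory;
    private final String glob;                 // e.g. "beam-output*"
    private final Map<Path, Integer> linesSeen = new HashMap<>();

    OutputTailer(Path directory, String glob) {
        this.directory = directory;
        this.glob = glob;
    }

    // Re-reads each matching file and returns only lines not returned before.
    // Caveat: a partially written last line would be picked up early.
    List<String> poll() throws IOException {
        List<String> fresh = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(directory, glob)) {
            for (Path file : files) {
                List<String> lines = Files.readAllLines(file);
                int seen = Math.min(linesSeen.getOrDefault(file, 0), lines.size());
                fresh.addAll(lines.subList(seen, lines.size()));
                linesSeen.put(file, lines.size());
            }
        }
        return fresh;
    }
}
```

Re-reading whole files keeps the sketch short but scales poorly; a production watcher would remember byte offsets or use file-system notifications instead.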
The last module, "train-track-web-ui", does some cool stuff, but it has little to do with Hazelcast.
The Java part of the code is again just a bit of plumbing. It subscribes to the Hazelcast topic "treno" that "train-track-grid" publishes to, and passes each message payload it receives through a WebSocket to JavaScript running in your browser.
The Web UI runs on port 8085, to avoid potential clashes with Hazelcast IMDG Management Center on 8080 or Hazelcast Jet Management Center on 8081.
All you need do is point your browser at:
http://localhost:8085/
In the browser, OpenStreetMap is used to do the plotting. The code is mostly just this:
function onMessage(message) {
    var json = JSON.parse(message);
    var latitude = json.latitude;
    var longitude = json.longitude;
    var timeprint = json.timeprint;
    var myMarker = L.marker([latitude, longitude]);
    myMarker.bindPopup('<p>' + timeprint + '</p>');
    myMarker.addTo(map);
    myMarker.openPopup();
}
When a message is received from the WebSocket, it is parsed as JSON, and the fields named latitude, longitude, and timeprint are extracted.
A marker point is plotted on the map using the latitude and longitude, and a text field is added to this marker containing the time of day.
Data arrives at up to four points a second, with the time of day shown on the most recent point. Old points are still present, so we can see where the train has been.
Gaps in the plotted points might be due to a tunnel, a period with no signal to deliver the message, or some other fault.
So, if all four processes are running, what will happen is roughly this:
"train-track-data-feed" will write CSV lines, one per GPS point, to a file named "beam-input", e.g.:
45.059,7.675,3762
"train-track-beam-job", running in the Jet grid, will read that CSV line from "beam-input" and write JSON to "beam-output*":
{ "input-point": "2057", "latitude": "45.059", "longitude": "7.675", "timestamp": "1565645192996", "timeprint": "22:26:32" }
The FileWatcher job running in the Jet grid will read the JSON line from "beam-output*" and publish it unchanged to the topic "treno":
{ "input-point": "2057", "latitude": "45.059", "longitude": "7.675", "timestamp": "1565645192996", "timeprint": "22:26:32" }
"train-track-web-ui" subscribes to the "treno" topic and passes this JSON on through a WebSocket to your browser:
{ "input-point": "2057", "latitude": "45.059", "longitude": "7.675", "timestamp": "1565645192996", "timeprint": "22:26:32" }
Code in the browser plots the JSON on a map!
You'll note that only Beam really does any proper processing. Everything else is just moving data from place to place without enrichment, aggregation, or the like.
This example shows how to create and execute an Apache Beam processing job in Hazelcast Jet. It also subliminally teaches you the location of two cities in northern Italy.
You can define a Beam processing job in Java just as before.
If you have Apache Beam 2.14 or later, the new "JetRunner" allows you to submit this job to Hazelcast Jet for execution. We do it here with ten lines of code, though you could trim that a bit.
And that's it! You can run an Apache Beam job in Hazelcast Jet or elsewhere. The trade-off is that you don't get to use Jet-specific features; that is the sacrifice necessary for implementation-independent processing.