This repository has been archived by the owner on May 12, 2021. It is now read-only.
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
APEXMALHAR-2547 nyc taxi example checkin
1 parent c92c713, commit 1d504ac
Showing 13 changed files with 997 additions and 3 deletions.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
# Apache Apex Example (NYC Taxi Data) | ||
|
||
## Overview | ||
|
||
This example demonstrates how Apex can be used to process ride service data, using the freely available | ||
historical Yellow Cab trip data on the New York City government's website. | ||
|
||
It uses concepts of event-time windowing, out-of-order processing and streaming windows. | ||
|
||
## Instructions | ||
|
||
### Data preparation | ||
Download some Yellow Cab trip data CSV files from the nyc.gov website. | ||
|
||
Let's say the data is saved as yellow_tripdata_2016-01.csv. | ||
|
||
Because the trip data in the source file is wildly unordered, first sort it by the second CSV column, which holds the pickup timestamp: | ||
```bash | ||
bash> sort -t, -k2 yellow_tripdata_2016-01.csv > yellow_tripdata_sorted_2016-01.csv | ||
``` | ||
|
||
Then add some random deviation to the sorted data: | ||
|
||
```bash | ||
bash> cat yellow_tripdata_sorted_2016-01.csv | perl -e '@lines = (); while (<>) { if (@lines && rand(10) < 1) { print shift @lines; } if (rand(50) < 1) { push @lines, $_; } else { print $_; } } print @lines;' > yellow_tripdata_sorted_random_2016-01.csv | ||
``` | ||
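
For readers less familiar with Perl, the one-liner above can be read as the following plain-Java sketch. It holds back roughly one line in fifty and, on each later line, releases the oldest held line with probability one in ten, so the output stays mostly sorted with occasional late records. The class and method names are illustrative assumptions and are not part of the example; the sketch flushes any lines still held at the end of the input so that no record is lost.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Random;

/** Illustrative only: a plain-Java reading of the Perl one-liner. */
final class OutOfOrderShuffler {

  /** Returns the input lines with a small fraction of them delayed. */
  static List<String> perturb(List<String> sortedLines, Random random) {
    Deque<String> held = new ArrayDeque<>();
    List<String> out = new ArrayList<>(sortedLines.size());
    for (String line : sortedLines) {
      // With probability 1/10, release the oldest delayed line first.
      if (!held.isEmpty() && random.nextInt(10) == 0) {
        out.add(held.removeFirst());
      }
      // With probability 1/50, delay the current line instead of emitting it.
      if (random.nextInt(50) == 0) {
        held.addLast(line);
      } else {
        out.add(line);
      }
    }
    // Emit whatever is still delayed so that no line is dropped.
    out.addAll(held);
    return out;
  }

  public static void main(String[] args) {
    List<String> sorted = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
      sorted.add(String.format("%04d", i));
    }
    List<String> perturbed = perturb(sorted, new Random(42));
    int displaced = 0;
    for (int i = 0; i < sorted.size(); i++) {
      if (!sorted.get(i).equals(perturbed.get(i))) {
        displaced++;
      }
    }
    System.out.println(displaced + " of " + sorted.size() + " lines changed position");
  }
}
```

Passing a seeded `Random` makes a run repeatable, which helps when comparing results across runs of the application.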
|
||
Then create an HDFS directory and copy the csv file there: | ||
|
||
```bash | ||
bash> hdfs dfs -mkdir nyctaxidata | ||
bash> hdfs dfs -copyFromLocal yellow_tripdata_sorted_random_2016-01.csv nyctaxidata/ | ||
``` | ||
|
||
### Setting up pubsub server | ||
|
||
Clone the pubsub server repository: | ||
|
||
    bash> git clone https://github.com/atrato/pubsub-server | ||
|
||
Then build and run the pubsub server (the message broker): | ||
|
||
    bash> cd pubsub-server; mvn compile exec:java | ||
|
||
The pubsub server is now running, listening on the default port 8890 on localhost. | ||
|
||
### Running the application | ||
|
||
Open the Apex CLI command prompt and run the application: | ||
|
||
```bash | ||
bash> apex | ||
apex> launch target/malhar-examples-nyc-taxi-3.8.0-SNAPSHOT.apa | ||
``` | ||
|
||
After the application has been running for 5 minutes, we can start querying the data. We need to wait because the | ||
first 5-minute window must pass the watermark before the WindowedOperator fires its triggers. Subsequent triggers | ||
fire every minute, since slideBy is one minute. | ||
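
The windowing described above can be sketched in plain Java. This is a minimal illustration that assumes windows are aligned to multiples of the slide interval and that a window counts as complete once the watermark reaches its end. It does not use the Malhar API, and the class and method names are hypothetical.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: 5-minute event-time windows that slide by 1 minute. */
final class SlidingWindows {
  static final long SIZE_MS = Duration.ofMinutes(5).toMillis();
  static final long SLIDE_MS = Duration.ofMinutes(1).toMillis();

  /** Start times of every window [start, start + SIZE_MS) that contains the timestamp. */
  static List<Long> windowStartsFor(long timestampMs) {
    // The latest window containing the timestamp starts at the slide boundary at or before it.
    long lastStart = Math.floorDiv(timestampMs, SLIDE_MS) * SLIDE_MS;
    List<Long> starts = new ArrayList<>();
    for (long start = lastStart - SIZE_MS + SLIDE_MS; start <= lastStart; start += SLIDE_MS) {
      starts.add(start);
    }
    return starts;
  }

  /** Assumption: a window is complete once the watermark has reached its end. */
  static boolean isComplete(long windowStartMs, long watermarkMs) {
    return watermarkMs >= windowStartMs + SIZE_MS;
  }

  public static void main(String[] args) {
    long eventTime = Duration.ofMinutes(5).plusSeconds(30).toMillis();
    for (long start : windowStartsFor(eventTime)) {
      System.out.println("window [" + start + ", " + (start + SIZE_MS) + ")");
    }
  }
}
```

Each event falls into five overlapping windows, and one window ends every minute, which is why results refresh once per minute after the initial 5-minute wait.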
|
||
We can use the Simple WebSocket Client Google Chrome extension to query the data. Open the extension in Chrome and | ||
connect to "ws://localhost:8890/pubsub". Results of all queries are delivered to the query result topic, so subscribe | ||
to that topic first by sending this message over the websocket connection: | ||
|
||
```json | ||
{"type":"subscribe","topic":"nyctaxi.result"} | ||
``` | ||
|
||
Issue a query with latitude/longitude somewhere in Manhattan: | ||
|
||
```json | ||
{"type":"publish","topic":"nyctaxi.query","data":{"lat":40.731829, "lon":-73.989181}} | ||
``` | ||
|
||
You should get back something like the following: | ||
|
||
```json | ||
{"type":"data","topic":"nyctaxi.result","data":{"currentZip":"10003","driveToZip":"10011"},"timestamp":1500769034523} | ||
``` | ||
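
The same subscribe-then-publish exchange can also be driven from code instead of the Chrome extension. The sketch below is one possible way to do it, assuming Java 11 or later (for `java.net.http.WebSocket`) and a pubsub server running on localhost as set up earlier. The class and helper names are hypothetical and not part of the example.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

/** Illustrative only: sends the subscribe and query messages shown above. */
final class NycTaxiQueryClient {

  /** Builds the message that subscribes to a topic. */
  static String subscribeMessage(String topic) {
    return "{\"type\":\"subscribe\",\"topic\":\"" + topic + "\"}";
  }

  /** Builds the message that publishes a latitude/longitude query to a topic. */
  static String publishQuery(String topic, double lat, double lon) {
    return "{\"type\":\"publish\",\"topic\":\"" + topic + "\",\"data\":{\"lat\":" + lat + ",\"lon\":" + lon + "}}";
  }

  public static void main(String[] args) throws Exception {
    // Print every text frame received and ask for the next one.
    WebSocket.Listener printer = new WebSocket.Listener() {
      @Override
      public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
        System.out.print(data);
        if (last) {
          System.out.println();
        }
        webSocket.request(1);
        return null;
      }
    };

    WebSocket ws = HttpClient.newHttpClient()
        .newWebSocketBuilder()
        .buildAsync(URI.create("ws://localhost:8890/pubsub"), printer)
        .get(10, TimeUnit.SECONDS);

    // Subscribe to the result topic before issuing the query.
    ws.sendText(subscribeMessage("nyctaxi.result"), true).get(10, TimeUnit.SECONDS);
    ws.sendText(publishQuery("nyctaxi.query", 40.731829, -73.989181), true).get(10, TimeUnit.SECONDS);

    // Give the application a few seconds to answer, then close the connection.
    Thread.sleep(5000);
    ws.sendClose(WebSocket.NORMAL_CLOSURE, "done").get(10, TimeUnit.SECONDS);
  }
}
```

The JSON is assembled by hand to keep the sketch free of dependencies; a JSON library would be the safer choice for anything beyond a quick test.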
|
||
The result of the same query changes over time, since "real-time" ride data keeps coming in: | ||
```json | ||
{"type":"publish","topic":"nyctaxi.query","data":{"lat":40.731829, "lon":-73.989181}} | ||
{"type":"data","topic":"nyctaxi.result","data":{"currentZip":"10003","driveToZip":"10003"},"timestamp":1500769158530} | ||
{"type":"publish","topic":"nyctaxi.query","data":{"lat":40.731829, "lon":-73.989181}} | ||
{"type":"data","topic":"nyctaxi.result","data":{"currentZip":"10003","driveToZip":"10011"},"timestamp":1500769827538} | ||
{"type":"publish","topic":"nyctaxi.query","data":{"lat":40.731829, "lon":-73.989181}} | ||
{"type":"data","topic":"nyctaxi.result","data":{"currentZip":"10003","driveToZip":"10012"},"timestamp":1500770540527} | ||
``` | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<!-- | ||
Licensed to the Apache Software Foundation (ASF) under one | ||
or more contributor license agreements. See the NOTICE file | ||
distributed with this work for additional information | ||
regarding copyright ownership. The ASF licenses this file | ||
to you under the Apache License, Version 2.0 (the | ||
"License"); you may not use this file except in compliance | ||
with the License. You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, | ||
software distributed under the License is distributed on an | ||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
KIND, either express or implied. See the License for the | ||
specific language governing permissions and limitations | ||
under the License. | ||
--> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
  <modelVersion>4.0.0</modelVersion> | ||
|
||
  <artifactId>malhar-examples-nyc-taxi</artifactId> | ||
  <packaging>jar</packaging> | ||
|
||
  <name>NYC Taxi Data Example for Apache Apex</name> | ||
  <description>Apex example application that processes NYC taxi data.</description> | ||
|
||
  <parent> | ||
    <groupId>org.apache.apex</groupId> | ||
    <artifactId>malhar-examples</artifactId> | ||
    <version>3.8.0-SNAPSHOT</version> | ||
  </parent> | ||
|
||
  <dependencies> | ||
    <dependency> | ||
      <groupId>org.apache.commons</groupId> | ||
      <artifactId>commons-lang3</artifactId> | ||
      <version>3.1</version> | ||
    </dependency> | ||
    <dependency> | ||
      <groupId>joda-time</groupId> | ||
      <artifactId>joda-time</artifactId> | ||
      <version>2.9.1</version> | ||
    </dependency> | ||
  </dependencies> | ||
|
||
</project> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
<!-- | ||
Licensed to the Apache Software Foundation (ASF) under one | ||
or more contributor license agreements. See the NOTICE file | ||
distributed with this work for additional information | ||
regarding copyright ownership. The ASF licenses this file | ||
to you under the Apache License, Version 2.0 (the | ||
"License"); you may not use this file except in compliance | ||
with the License. You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, | ||
software distributed under the License is distributed on an | ||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
KIND, either express or implied. See the License for the | ||
specific language governing permissions and limitations | ||
under the License. | ||
--> | ||
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" | ||
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> | ||
  <id>appPackage</id> | ||
  <formats> | ||
    <format>jar</format> | ||
  </formats> | ||
  <includeBaseDirectory>false</includeBaseDirectory> | ||
  <fileSets> | ||
    <fileSet> | ||
      <directory>${basedir}/target/</directory> | ||
      <outputDirectory>/app</outputDirectory> | ||
      <includes> | ||
        <include>${project.artifactId}-${project.version}.jar</include> | ||
      </includes> | ||
    </fileSet> | ||
    <fileSet> | ||
      <directory>${basedir}/target/deps</directory> | ||
      <outputDirectory>/lib</outputDirectory> | ||
    </fileSet> | ||
    <fileSet> | ||
      <directory>${basedir}/src/site/conf</directory> | ||
      <outputDirectory>/conf</outputDirectory> | ||
      <includes> | ||
        <include>*.xml</include> | ||
      </includes> | ||
    </fileSet> | ||
    <fileSet> | ||
      <directory>${basedir}/src/main/resources/META-INF</directory> | ||
      <outputDirectory>/META-INF</outputDirectory> | ||
    </fileSet> | ||
    <fileSet> | ||
      <directory>${basedir}/src/main/resources/app</directory> | ||
      <outputDirectory>/app</outputDirectory> | ||
    </fileSet> | ||
  </fileSets> | ||
|
||
</assembly> | ||
|
102 changes: 102 additions & 0 deletions
examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/Application.java
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.apex.examples.nyctaxi; | ||
|
||
|
||
import java.net.URI; | ||
import java.net.URISyntaxException; | ||
|
||
import org.joda.time.Duration; | ||
|
||
import org.apache.apex.malhar.lib.window.TriggerOption; | ||
import org.apache.apex.malhar.lib.window.WindowOption; | ||
import org.apache.apex.malhar.lib.window.WindowState; | ||
import org.apache.apex.malhar.lib.window.accumulation.SumDouble; | ||
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage; | ||
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage; | ||
import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl; | ||
import org.apache.commons.lang3.mutable.MutableDouble; | ||
import org.apache.hadoop.conf.Configuration; | ||
import com.google.common.base.Throwables; | ||
|
||
import com.datatorrent.api.DAG; | ||
import com.datatorrent.api.StreamingApplication; | ||
import com.datatorrent.api.annotation.ApplicationAnnotation; | ||
import com.datatorrent.lib.io.ConsoleOutputOperator; | ||
import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery; | ||
import com.datatorrent.lib.io.PubSubWebSocketAppDataResult; | ||
|
||
/** | ||
* The DAG definition of the example that illustrates New York City taxi ride data processing. | ||
*/ | ||
@ApplicationAnnotation(name = "NycTaxiExample") | ||
public class Application implements StreamingApplication | ||
{ | ||
  @Override | ||
  public void populateDAG(DAG dag, Configuration conf) | ||
  { | ||
    // Each streaming window is 1 second long | ||
    dag.setAttribute(DAG.STREAMING_WINDOW_SIZE_MILLIS, 1000); | ||
    NycTaxiDataReader inputOperator = new NycTaxiDataReader(); | ||
    inputOperator.setDirectory("/user/" + System.getProperty("user.name") + "/nyctaxidata"); | ||
    inputOperator.getScanner().setFilePatternRegexp(".*\\.csv$"); | ||
    dag.addOperator("NycTaxiDataReader", inputOperator); | ||
    NycTaxiCsvParser parser = dag.addOperator("NycTaxiCsvParser", new NycTaxiCsvParser()); | ||
    NycTaxiZipFareExtractor extractor = dag.addOperator("NycTaxiZipFareExtractor", new NycTaxiZipFareExtractor()); | ||
|
||
    KeyedWindowedOperatorImpl<String, Double, MutableDouble, Double> windowedOperator = new KeyedWindowedOperatorImpl<>(); | ||
|
||
    // 5-minute windows slide by 1 minute | ||
    windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.standardMinutes(5)).slideBy(Duration.standardMinutes(1))); | ||
|
||
    // Because we only care about the last 5 minutes, and the watermark is set at t-1 minutes, lateness horizon is set to 4 minutes. | ||
    windowedOperator.setAllowedLateness(Duration.standardMinutes(4)); | ||
    windowedOperator.setAccumulation(new SumDouble()); | ||
    windowedOperator.setTriggerOption(TriggerOption.AtWatermark()); | ||
    windowedOperator.setDataStorage(new InMemoryWindowedKeyedStorage<String, MutableDouble>()); | ||
    windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>()); | ||
|
||
    dag.addOperator("WindowedOperator", windowedOperator); | ||
|
||
    NycTaxiDataServer dataServer = dag.addOperator("NycTaxiDataServer", new NycTaxiDataServer()); | ||
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator()); | ||
    dag.addStream("input_to_parser", inputOperator.output, parser.input); | ||
    dag.addStream("parser_to_extractor", parser.output, extractor.input); | ||
    dag.addStream("extractor_to_windowed", extractor.output, windowedOperator.input); | ||
    dag.addStream("extractor_watermark", extractor.watermarkOutput, windowedOperator.controlInput); | ||
    // The windowed output feeds both the data server and the console | ||
    dag.addStream("windowed_to_console", windowedOperator.output, dataServer.input, console.input); | ||
|
||
    // The query operator is embedded in the data server and reads queries from the pubsub query topic | ||
    PubSubWebSocketAppDataQuery wsQuery = new PubSubWebSocketAppDataQuery(); | ||
    wsQuery.enableEmbeddedMode(); | ||
    wsQuery.setTopic("nyctaxi.query"); | ||
    try { | ||
      wsQuery.setUri(new URI("ws://localhost:8890/pubsub")); | ||
    } catch (URISyntaxException ex) { | ||
      throw Throwables.propagate(ex); | ||
    } | ||
    dataServer.setEmbeddableQueryInfoProvider(wsQuery); | ||
    // Query results are published to the pubsub result topic | ||
    PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult()); | ||
    wsResult.setTopic("nyctaxi.result"); | ||
    try { | ||
      wsResult.setUri(new URI("ws://localhost:8890/pubsub")); | ||
    } catch (URISyntaxException ex) { | ||
      throw Throwables.propagate(ex); | ||
    } | ||
    dag.addStream("server_to_query_output", dataServer.queryResult, wsResult.input); | ||
  } | ||
} |
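
As a rough mental model of what the windowed operator in `populateDAG` is configured to compute, the following plain-Java sketch keeps one running sum per key in each 5-minute window that slides by 1 minute. It is not the Malhar implementation: it ignores watermarks, triggers and allowed lateness, and it assumes that the key is a zip code and the value a fare, which the operator names suggest but this file does not state. All names in the sketch are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: per-key sums over 5-minute windows sliding by 1 minute. */
final class KeyedWindowSums {
  static final long SIZE_MS = 5 * 60_000L;
  static final long SLIDE_MS = 60_000L;

  // window start time -> (key -> running sum)
  private final TreeMap<Long, Map<String, Double>> sums = new TreeMap<>();

  /** Adds the value to the key's sum in every window that contains the timestamp. */
  void add(long timestampMs, String key, double value) {
    long lastStart = Math.floorDiv(timestampMs, SLIDE_MS) * SLIDE_MS;
    for (long start = lastStart - SIZE_MS + SLIDE_MS; start <= lastStart; start += SLIDE_MS) {
      sums.computeIfAbsent(start, s -> new HashMap<>()).merge(key, value, Double::sum);
    }
  }

  /** Returns the key's sum in the window starting at the given time, or 0.0 if there is none. */
  double sum(long windowStartMs, String key) {
    Map<String, Double> perKey = sums.get(windowStartMs);
    if (perKey == null) {
      return 0.0;
    }
    Double value = perKey.get(key);
    return value == null ? 0.0 : value;
  }

  public static void main(String[] args) {
    KeyedWindowSums windows = new KeyedWindowSums();
    windows.add(330_000L, "10003", 10.5);
    windows.add(350_000L, "10003", 2.0);
    windows.add(340_000L, "10011", 7.25);
    System.out.println("10003 in window starting at 300000: " + windows.sum(300_000L, "10003"));
    System.out.println("10011 in window starting at 300000: " + windows.sum(300_000L, "10011"));
  }
}
```

In the real operator the per-window state lives in the configured `InMemoryWindowedKeyedStorage`, and results are emitted only when the trigger fires at the watermark.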