diff --git a/examples/nyctaxi/README.md b/examples/nyctaxi/README.md
new file mode 100644
index 0000000000..3aa6868e21
--- /dev/null
+++ b/examples/nyctaxi/README.md
@@ -0,0 +1,87 @@
+# Apache Apex Example (NYC Taxi Data)
+
+## Overview
+
+This is an example that demonstrates how Apex can be used for processing ride service data, using the freely available
+historical Yellow Cab trip data on New York City government's web site.
+
+It uses concepts of event-time windowing, out-of-order processing and streaming windows.
+
+## Instructions
+
+### Data preparation
+Download some Yellow Cab trip data CSV files from the nyc.gov website.
+
+Let's say the data is saved as yellow_tripdata_2016-01.csv.
+
+Because the trip data source is wildly unordered, first sort the data by pickup time (the second CSV column):
+```bash
+bash> sort -t, -k2 yellow_tripdata_2016-01.csv > yellow_tripdata_sorted_2016-01.csv
+```
+
+Then add some random deviation to the sorted data:
+
+```bash
+bash> cat yellow_tripdata_sorted_2016-01.csv | perl -e '@lines = (); while (<>) { if (@lines && rand(10) < 1) { print shift @lines; } if (rand(50) < 1) { push @lines, $_; } else { print $_; } }' > yellow_tripdata_sorted_random_2016-01.csv
+```
+
+Then create an HDFS directory and copy the csv file there:
+
+```bash
+bash> hdfs dfs -mkdir nyctaxidata
+bash> hdfs dfs -copyFromLocal yellow_tripdata_sorted_random_2016-01.csv nyctaxidata/
+```
+
+### Setting up pubsub server
+
+Clone the pubsub server repository:
+
+```bash
+bash> git clone https://github.com/atrato/pubsub-server
+```
+
+Then build and run the pubsub server (the message broker):
+
+```bash
+bash> cd pubsub-server; mvn compile exec:java
+```
+
+The pubsub server is now running, listening to the default port 8890 on localhost.
+
+### Running the application
+
+Open the Apex CLI command prompt and run the application:
+
+```bash
+bash> apex
+apex> launch target/malhar-examples-nyc-taxi-3.8.0-SNAPSHOT.apa
+```
+
+After the application has been running for 5 minutes, we can start querying the data. We need to wait 5 minutes
+because the first window has to pass the watermark before the WindowedOperator fires any triggers. Subsequent
+triggers are fired every minute, since the slideBy is one minute.
+
+We can use the Simple WebSocket Client Google Chrome extension to query the data. Open the extension in Chrome and
+connect to "ws://localhost:8890/pubsub". First subscribe to the query result topic, because the results of all
+queries are delivered to that topic. To subscribe, send this to the websocket connection:
+
+```json
+{"type":"subscribe","topic":"nyctaxi.result"}
+```
+
+Issue a query with latitude/longitude somewhere in Manhattan:
+
+```json
+{"type":"publish","topic":"nyctaxi.query","data":{"lat":40.731829, "lon":-73.989181}}
+```
+
+You should get back something like the following:
+
+```json
+{"type":"data","topic":"nyctaxi.result","data":{"currentZip":"10003","driveToZip":"10011"},"timestamp":1500769034523}
+```
+
+The result of the same query changes over time, since we have "real-time" ride data coming in:
+```json
+{"type":"publish","topic":"nyctaxi.query","data":{"lat":40.731829, "lon":-73.989181}}
+{"type":"data","topic":"nyctaxi.result","data":{"currentZip":"10003","driveToZip":"10003"},"timestamp":1500769158530}
+{"type":"publish","topic":"nyctaxi.query","data":{"lat":40.731829, "lon":-73.989181}}
+{"type":"data","topic":"nyctaxi.result","data":{"currentZip":"10003","driveToZip":"10011"},"timestamp":1500769827538}
+{"type":"publish","topic":"nyctaxi.query","data":{"lat":40.731829, "lon":-73.989181}}
+{"type":"data","topic":"nyctaxi.result","data":{"currentZip":"10003","driveToZip":"10012"},"timestamp":1500770540527}
+```
+
diff --git a/examples/nyctaxi/pom.xml b/examples/nyctaxi/pom.xml
new file mode 100644
index 0000000000..990718aa4e
--- /dev/null
+++ b/examples/nyctaxi/pom.xml
@@ -0,0 +1,50 @@
+
+
+
+ 4.0.0
+
+ malhar-examples-nyc-taxi
+ jar
+
+ NYC Taxi Data Example for Apache Apex
+ Apex example application that processes NYC Taxi Data.
+
+
+ org.apache.apex
+ malhar-examples
+ 3.8.0-SNAPSHOT
+
+
+
+
+ org.apache.commons
+ commons-lang3
+ 3.1
+
+
+ joda-time
+ joda-time
+ 2.9.1
+
+
+
+
diff --git a/examples/nyctaxi/src/assemble/appPackage.xml b/examples/nyctaxi/src/assemble/appPackage.xml
new file mode 100644
index 0000000000..4138cf201e
--- /dev/null
+++ b/examples/nyctaxi/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+
+
+ appPackage
+
+ jar
+
+ false
+
+
+ ${basedir}/target/
+ /app
+
+ ${project.artifactId}-${project.version}.jar
+
+
+
+ ${basedir}/target/deps
+ /lib
+
+
+ ${basedir}/src/site/conf
+ /conf
+
+ *.xml
+
+
+
+ ${basedir}/src/main/resources/META-INF
+ /META-INF
+
+
+ ${basedir}/src/main/resources/app
+ /app
+
+
+
+
+
diff --git a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/Application.java b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/Application.java
new file mode 100644
index 0000000000..63abe0b9da
--- /dev/null
+++ b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/Application.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.nyctaxi;
+
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.accumulation.SumDouble;
+import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
+import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
+import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
+import org.apache.commons.lang3.mutable.MutableDouble;
+import org.apache.hadoop.conf.Configuration;
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataResult;
+
+/**
+ * The DAG definition of the example that illustrates New York City taxi ride data processing.
+ *
+ * The pipeline wired up here is NycTaxiDataReader, NycTaxiCsvParser, NycTaxiZipFareExtractor and a
+ * KeyedWindowedOperatorImpl using the SumDouble accumulation. The windowed output is sent to both a
+ * NycTaxiDataServer and a ConsoleOutputOperator. Queries are received on the pubsub topic "nyctaxi.query"
+ * and query results are published on the pubsub topic "nyctaxi.result".
+ */
+@ApplicationAnnotation(name = "NycTaxiExample")
+public class Application implements StreamingApplication
+{
+ /**
+ * Adds the operators and streams of the example to the given DAG.
+ *
+ * @param dag the DAG to which the operators and streams are added
+ * @param conf the configuration supplied with the application; not read by this method
+ */
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ // Streaming windows are one second (1000 ms) long.
+ dag.setAttribute(DAG.STREAMING_WINDOW_SIZE_MILLIS, 1000);
+ NycTaxiDataReader inputOperator = new NycTaxiDataReader();
+ // Read the files whose names end in ".csv" from /user/<user.name>/nyctaxidata.
+ inputOperator.setDirectory("/user/" + System.getProperty("user.name") + "/nyctaxidata");
+ inputOperator.getScanner().setFilePatternRegexp(".*\\.csv$");
+ dag.addOperator("NycTaxiDataReader", inputOperator);
+ NycTaxiCsvParser parser = dag.addOperator("NycTaxiCsvParser", new NycTaxiCsvParser());
+ NycTaxiZipFareExtractor extractor = dag.addOperator("NycTaxiZipFareExtractor", new NycTaxiZipFareExtractor());
+
+ KeyedWindowedOperatorImpl windowedOperator = new KeyedWindowedOperatorImpl<>();
+
+ // 5-minute windows slide by 1 minute
+ windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.standardMinutes(5)).slideBy(Duration.standardMinutes(1)));
+
+ // Because we only care about the last 5 minutes, and the watermark is set at t-1 minutes, lateness horizon is set to 4 minutes.
+ windowedOperator.setAllowedLateness(Duration.standardMinutes(4));
+ windowedOperator.setAccumulation(new SumDouble());
+ // The only trigger configured is the one at the watermark.
+ windowedOperator.setTriggerOption(TriggerOption.AtWatermark());
+ // Both the window data and the window state use the in-memory storage implementations.
+ windowedOperator.setDataStorage(new InMemoryWindowedKeyedStorage());
+ windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage());
+
+ dag.addOperator("WindowedOperator", windowedOperator);
+
+ NycTaxiDataServer dataServer = dag.addOperator("NycTaxiDataServer", new NycTaxiDataServer());
+ ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
+ dag.addStream("input_to_parser", inputOperator.output, parser.input);
+ dag.addStream("parser_to_extractor", parser.output, extractor.input);
+ // The extractor feeds the windowed operator twice: data tuples on input, watermarks on controlInput.
+ dag.addStream("extractor_to_windowed", extractor.output, windowedOperator.input);
+ dag.addStream("extractor_watermark", extractor.watermarkOutput, windowedOperator.controlInput);
+ // Despite its name, this stream has two sinks: the data server and the console.
+ dag.addStream("windowed_to_console", windowedOperator.output, dataServer.input, console.input);
+
+ // The query operator is not added to the DAG as an operator of its own. It is put into embedded mode
+ // and handed to the data server as its embeddable query info provider.
+ PubSubWebSocketAppDataQuery wsQuery = new PubSubWebSocketAppDataQuery();
+ wsQuery.enableEmbeddedMode();
+ wsQuery.setTopic("nyctaxi.query");
+ try {
+ // Hard-coded pubsub endpoint; the same URI is used for the result operator below.
+ wsQuery.setUri(new URI("ws://localhost:8890/pubsub"));
+ } catch (URISyntaxException ex) {
+ // Rethrown via Guava's Throwables.propagate so that populateDAG needs no throws clause.
+ throw Throwables.propagate(ex);
+ }
+ dataServer.setEmbeddableQueryInfoProvider(wsQuery);
+ // Unlike the query operator, the result operator is a regular operator in the DAG.
+ PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult());
+ wsResult.setTopic("nyctaxi.result");
+ try {
+ wsResult.setUri(new URI("ws://localhost:8890/pubsub"));
+ } catch (URISyntaxException ex) {
+ throw Throwables.propagate(ex);
+ }
+ dag.addStream("server_to_query_output", dataServer.queryResult, wsResult.input);
+ }
+}
diff --git a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycLocationUtils.java b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycLocationUtils.java
new file mode 100644
index 0000000000..08f59b48ce
--- /dev/null
+++ b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycLocationUtils.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.nyctaxi;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Throwables;
+
+/**
+ * Provides utilities for zip codes and lat-lon coordinates in New York City.
+ *
+ * The zip code table is read once, during class initialization, from the classpath resource
+ * {@code /nyc_zip_codes.csv}; each non-blank line is parsed as {@code zip,lat,lon}. Closeness is always
+ * measured with the squared difference of the raw lat/lon values, not with a geodesic distance.
+ */
+public class NycLocationUtils
+{
+  private static final String ZIP_CODES_RESOURCE = "/nyc_zip_codes.csv";
+
+  /** Maximum number of entries kept in {@link ZipRecord#neighboringZips}. */
+  private static final int MAX_NEIGHBORING_ZIPS = 8;
+
+  /**
+   * A zip code together with the coordinates listed for it in the zip code table.
+   */
+  public static class ZipRecord
+  {
+    public final String zip;
+    public final double lat;
+    public final double lon;
+    /**
+     * Zip codes closest to this one, nearest first. The record's own zip takes part in the ranking, so
+     * it is normally the first element.
+     */
+    public String[] neighboringZips;
+
+    public ZipRecord(String zip, double lat, double lon)
+    {
+      this.zip = zip;
+      this.lat = lat;
+      this.lon = lon;
+    }
+  }
+
+  private static final Map<String, ZipRecord> zipRecords = new HashMap<>();
+
+  static {
+    // Fail with a descriptive message instead of a bare NullPointerException when the table is missing.
+    if (NycLocationUtils.class.getResource(ZIP_CODES_RESOURCE) == null) {
+      throw new IllegalStateException("Classpath resource " + ZIP_CODES_RESOURCE + " not found");
+    }
+    // setup of NYC zip data.
+    try (BufferedReader br = new BufferedReader(
+        new InputStreamReader(NycLocationUtils.class.getResourceAsStream(ZIP_CODES_RESOURCE)))) {
+      String line;
+      while ((line = br.readLine()) != null) {
+        if (line.trim().isEmpty()) {
+          // tolerate blank lines, e.g. an empty line at the end of the file
+          continue;
+        }
+        String[] s = line.split(",");
+        String zip = s[0].trim();
+        double lat = Double.parseDouble(s[1].trim());
+        double lon = Double.parseDouble(s[2].trim());
+        zipRecords.put(zip, new ZipRecord(zip, lat, lon));
+      }
+    } catch (IOException ex) {
+      throw Throwables.propagate(ex);
+    }
+
+    // Never ask for more neighbors than there are zips; subList would otherwise throw on a short table.
+    final int neighborCount = Math.min(MAX_NEIGHBORING_ZIPS, zipRecords.size());
+    for (final ZipRecord origin : zipRecords.values()) {
+      // Rank every known zip (including the origin itself) by its distance to the origin.
+      List<String> zips = new ArrayList<>(zipRecords.keySet());
+      Collections.sort(zips, new Comparator<String>()
+      {
+        @Override
+        public int compare(String s1, String s2)
+        {
+          double dist1 = squaredDistance(zipRecords.get(s1), origin.lat, origin.lon);
+          double dist2 = squaredDistance(zipRecords.get(s2), origin.lat, origin.lon);
+          return Double.compare(dist1, dist2);
+        }
+      });
+      origin.neighboringZips = zips.subList(0, neighborCount).toArray(new String[0]);
+    }
+  }
+
+  /**
+   * Returns the squared difference between the coordinates of the given record and the given point.
+   */
+  private static double squaredDistance(ZipRecord record, double lat, double lon)
+  {
+    return Math.pow(record.lat - lat, 2) + Math.pow(record.lon - lon, 2);
+  }
+
+  /**
+   * Finds the zip code whose coordinates are nearest to the given point.
+   *
+   * @param lat latitude of the point
+   * @param lon longitude of the point
+   * @return the nearest zip code, or {@code null} if the zip code table is empty
+   */
+  public static String getZip(double lat, double lon)
+  {
+    // Brute force to get the nearest zip centroid. Should be able to optimize this.
+    double minDist = Double.MAX_VALUE;
+    String zip = null;
+    for (Map.Entry<String, ZipRecord> entry : zipRecords.entrySet()) {
+      double dist = squaredDistance(entry.getValue(), lat, lon);
+      if (dist < minDist) {
+        zip = entry.getKey();
+        minDist = dist;
+      }
+    }
+    return zip;
+  }
+
+  /**
+   * Looks up the neighboring zip codes computed for the given zip code.
+   *
+   * @param zip the zip code to look up
+   * @return the stored array of neighboring zip codes (not a copy), or {@code null} if the zip code is
+   *         not in the table
+   */
+  public static String[] getNeighboringZips(String zip)
+  {
+    ZipRecord zipRecord = zipRecords.get(zip);
+    return zipRecord != null ? zipRecord.neighboringZips : null;
+  }
+}
diff --git a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiCsvParser.java b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiCsvParser.java
new file mode 100644
index 0000000000..f65e816058
--- /dev/null
+++ b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiCsvParser.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.nyctaxi;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Operator that parses historical New York City Yellow Cab ride data
+ * from http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml.
+ */
+public class NycTaxiCsvParser extends BaseOperator
+{
+ public final transient DefaultInputPort input = new DefaultInputPort()
+ {
+ @Override
+ public void process(String tuple)
+ {
+ String[] values = tuple.split(",");
+ Map outputTuple = new HashMap<>();
+ if (StringUtils.isNumeric(values[0])) {
+ outputTuple.put("pickup_time", values[1]);
+ outputTuple.put("pickup_lon", values[5]);
+ outputTuple.put("pickup_lat", values[6]);
+ outputTuple.put("total_fare", values[18]);
+ output.emit(outputTuple);
+ }
+ }
+ };
+
+ public final transient DefaultOutputPort