Showing 10 changed files with 349 additions and 1 deletion.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
FROM flink:1.16.2 | ||
|
||
|
||
# Install Python 3.7 from source: Debian 11 ships Python 3.9, | ||
# but PyFlink currently supports only Python 3.6, 3.7 and 3.8 officially. | ||
RUN apt-get update -y && \ | ||
apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev liblzma-dev && \ | ||
wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \ | ||
tar -xvf Python-3.7.9.tgz && \ | ||
cd Python-3.7.9 && \ | ||
./configure --without-tests --enable-shared && \ | ||
make -j6 && \ | ||
make install && \ | ||
ldconfig /usr/local/lib && \ | ||
cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \ | ||
ln -s /usr/local/bin/python3 /usr/local/bin/python && \ | ||
apt-get clean && \ | ||
rm -rf /var/lib/apt/lists/* | ||
|
||
# install PyFlink | ||
RUN pip3 install apache-flink==1.16.2 feathub-nightly[flink] | ||
|
||
COPY main.py /main.py | ||
|
||
COPY data /tmp/data |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
# Overview | ||
|
||
This example shows how to use `DerivedFeatureView` to backfill the input dataset | ||
with extra features for offline training. It involves the following steps: | ||
|
||
1. Read a batch of historical purchase events from a file. | ||
|
||
Each purchase event has the following fields: | ||
- user_id, unique identifier of the user that made the purchase. | ||
- item_id, unique identifier of the item that is purchased. | ||
- item_count, number of items purchased. | ||
- timestamp, time when this purchase is made. | ||
|
||
2. Read a batch of historical item price events from a file. | ||
|
||
Each item price event has the following fields: | ||
- item_id, unique identifier of the item. | ||
- price, the new price of this item. | ||
- timestamp, time when the new price is used for this item. | ||
|
||
3. For each purchase event, append the following two fields by joining with item | ||
price events and performing over-window aggregation, with point-in-time | ||
correctness in both operations. | ||
|
||
- price, price of the item at the time this purchase is made. | ||
- total_payment_last_two_minutes, total cost of purchases made by this | ||
user in a 2-minute window that ends at the time this purchase is made. | ||
|
||
4. Output the batch of purchase events backfilled with the extra features to a | ||
file. | ||
|
||
This example demonstrates how to submit the Feathub job to a Kubernetes cluster in Flink | ||
native Kubernetes application mode. | ||
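
To make step 3 concrete, here is a minimal plain-Python sketch of the point-in-time join and the over-window aggregation, applied to the sample data in this example. It does not use the Feathub API. The helper names (`price_at`, `backfill`) are hypothetical, and treating the 2-minute window as inclusive at both ends is an assumption chosen because it reproduces the expected output rows of this example.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S"

# (user_id, item_id, item_count, timestamp), copied from purchase_events.json
purchases = [
    ("user_1", "item_1", 1, "2022-01-01 00:00:00"),
    ("user_1", "item_2", 2, "2022-01-01 00:01:00"),
    ("user_1", "item_1", 3, "2022-01-01 00:02:00"),
    ("user_2", "item_1", 1, "2022-01-01 00:03:00"),
    ("user_1", "item_3", 2, "2022-01-01 00:04:00"),
]

# (item_id, price, timestamp), copied from item_price_events.json
prices = [
    ("item_1", 100.0, "2022-01-01 00:00:00"),
    ("item_2", 200.0, "2022-01-01 00:00:00"),
    ("item_3", 300.0, "2022-01-01 00:00:00"),
    ("item_1", 200.0, "2022-01-01 00:01:30"),
    ("item_1", 300.0, "2022-01-01 00:02:30"),
    ("item_1", 400.0, "2022-01-01 00:03:30"),
]


def ts(text):
    """Parse a timestamp string in the format used by the data files."""
    return datetime.strptime(text, FMT)


def price_at(item_id, when):
    """Point-in-time lookup: latest price of item_id effective at or before `when`."""
    candidates = [(ts(t), p) for (i, p, t) in prices if i == item_id and ts(t) <= when]
    return max(candidates, key=lambda c: c[0])[1] if candidates else None


def backfill(window=timedelta(minutes=2)):
    """Append price and total_payment_last_two_minutes to every purchase."""
    # Step 1: join each purchase with the price that was valid at purchase time.
    enriched = [(u, i, c, t, price_at(i, ts(t))) for (u, i, c, t) in purchases]
    rows = []
    for user, item, count, t, price in enriched:
        end = ts(t)
        # Step 2: sum item_count * price over this user's purchases in the
        # window ending at this purchase (assumed inclusive on both ends).
        total = sum(
            c * p
            for (u, _, c, t2, p) in enriched
            if u == user and end - window <= ts(t2) <= end
        )
        rows.append((user, item, count, t, price, total))
    return rows


for row in backfill():
    print(row)
```

Note that the third purchase uses the price 200.0 that took effect at 00:01:30, not the later 300.0, which is what point-in-time correctness means here.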
|
||
# Prerequisites | ||
|
||
Prerequisites for running this example: | ||
- Unix-like operating system (e.g. Linux, Mac OS X) | ||
- Python 3.7 | ||
- Minikube 1.30.1 | ||
- kubectl 1.25.9 | ||
|
||
# Step-By-Step Instructions | ||
|
||
Please execute the following commands under the `flink-derived-feature-view` | ||
folder to run this example. | ||
|
||
1. Start Minikube | ||
|
||
```bash | ||
$ minikube start | ||
``` | ||
|
||
After Minikube has started, you can run `kubectl get ns` to list the namespaces in | ||
the cluster. | ||
|
||
2. Build the Flink image | ||
|
||
Flink Kubernetes application mode requires that the user code be bundled together with | ||
the Flink image, and Feathub must also be installed in the image. You can run the following | ||
command to build the image to be used by Minikube. | ||
|
||
```bash | ||
$ eval $(minikube docker-env) | ||
$ docker build -q --rm -t flink-k8s-app . | ||
``` | ||
|
||
3. Submit the Feathub job to the Kubernetes cluster | ||
|
||
```bash | ||
# Create the output directory in Minikube. | ||
$ minikube ssh -- 'mkdir -p -m=777 /tmp/flink-kubernetes-application/output' | ||
|
||
# Grant the default service account permission to create and delete pods. | ||
$ kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default | ||
|
||
$ curl -LO https://archive.apache.org/dist/flink/flink-1.16.2/flink-1.16.2-bin-scala_2.12.tgz | ||
$ tar -xzf flink-1.16.2-bin-scala_2.12.tgz | ||
$ ./flink-1.16.2/bin/flink run-application \ | ||
--target kubernetes-application \ | ||
-Dkubernetes.container.image=flink-k8s-app:latest \ | ||
-Dkubernetes.pod-template-file=./pod-template.yaml \ | ||
-py /main.py | ||
``` | ||
|
||
Once the job is submitted, you can list the pod that runs the Flink JobManager and | ||
check its log. | ||
|
||
```bash | ||
# List the running pod. | ||
$ kubectl get pod | ||
|
||
$ kubectl logs <pod-name> | ||
``` | ||
|
||
4. Check the outputs. | ||
|
||
```bash | ||
$ minikube ssh -- 'cat /tmp/flink-kubernetes-application/output/output.json/*' | ||
``` | ||
|
||
The file should contain the following rows: | ||
|
||
``` | ||
user_1,item_1,1,"2022-01-01 00:00:00",100.0,100.0 | ||
user_1,item_2,2,"2022-01-01 00:01:00",200.0,500.0 | ||
user_1,item_1,3,"2022-01-01 00:02:00",200.0,1100.0 | ||
user_2,item_1,1,"2022-01-01 00:03:00",300.0,300.0 | ||
user_1,item_3,2,"2022-01-01 00:04:00",300.0,1200.0 | ||
``` | ||
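
The output is read from several files under `output.json/`, so the rows may not appear in the order shown above. As a rough sketch of an order-insensitive check, the snippet below parses the CSV text and compares sorted rows. The function names (`parse_rows`, `rows_match`) are hypothetical and not part of Feathub or of this example's tooling; the column names are taken from the field descriptions in the overview.

```python
import csv
import io

# Column order of each output row: the source fields followed by the two
# backfilled features.
COLUMNS = [
    "user_id",
    "item_id",
    "item_count",
    "timestamp",
    "price",
    "total_payment_last_two_minutes",
]


def parse_rows(text):
    """Parse CSV text into a sorted list of row tuples, skipping blank lines."""
    reader = csv.reader(io.StringIO(text))
    return sorted(tuple(row) for row in reader if row)


def rows_match(actual_text, expected_text):
    """Return True when both texts contain the same rows, ignoring row order."""
    return parse_rows(actual_text) == parse_rows(expected_text)


if __name__ == "__main__":
    expected = (
        'user_1,item_1,1,"2022-01-01 00:00:00",100.0,100.0\n'
        'user_2,item_1,1,"2022-01-01 00:03:00",300.0,300.0\n'
    )
    # Same rows in a different order, as could happen with multiple files.
    actual = (
        'user_2,item_1,1,"2022-01-01 00:03:00",300.0,300.0\n'
        'user_1,item_1,1,"2022-01-01 00:00:00",100.0,100.0\n'
    )
    print(rows_match(actual, expected))
    # Show the first row with its column names attached.
    print(dict(zip(COLUMNS, parse_rows(actual)[0])))
```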
|
||
5. Tear down Minikube. | ||
|
||
```bash | ||
$ minikube stop | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
user_1,item_1,1,"2022-01-01 00:00:00",100.0,100.0 | ||
user_1,item_2,2,"2022-01-01 00:01:00",200.0,500.0 | ||
user_1,item_1,3,"2022-01-01 00:02:00",200.0,1100.0 | ||
user_2,item_1,1,"2022-01-01 00:03:00",300.0,300.0 | ||
user_1,item_3,2,"2022-01-01 00:04:00",300.0,1200.0 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
{"item_id":"item_1", "price":100.0, "timestamp":"2022-01-01 00:00:00"} | ||
{"item_id":"item_2", "price":200.0, "timestamp":"2022-01-01 00:00:00"} | ||
{"item_id":"item_3", "price":300.0, "timestamp":"2022-01-01 00:00:00"} | ||
{"item_id":"item_1", "price":200.0, "timestamp":"2022-01-01 00:01:30"} | ||
{"item_id":"item_1", "price":300.0, "timestamp":"2022-01-01 00:02:30"} | ||
{"item_id":"item_1", "price":400.0, "timestamp":"2022-01-01 00:03:30"} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
{"user_id":"user_1", "item_id":"item_1", "item_count":1, "timestamp":"2022-01-01 00:00:00"} | ||
{"user_id":"user_1", "item_id":"item_2", "item_count":2, "timestamp":"2022-01-01 00:01:00"} | ||
{"user_id":"user_1", "item_id":"item_1", "item_count":3, "timestamp":"2022-01-01 00:02:00"} | ||
{"user_id":"user_2", "item_id":"item_1", "item_count":1, "timestamp":"2022-01-01 00:03:00"} | ||
{"user_id":"user_1", "item_id":"item_3", "item_count":2, "timestamp":"2022-01-01 00:04:00"} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
# Copyright 2022 The FeatHub Authors | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# https://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from datetime import timedelta | ||
|
||
from feathub.feathub_client import FeathubClient | ||
from feathub.feature_tables.sinks.file_system_sink import FileSystemSink | ||
from feathub.feature_views.feature import Feature | ||
from feathub.feature_views.derived_feature_view import DerivedFeatureView | ||
from feathub.feature_views.transforms.over_window_transform import ( | ||
OverWindowTransform, | ||
) | ||
|
||
from feathub.common import types | ||
from feathub.feature_tables.sources.file_system_source import FileSystemSource | ||
from feathub.table.schema import Schema | ||
|
||
if __name__ == "__main__": | ||
    client = FeathubClient( | ||
        props={ | ||
            "processor": { | ||
                "type": "flink", | ||
                "flink": { | ||
                    "deployment_mode": "cli", | ||
                }, | ||
            }, | ||
            "online_store": { | ||
                "types": ["memory"], | ||
                "memory": {}, | ||
            }, | ||
            "registry": { | ||
                "type": "local", | ||
                "local": { | ||
                    "namespace": "default", | ||
                }, | ||
            }, | ||
            "feature_service": { | ||
                "type": "local", | ||
                "local": {}, | ||
            }, | ||
        } | ||
    ) | ||
|
||
    purchase_events_schema = ( | ||
        Schema.new_builder() | ||
        .column("user_id", types.String) | ||
        .column("item_id", types.String) | ||
        .column("item_count", types.Int32) | ||
        .column("timestamp", types.String) | ||
        .build() | ||
    ) | ||
|
||
    purchase_events_source = FileSystemSource( | ||
        name="purchase_events", | ||
        path="/tmp/data/purchase_events.json", | ||
        data_format="json", | ||
        schema=purchase_events_schema, | ||
        timestamp_field="timestamp", | ||
        timestamp_format="%Y-%m-%d %H:%M:%S", | ||
    ) | ||
|
||
    item_price_events_schema = ( | ||
        Schema.new_builder() | ||
        .column("item_id", types.String) | ||
        .column("price", types.Float32) | ||
        .column("timestamp", types.String) | ||
        .build() | ||
    ) | ||
|
||
    item_price_events_source = FileSystemSource( | ||
        name="item_price_events", | ||
        path="/tmp/data/item_price_events.json", | ||
        data_format="json", | ||
        schema=item_price_events_schema, | ||
        keys=["item_id"], | ||
        timestamp_field="timestamp", | ||
        timestamp_format="%Y-%m-%d %H:%M:%S", | ||
    ) | ||
|
||
    # The total cost of purchases made by this user in the last 2 minutes. | ||
    f_total_payment_last_two_minutes = Feature( | ||
        name="total_payment_last_two_minutes", | ||
        transform=OverWindowTransform( | ||
            expr="item_count * price", | ||
            agg_func="SUM", | ||
            window_size=timedelta(minutes=2), | ||
            group_by_keys=["user_id"], | ||
        ), | ||
    ) | ||
|
||
    purchase_events_with_features = DerivedFeatureView( | ||
        name="purchase_events_with_features", | ||
        source=purchase_events_source, | ||
        features=[ | ||
            "item_price_events.price", | ||
            f_total_payment_last_two_minutes, | ||
        ], | ||
        keep_source_fields=True, | ||
    ) | ||
|
||
    client.build_features( | ||
        [ | ||
            item_price_events_source, | ||
            purchase_events_with_features, | ||
        ] | ||
    ) | ||
|
||
    result_table = client.get_features(purchase_events_with_features) | ||
|
||
    result_table_df = result_table.to_pandas() | ||
|
||
    print(result_table_df) | ||
|
||
    local_sink = FileSystemSink(path="/tmp/data/output/output.json", data_format="csv") | ||
|
||
    result_table.execute_insert(sink=local_sink, allow_overwrite=True).wait() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
apiVersion: v1 | ||
kind: Pod | ||
spec: | ||
  containers: | ||
    - name: flink-main-container | ||
      volumeMounts: | ||
        - mountPath: /tmp/data/output | ||
          name: flink-volume-hostpath | ||
  volumes: | ||
    - name: flink-volume-hostpath | ||
      hostPath: | ||
        path: /tmp/flink-kubernetes-application/output | ||
        type: DirectoryOrCreate |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
# | ||
# Copyright 2022 The FeatHub Authors | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# https://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
set -e | ||
|
||
cd "$(dirname "$0")" | ||
PROJECT_DIR=$(cd "$(pwd)/.."; pwd) | ||
source "${PROJECT_DIR}"/tools/utils.sh | ||
|
||
eval $(minikube docker-env) | ||
docker build -q --rm -t flink-k8s-app . | ||
minikube ssh -- 'mkdir -p -m=777 /tmp/flink-kubernetes-application/output' | ||
kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default | ||
|
||
curl -LO https://archive.apache.org/dist/flink/flink-1.16.2/flink-1.16.2-bin-scala_2.12.tgz | ||
tar -xzf flink-1.16.2-bin-scala_2.12.tgz | ||
./flink-1.16.2/bin/flink run-application \ | ||
--target kubernetes-application \ | ||
-Dkubernetes.container.image=flink-k8s-app:latest \ | ||
-Dkubernetes.pod-template-file=./pod-template.yaml \ | ||
-Dkubernetes.jobmanager.cpu=0.25 \ | ||
-Dkubernetes.taskmanager.cpu=0.25 \ | ||
-Djobmanager.memory.process.size=1G \ | ||
-Dtaskmanager.memory.process.size=1G \ | ||
-py /main.py | ||
|
||
POD_NAME=$(kubectl get po --no-headers=true | awk '{print $1}') | ||
kubectl wait pods --for=condition=Ready "${POD_NAME}" | ||
kubectl logs -f "${POD_NAME}" | ||
|
||
minikube ssh --native-ssh=false -- 'cat /tmp/flink-kubernetes-application/output/output.json/*' > data/merged_output | ||
sort_and_compare_files data/merged_output data/expected_output.txt | ||
|