Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…al_tranactions into inc_txns
  • Loading branch information
Tiemo Bang committed Jul 12, 2024
2 parents 520820c + 78f669e commit bdf4d76
Show file tree
Hide file tree
Showing 49 changed files with 2,461 additions and 1,179 deletions.
7 changes: 4 additions & 3 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ Our known dependencies are:
- a Java Virtual Machine (at least Java 19)
- maven
- graphviz
- Cloud and UI
- Cloud
- Python 3
- typescript
- Redpanda or Kafka
- Earthly (https://earthly.dev/get-earthly)
- Web Console
- Bun

Additional dependencies are automatically installed by the Rust,
maven, Python, and typescript build tools.
maven, Python, and TypeScript build tools.

## Contribution Flow

Expand Down
18 changes: 9 additions & 9 deletions crates/pipeline_manager/build.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use change_detection::ChangeDetection;
use static_files::{resource_dir, NpmBuild};
use std::path::{Path, PathBuf};
use std::{env, fs};
use std::{env /* fs */};

// These are touched during the build, so it would re-build every time if we
// don't exclude them from change detection:
Expand All @@ -11,12 +11,12 @@ const EXCLUDE_LIST: [&str; 4] = [
"../../web-console/.next",
"../../web-console/pipeline-manager-",
];
const SVELTEKIT_EXCLUDE_LIST: [&str; 4] = [
"../../web-console-sveltekit/node_modules",
"../../web-console-sveltekit/build",
"../../web-console-sveltekit/.svelte-kit",
"../../web-console-sveltekit/pipeline-manager-",
];
// const SVELTEKIT_EXCLUDE_LIST: [&str; 4] = [
// "../../web-console-sveltekit/node_modules",
// "../../web-console-sveltekit/build",
// "../../web-console-sveltekit/.svelte-kit",
// "../../web-console-sveltekit/pipeline-manager-",
// ];

/// The build script has two modes:
///
Expand Down Expand Up @@ -76,7 +76,7 @@ fn main() {
.unwrap();
}

{
/*{
// sveltekit
if let Ok(webui_out_folder) = env::var("WEBCONSOLE_BUILD_DIR") {
ChangeDetection::path(&webui_out_folder)
Expand Down Expand Up @@ -122,5 +122,5 @@ fn main() {
let _ = resource_dir.with_generated_filename(out_dir.join("v2").join("generated.rs"));
resource_dir.build().expect("SvelteKit app failed to build");
};
}
}*/
}
16 changes: 8 additions & 8 deletions crates/pipeline_manager/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,9 @@ pub struct ApiDoc;
// `static_files` magic.
include!(concat!(env!("OUT_DIR"), "/generated.rs"));

mod web_v2 {
include!(concat!(env!("OUT_DIR"), "/v2/generated.rs"));
}
// mod web_v2 {
// include!(concat!(env!("OUT_DIR"), "/v2/generated.rs"));
// }

// The scope for all unauthenticated API endpoints
fn public_scope() -> Scope {
Expand All @@ -295,10 +295,10 @@ fn public_scope() -> Scope {
.service(ResourceFiles::new("/", generate()))
}

fn new_scope() -> Scope {
web::scope("/new")
.service(ResourceFiles::new("/", web_v2::generate()).resolve_not_found_to_root())
}
// fn new_scope() -> Scope {
// web::scope("/new")
// .service(ResourceFiles::new("/", web_v2::generate()).resolve_not_found_to_root())
// }

// The scope for all authenticated API endpoints
fn api_scope() -> Scope {
Expand Down Expand Up @@ -464,7 +464,7 @@ pub async fn run(db: Arc<Mutex<ProjectDB>>, api_config: ApiServerConfig) -> AnyR
let req = crate::auth::tag_with_default_tenant_id(req);
srv.call(req)
}))
.service(new_scope())
// .service(new_scope())
.service(public_scope())
});
server.listen(listener)?.run()
Expand Down
3 changes: 3 additions & 0 deletions demo/project_demo12-HopsworksTikTokRecSys/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.venv
venv
__pycache__
67 changes: 67 additions & 0 deletions demo/project_demo12-HopsworksTikTokRecSys/0_prepare_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import json
import pandas as pd
from kafka import KafkaAdminClient, KafkaProducer
from kafka.admin import NewTopic

import config
from features.users import generate_users
from features.videos import generate_video_content
from features.interactions import generate_interactions

def simulate_interactions(step=100, historical=False):
    """Generate synthetic users, videos, and interactions, yielded in batches.

    Args:
        step: Number of interaction records per yielded batch.
        historical: When True, use the (larger) historical volume settings
            from ``config``; otherwise use the pipeline volume settings.

    Yields:
        Lists of at most ``step`` JSON-compatible dicts, one per interaction.
    """
    # Generate data for users
    user_data = generate_users(
        config.USERS_AMOUNT_HISTORICAL if historical else config.USERS_AMOUNT_PIPELINE,
        historical=historical,
    )

    # Generate data for videos
    video_data = generate_video_content(
        config.VIDEO_AMOUNT_HISTORICAL if historical else config.VIDEO_AMOUNT_PIPELINE,
        historical=historical,
    )

    # Generate interactions referencing the users and videos above.
    interactions = generate_interactions(
        config.INTERACTIONS_AMOUNT_HISTORICAL if historical else config.INTERACTIONS_AMOUNT_PIPELINE,
        user_data,
        video_data,
    )

    # FIX: the comprehension below previously reused `i`, shadowing the
    # batch-offset loop variable `i` — harmless in Python 3 but confusing;
    # distinct names (`start`, `doc`) make the two scopes explicit.
    for start in range(0, len(interactions), step):
        batch_df = pd.DataFrame(interactions[start:start + step])
        # Round-trip each row through pandas' JSON serializer so non-native
        # values are rendered exactly as to_json() would emit them.
        batch_df['json'] = batch_df.apply(lambda row: row.to_json(), axis=1)
        yield [json.loads(doc) for doc in batch_df.json.values]

def send_interactions(interactions):
    """Publish each batch of interaction dicts to the configured Kafka topic.

    Each batch becomes one Kafka message containing newline-delimited JSON;
    a running total is printed and the producer flushed after every batch.
    """
    counter = 0
    for batch in interactions:
        counter += len(batch)
        encoded_records = (json.dumps(record).encode("utf-8") for record in batch)
        producer.send(config.KAFKA_TOPIC_NAME, value=b"\n".join(encoded_records))
        print(f"Sent {counter} data points to kafka")
        producer.flush()

# --- Kafka topic (re)creation and data generation -----------------------
admin_client = KafkaAdminClient(
    bootstrap_servers=config.KAFKA_SERVER,
    client_id="blah"
)

existing_topics = set(admin_client.list_topics())

# Drop a stale topic so every run starts from an empty stream.
if config.KAFKA_TOPIC_NAME in existing_topics:
    print("Kafka topic already exists, removing it")
    admin_client.delete_topics([config.KAFKA_TOPIC_NAME])

# BUG FIX: the topic must always be (re)created at this point. The previous
# guard (`if config.KAFKA_TOPIC_NAME not in existing_topics:`) consulted the
# snapshot taken *before* the delete, so a pre-existing topic was deleted
# and then never recreated.
# NOTE(review): Kafka topic deletion is asynchronous; if create_topics ever
# races a still-pending delete, a short wait/retry may be needed — confirm
# against the target broker.
print("Creating a Kafka new topic")
admin_client.create_topics([
    NewTopic(config.KAFKA_TOPIC_NAME, num_partitions=2, replication_factor=1)
])

producer = KafkaProducer(
    bootstrap_servers=config.KAFKA_SERVER,
    client_id="blah",
)

# Feed the topic: first the large historical backfill, then "present" data.
print("Simulating Historical Data")
send_interactions(simulate_interactions(historical=True))

print("Simulating Present Data")
send_interactions(simulate_interactions())


81 changes: 81 additions & 0 deletions demo/project_demo12-HopsworksTikTokRecSys/1_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Build and run a Feldera pipeline that computes rolling per-video and
# per-user engagement aggregates from a Kafka stream of interactions.
from feldera import FelderaClient, SQLContext, SQLSchema
from feldera.formats import JSONFormat, JSONUpdateFormat
import config
import time

# Connect to the local Feldera instance and create the SQL context.
feldera = FelderaClient("http://localhost:8080")
pipeline = SQLContext("mil", feldera, workers=16)

# Input table: one row per user/video interaction event. LATENESS bounds
# how far out of order interaction_date values may arrive.
interactions_schema = SQLSchema({
    "interaction_id": "BIGINT",
    "user_id": "INT",
    "video_id": "INT",
    "category_id": "INT",
    "interaction_type": "STRING",
    "watch_time": "INT",
    "interaction_date": "TIMESTAMP LATENESS INTERVAL '10' SECONDS",
    "previous_interaction_date": "TIMESTAMP",
    "interaction_month": "TIMESTAMP",
})
pipeline.register_table("interactions", interactions_schema)

# Rolling 1-hour / 1-day / 7-day counts and average watch time per video.
pipeline.register_view("video_agg", """
    SELECT
        video_id,
        interaction_type,
        count(*) OVER hour as interaction_len_h,
        count(*) OVER day as interaction_len_d,
        count(*) OVER week as interaction_len_w,
        avg(watch_time) OVER hour as average_watch_time_h,
        avg(watch_time) OVER day as average_watch_time_d,
        avg(watch_time) OVER week as average_watch_time_w,
        interaction_date as hour_start
    FROM interactions
    WINDOW
        hour AS (PARTITION BY video_id ORDER BY interaction_date RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW),
        day AS (PARTITION BY video_id ORDER BY interaction_date RANGE BETWEEN INTERVAL '1' DAY PRECEDING AND CURRENT ROW),
        week AS (PARTITION BY video_id ORDER BY interaction_date RANGE BETWEEN INTERVAL '7' DAY PRECEDING AND CURRENT ROW)
""")

# Same rolling aggregates, partitioned per user instead of per video.
pipeline.register_view("user_agg", """
    SELECT
        user_id,
        interaction_type,
        count(*) OVER hour as interaction_len_h,
        count(*) OVER day as interaction_len_d,
        count(*) OVER week as interaction_len_w,
        avg(watch_time) OVER hour as average_watch_time_h,
        avg(watch_time) OVER day as average_watch_time_d,
        avg(watch_time) OVER week as average_watch_time_w,
        interaction_date as hour_start
    FROM interactions
    WINDOW
        hour AS (PARTITION BY user_id ORDER BY interaction_date RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW),
        day AS (PARTITION BY user_id ORDER BY interaction_date RANGE BETWEEN INTERVAL '1' DAY PRECEDING AND CURRENT ROW),
        week AS (PARTITION BY user_id ORDER BY interaction_date RANGE BETWEEN INTERVAL '7' DAY PRECEDING AND CURRENT ROW)
""")

# Kafka source: each message is newline-delimited raw JSON records.
input_format = JSONFormat().with_array(False).with_update_format(JSONUpdateFormat.Raw)
pipeline.connect_source_kafka("interactions", "kafka_conn_in_interactions", {
    "topics": [config.KAFKA_TOPIC_NAME],
    "bootstrap.servers": config.KAFKA_SERVER_FROM_PIPELINE,
    "auto.offset.reset": "earliest",
    "poller_threads": 8,
}, input_format)

print("Starting Feldera Pipeline")
pipeline.start()
print("Pipeline started")

# Time how long the pipeline takes to drain the topic and go idle.
started_at = time.time()
pipeline.wait_for_idle(idle_interval_s=1)
elapsed = time.time() - started_at

print(f"Pipeline finished in {elapsed}, shutting down...")

pipeline.shutdown()
22 changes: 22 additions & 0 deletions demo/project_demo12-HopsworksTikTokRecSys/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# This code is borrowed from the Hopsworks TikTok RecSys Demo

# Original source: https://github.com/davitbzh/tiktok-recsys/blob/main/python/Jupyter/streaming/config.py

# Data volumes for the historical (backfill) generation pass.
USERS_AMOUNT_HISTORICAL = 1_000
VIDEO_AMOUNT_HISTORICAL = 1_000
INTERACTIONS_AMOUNT_HISTORICAL = 50_000_000

# Data volumes for the "present"/pipeline generation pass.
USERS_AMOUNT_PIPELINE = 1_000
VIDEO_AMOUNT_PIPELINE = 1_000
INTERACTIONS_AMOUNT_PIPELINE = 1_000_000

# strftime/strptime patterns; MONTH_FORMAT pins dates to the first of the
# month at midnight for month-level bucketing.
DATE_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
DAY_FORMAT = '%Y-%m-%d'
MONTH_FORMAT = '%Y-%m-01 00:00:00'

# Kafka topic the generator writes to and the pipeline reads from.
KAFKA_TOPIC_NAME = "interactions_streaming_test_trial2"
SCHEMA_NAME = "interactions_streaming_test_trial_schema1"

# Broker address as seen by the data generator (host network).
KAFKA_SERVER = "localhost:19092"
# Broker address as seen by the Feldera pipeline; the commented value is
# presumably for when the pipeline runs inside the same Docker network as
# the broker — TODO confirm deployment setup.
#KAFKA_SERVER_FROM_PIPELINE = "redpanda:9092"
KAFKA_SERVER_FROM_PIPELINE = "localhost:19092"
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# This module is borrowed from the Hopsworks TikTok RecSys Demo

# Original Repository: https://github.com/davitbzh/tiktok-recsys
# Original Source: https://github.com/davitbzh/tiktok-recsys/blob/main/python/Jupyter/features

# Modifications made:
# - The fields video_id, user_id, interactions_id now generate integers instead of strings
Loading

0 comments on commit bdf4d76

Please sign in to comment.