diff --git a/_data/harnesses/audit-logs/confluent.yml b/_data/harnesses/audit-logs/confluent.yml index ed83cdcfea..1912ec64dc 100644 --- a/_data/harnesses/audit-logs/confluent.yml +++ b/_data/harnesses/audit-logs/confluent.yml @@ -56,14 +56,14 @@ dev: render: file: tutorials/audit-logs/confluent/markup/dev/sink.adoc - - title: Cleanup + - title: Explanation content: - action: skip render: - file: shared/markup/ccloud/cleanup.adoc + file: tutorials/audit-logs/confluent/markup/dev/explanation.adoc - - title: Explanation + - title: Cleanup content: - action: skip render: - file: tutorials/audit-logs/confluent/markup/dev/explanation.adoc + file: shared/markup/ccloud/cleanup.adoc diff --git a/_data/harnesses/aviation/confluent.yml b/_data/harnesses/aviation/confluent.yml index bced655fb1..8ecbc45182 100644 --- a/_data/harnesses/aviation/confluent.yml +++ b/_data/harnesses/aviation/confluent.yml @@ -54,14 +54,14 @@ dev: render: file: tutorials/aviation/confluent/markup/dev/manual.adoc - - title: Cleanup + - title: Explanation content: - action: skip render: - file: shared/markup/ccloud/cleanup.adoc + file: tutorials/aviation/confluent/markup/dev/explanation.adoc - - title: Explanation + - title: Cleanup content: - action: skip render: - file: tutorials/aviation/confluent/markup/dev/explanation.adoc + file: shared/markup/ccloud/cleanup.adoc diff --git a/_data/harnesses/datacenter/confluent.yml b/_data/harnesses/datacenter/confluent.yml index 9e0be56383..a302d85f81 100644 --- a/_data/harnesses/datacenter/confluent.yml +++ b/_data/harnesses/datacenter/confluent.yml @@ -54,14 +54,14 @@ dev: render: file: tutorials/datacenter/confluent/markup/dev/manual.adoc - - title: Cleanup + - title: Explanation content: - action: skip render: - file: shared/markup/ccloud/cleanup.adoc + file: tutorials/datacenter/confluent/markup/dev/explanation.adoc - - title: Explanation + - title: Cleanup content: - action: skip render: - file: tutorials/datacenter/confluent/markup/dev/explanation.adoc + file: shared/markup/ccloud/cleanup.adoc diff --git a/_data/harnesses/ddos/confluent.yml b/_data/harnesses/ddos/confluent.yml index 47b9929e1e..46b2ad720d 100644 --- a/_data/harnesses/ddos/confluent.yml +++ b/_data/harnesses/ddos/confluent.yml @@ -54,14 +54,14 @@ dev: render: file: tutorials/ddos/confluent/markup/dev/manual.adoc - - title: Cleanup + - title: Explanation content: - action: skip render: - file: shared/markup/ccloud/cleanup.adoc + file: tutorials/ddos/confluent/markup/dev/explanation.adoc - - title: Explanation + - title: Cleanup content: - action: skip render: - file: tutorials/ddos/confluent/markup/dev/explanation.adoc + file: shared/markup/ccloud/cleanup.adoc diff --git a/_data/harnesses/discount-promo/confluent.yml b/_data/harnesses/discount-promo/confluent.yml index a78c036224..7d06fb14fe 100644 --- a/_data/harnesses/discount-promo/confluent.yml +++ b/_data/harnesses/discount-promo/confluent.yml @@ -54,14 +54,14 @@ dev: render: file: tutorials/discount-promo/confluent/markup/dev/manual.adoc - - title: Cleanup + - title: Explanation content: - action: skip render: - file: shared/markup/ccloud/cleanup.adoc + file: tutorials/discount-promo/confluent/markup/dev/explanation.adoc - - title: Explanation + - title: Cleanup content: - action: skip render: - file: tutorials/discount-promo/confluent/markup/dev/explanation.adoc + file: shared/markup/ccloud/cleanup.adoc diff --git a/_data/harnesses/location-based-alerting/confluent.yml b/_data/harnesses/location-based-alerting/confluent.yml new file mode 100644 index 
0000000000..dbf1acc8ad --- /dev/null +++ b/_data/harnesses/location-based-alerting/confluent.yml @@ -0,0 +1,67 @@ +answer: + steps: + - title: + content: + - action: skip + render: + file: tutorials/location-based-alerting/confluent/markup/dev/answer-short.adoc + +dev: + steps: + - title: Setup your environment + content: + - action: skip + render: + file: shared/markup/ccloud/provision-cluster.adoc + + - action: skip + render: + file: shared/markup/ccloud/ccloud_setup.adoc + + - title: Read the data in + content: + - action: skip + render: + file: shared/markup/ccloud/connect.adoc + + - action: skip + render: + file: tutorials/location-based-alerting/confluent/markup/dev/source.adoc + + - action: skip + render: + file: shared/markup/ccloud/manual_insert.adoc + + - title: ksqlDB code + content: + - action: skip + render: + file: tutorials/location-based-alerting/confluent/markup/dev/ksqlDB.adoc + + - action: skip + render: + file: shared/markup/ccloud/ksqlb_processing_intro.adoc + + - action: skip + render: + file: tutorials/location-based-alerting/confluent/markup/dev/process.adoc + + - action: skip + render: + file: shared/markup/ccloud/manual_cue.adoc + + - action: skip + render: + file: tutorials/location-based-alerting/confluent/markup/dev/manual.adoc + + - title: Write the data out + content: + - action: skip + render: + file: tutorials/location-based-alerting/confluent/markup/dev/sink.adoc + + - title: Cleanup + content: + - action: skip + render: + file: shared/markup/ccloud/cleanup.adoc diff --git a/_data/harnesses/loyalty-rewards/confluent.yml b/_data/harnesses/loyalty-rewards/confluent.yml index 966e2fa8c5..4d500afbd5 100644 --- a/_data/harnesses/loyalty-rewards/confluent.yml +++ b/_data/harnesses/loyalty-rewards/confluent.yml @@ -54,14 +54,14 @@ dev: render: file: tutorials/loyalty-rewards/confluent/markup/dev/manual.adoc - - title: Cleanup + - title: Explanation content: - action: skip render: - file: shared/markup/ccloud/cleanup.adoc + file: tutorials/loyalty-rewards/confluent/markup/dev/explanation.adoc - - title: Explanation + - title: Cleanup content: - action: skip render: - file: tutorials/loyalty-rewards/confluent/markup/dev/explanation.adoc + file: shared/markup/ccloud/cleanup.adoc diff --git a/_data/harnesses/model-retraining/confluent.yml b/_data/harnesses/model-retraining/confluent.yml new file mode 100644 index 0000000000..6f3ce91661 --- /dev/null +++ b/_data/harnesses/model-retraining/confluent.yml @@ -0,0 +1,67 @@ +answer: + steps: + - title: + content: + - action: skip + render: + file: tutorials/model-retraining/confluent/markup/dev/answer-short.adoc + +dev: + steps: + - title: Setup your environment + content: + - action: skip + render: + file: shared/markup/ccloud/provision-cluster.adoc + + - action: skip + render: + file: shared/markup/ccloud/ccloud_setup.adoc + + - title: Read the data in + content: + - action: skip + render: + file: shared/markup/ccloud/connect.adoc + + - action: skip + render: + file: tutorials/model-retraining/confluent/markup/dev/source.adoc + + - action: skip + render: + file: shared/markup/ccloud/manual_insert.adoc + + - title: ksqlDB code + content: + - action: skip + render: + file: tutorials/model-retraining/confluent/markup/dev/ksqlDB.adoc + + - action: skip + render: + file: shared/markup/ccloud/ksqlb_processing_intro.adoc + + - action: skip + render: + file: tutorials/model-retraining/confluent/markup/dev/process.adoc + + - action: skip + render: + file: shared/markup/ccloud/manual_cue.adoc + + - action: skip + render: + file: 
tutorials/model-retraining/confluent/markup/dev/manual.adoc + + - title: Write the data out + content: + - action: skip + render: + file: tutorials/model-retraining/confluent/markup/dev/sink.adoc + + - title: Cleanup + content: + - action: skip + render: + file: shared/markup/ccloud/cleanup.adoc diff --git a/_data/harnesses/next-best-offer/confluent.yml b/_data/harnesses/next-best-offer/confluent.yml index 4b5ad3c1a9..8af6067726 100644 --- a/_data/harnesses/next-best-offer/confluent.yml +++ b/_data/harnesses/next-best-offer/confluent.yml @@ -54,14 +54,14 @@ dev: render: file: tutorials/next-best-offer/confluent/markup/dev/manual.adoc - - title: Cleanup + - title: Explanation content: - action: skip render: - file: shared/markup/ccloud/cleanup.adoc + file: tutorials/next-best-offer/confluent/markup/dev/explanation.adoc - - title: Explanation + - title: Cleanup content: - action: skip render: - file: tutorials/next-best-offer/confluent/markup/dev/explanation.adoc + file: shared/markup/ccloud/cleanup.adoc diff --git a/_data/harnesses/online-dating/confluent.yml b/_data/harnesses/online-dating/confluent.yml index fd44998ae4..2853aecb4a 100644 --- a/_data/harnesses/online-dating/confluent.yml +++ b/_data/harnesses/online-dating/confluent.yml @@ -54,14 +54,14 @@ dev: render: file: tutorials/online-dating/confluent/markup/dev/manual.adoc - - title: Cleanup + - title: Explanation content: - action: skip render: - file: shared/markup/ccloud/cleanup.adoc + file: tutorials/online-dating/confluent/markup/dev/explanation.adoc - - title: Explanation + - title: Cleanup content: - action: skip render: - file: tutorials/online-dating/confluent/markup/dev/explanation.adoc + file: shared/markup/ccloud/cleanup.adoc diff --git a/_data/harnesses/salesforce/confluent.yml b/_data/harnesses/salesforce/confluent.yml index d02970f379..3e64290af7 100644 --- a/_data/harnesses/salesforce/confluent.yml +++ b/_data/harnesses/salesforce/confluent.yml @@ -28,6 +28,10 @@ dev: render: file: tutorials/salesforce/confluent/markup/dev/source.adoc + - action: skip + render: + file: shared/markup/ccloud/manual_insert.adoc + - title: ksqlDB code content: - action: skip @@ -42,6 +46,14 @@ dev: render: file: tutorials/salesforce/confluent/markup/dev/process.adoc + - action: skip + render: + file: shared/markup/ccloud/manual_cue.adoc + + - action: skip + render: + file: tutorials/salesforce/confluent/markup/dev/manual.adoc + - title: Cleanup content: - action: skip diff --git a/_data/tutorials.yaml b/_data/tutorials.yaml index 1161a16af9..516c12baed 100644 --- a/_data/tutorials.yaml +++ b/_data/tutorials.yaml @@ -30,7 +30,8 @@ splitting: canonical: confluent slug: "/split-a-stream-of-events-into-substreams" question: "How do you split events in a Kafka topic so that the events are placed into subtopics?" - introduction: "Suppose that you have a Kafka topic representing appearances of an actor or actress in a film, with each event denoting the genre. In this tutorial, we'll write a program that splits the stream into substreams based on the genre. We'll have a topic for drama films, a topic for fantasy films, and a topic for everything else." + introduction: "Suppose that you have a Kafka topic representing appearances of an actor or actress in a film, with each event denoting the genre. In this tutorial, we'll write a program that splits the stream into substreams based on the genre. We'll have a topic for drama films, a topic for fantasy films, and a topic for everything else. Related pattern: Event Router." 
+ introduction-media: "https://raw.githubusercontent.com/confluentinc/event-streaming-patterns/main/docs/img/event-router.svg" status: ksql: enabled kstreams: enabled @@ -43,7 +44,8 @@ merging: canonical: confluent slug: "/merge-many-streams-into-one-stream" question: "If you have many Kafka topics with events, how do you merge them all into a single topic?" - introduction: "Suppose that you have a set of Kafka topics representing songs of a particular genre being played. You might have a topic for rock songs, another for classical songs, and so forth. In this tutorial, we'll write a program that merges all of the song play events into a single topic." + introduction: "Suppose that you have a set of Kafka topics representing songs of a particular genre being played. You might have a topic for rock songs, another for classical songs, and so forth. In this tutorial, we'll write a program that merges all of the song play events into a single topic. Related pattern: Event Stream Merger." + introduction-media: "https://raw.githubusercontent.com/confluentinc/event-streaming-patterns/main/docs/img/event-stream-merger.svg" status: ksql: enabled kstreams: enabled @@ -56,7 +58,8 @@ joining-stream-table: canonical: confluent slug: "/join-a-stream-to-a-table" question: "If you have events in a Kafka topic and a table of reference data (also known as a lookup table), how can you join each event in the stream to a piece of data in the table based on a common key?" - introduction: "Suppose you have a set of movies that have been released and a stream of ratings from moviegoers about how entertaining they are. In this tutorial, we'll write a program that joins each rating with content about the movie." + introduction: "Suppose you have a set of movies that have been released and a stream of ratings from moviegoers about how entertaining they are. In this tutorial, we'll write a program that joins each rating with content about the movie. Related pattern: Event Joiner" + introduction-media: "https://raw.githubusercontent.com/confluentinc/event-streaming-patterns/main/docs/img/event-joiner.svg" status: ksql: enabled kstreams: enabled @@ -280,7 +283,8 @@ dynamic-output-topic: canonical: confluent slug: "/dynamic-output-topic" question: "How can you dynamically route records to different Kafka topics, like a \"topic exchange\"?" - introduction: "Consider a situation where you want to direct the output of different records to different topics, like a \"topic exchange.\" In this tutorial, you'll learn how to instruct Kafka Streams to choose the output topic at runtime, based on information in each record's header, key, or value." + introduction: "Consider a situation where you want to direct the output of different records to different topics, like a \"topic exchange.\" In this tutorial, you'll learn how to instruct Kafka Streams to choose the output topic at runtime, based on information in each record's header, key, or value. Related pattern: Event Router." + introduction-media: "https://raw.githubusercontent.com/confluentinc/event-streaming-patterns/main/docs/img/event-router.svg" status: ksql: disabled kstreams: enabled @@ -318,6 +322,7 @@ console-consumer-producer-basic: slug: "/kafka-console-consumer-producer-basics" question: "What is the simplest way to write messages to and read messages from Kafka?" introduction: "So you are excited to get started with Kafka, and you'd like to produce and consume some basic messages, quickly. 
In this tutorial, we'll show you how to produce and consume messages from the command line without any code." + introduction-media: "/assets/img/cli.png" status: ksql: disabled kstreams: disabled @@ -331,6 +336,7 @@ console-consumer-producer-avro: slug: "/kafka-console-consumer-producer-avro" question: "What is the simplest way to write messages to and read messages from Kafka, using (de)serializers and Schema Registry?" introduction: "You'd like to produce and consume some basic messages, using (de)serializers and Schema Registry. In this tutorial, we'll show you how to produce and consume messages from the command line without any code. Unlike the CLI Basics tutorial, this tutorial uses Avro and Schema Registry." + introduction-media: "/assets/img/sr.png" status: ksql: disabled kstreams: disabled @@ -780,6 +786,7 @@ creating-first-apache-kafka-streams-application: slug: "/creating-first-apache-kafka-streams-application" question: "How do you get started building your first Kafka Streams application?" introduction: "You'd like to get started with Kafka Streams, but you're not sure where to start. In this tutorial, you'll build a small stream processing application and produce some sample data to test it. After you complete this tutorial, you can go more in depth in the Kafka Streams 101 course." + introduction-media: "/assets/img/streams-app.png" status: confluent: enabled kstreams: enabled @@ -903,7 +910,7 @@ salesforce: title: "Handle corrupted data from Salesforce" meta-description: "This ksqlDB tutorial streams changes of Salesforce records and identifies gap events." slug: "/salesforce" - introduction: "Salesforce sends a notification when a change to a Salesforce record occurs as part of a create, update, delete, or undelete operation. However, if there is corrupt data in Salesforce, it sends a gap event instead of a change event, and these gap events should be properly handled to avoid discrepancies between Salesforce reports and internal dashboards. This tutorial demonstrates how to process Salesforce data and filter corrupt events, which allows a downstream application to appropriately process and reconcile those events for accurate reporting and analytics." + introduction: "Salesforce sends a notification when a change to a Salesforce record occurs as part of a create, update, delete, or undelete operation. However, if there is corrupt data in Salesforce, it sends a gap event instead of a change event, and these gap events should be properly handled to avoid discrepancies between Salesforce reports and internal dashboards. This tutorial demonstrates how to process Salesforce data and filter corrupt events, which allows a downstream application to appropriately process and reconcile those events for accurate reporting and analytics. For a more detailed explanation of this use case, read Streaming ETL SFDC Data for Real-Time Customer Analytics." introduction-media: "/assets/img/salesforce.jpg" tutorial-id: "10" status: @@ -1049,3 +1056,21 @@ audit-logs: tutorial-id: "23" status: confluent: enabled + +model-retraining: + title: "Machine Learning Model Retraining" + meta-description: "This recipe demonstrates how to use ksqlDB to evaluate the predictions of a machine learning model and send data to retrain the model when needed." + slug: "/model-retraining" + introduction: "Machine learning provides valuable insights to an organization, and tools like Apache Kafka, Kafka Connect, and ksqlDB allow us to build powerful machine learning pipelines. 
We can also use these tools to extend an existing machine learning pipeline. In this recipe, we'll use Connect and ksqlDB to read the results of an existing pipeline, determine the accuracy of those results, and send data to retrain our model. This recipe is based on the excellent blog post Apache Kafka and R: Real-Time Prediction and Model (Re)training, by Patrick Neff." + tutorial-id: "24" + status: + confluent: enabled + +location-based-alerting: + title: "Geolocation-Based Alerting" + meta-description: "This ksqlDB tutorial demonstrates how to create real-time, personalized, location-based alerts. Merchant data and user location events are joined to generate alerts when a user passes close to a participating merchant." + slug: "/location-based-alerting" + introduction: "Customers are no longer satisfied with using boring static websites to purchase your product or consume your service. Users demand interactive and contextualized real-time mobile applications. Providing customers with rich, real-time experiences is fundamental, and this recipe shows how ksqlDB can help to build personalized, location-based alerts in real time with user-provided mobile geolocation data." + tutorial-id: "25" + status: + confluent: enabled diff --git a/_includes/shared/markup/ccloud/ccloud-setup-self.adoc b/_includes/shared/markup/ccloud/ccloud-setup-self.adoc index 5abbedeef6..d6d3b0dfd6 100644 --- a/_includes/shared/markup/ccloud/ccloud-setup-self.adoc +++ b/_includes/shared/markup/ccloud/ccloud-setup-self.adoc @@ -1,5 +1,8 @@ This tutorial requires access to an Apache Kafka cluster, and the quickest way to get started free is on https://www.confluent.io/confluent-cloud/tryfree/[Confluent Cloud], which provides Kafka as a fully managed service. -First, sign up for https://www.confluent.io/confluent-cloud/tryfree/[Confluent Cloud]. + ++++++ +Take me to Confluent Cloud ++++++ 1. After you log in to https://www.confluent.io/confluent-cloud/tryfree/[Confluent Cloud], click on `Add cloud environment` and name the environment `learn-kafka`. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources. 
diff --git a/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/c01.sql b/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/c01.sql index 2d4da88884..173a1f34dd 100644 --- a/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/c01.sql +++ b/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/c01.sql @@ -1,9 +1,12 @@ -CREATE TABLE customers (ID INT PRIMARY KEY - , NAME VARCHAR - , ADDRESS VARCHAR - , EMAIL VARCHAR - , PHONE VARCHAR - , LOYALTY_STATUS VARCHAR) - WITH (KAFKA_TOPIC='customers' - , FORMAT='AVRO' - , PARTITIONS=6); +CREATE TABLE customers ( + id INT PRIMARY KEY, + name VARCHAR, + address VARCHAR, + email VARCHAR, + phone VARCHAR, + loyalty_status VARCHAR +) WITH ( + KAFKA_TOPIC = 'customers', + FORMAT = 'JSON', + PARTITIONS = 6 +); diff --git a/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/c02.sql b/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/c02.sql index a410dbe080..1c46405d61 100644 --- a/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/c02.sql +++ b/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/c02.sql @@ -1,16 +1,22 @@ -CREATE TABLE flights (ID INT PRIMARY KEY - , ORIGIN VARCHAR - , DESTINATION VARCHAR - , CODE VARCHAR - , SCHEDULED_DEP TIMESTAMP - , SCHEDULED_ARR TIMESTAMP) - WITH (KAFKA_TOPIC='flights' - , FORMAT='AVRO' - , PARTITIONS=6); +CREATE TABLE flights ( + id INT PRIMARY KEY, + origin VARCHAR, + destination VARCHAR, + code VARCHAR, + scheduled_dep TIMESTAMP, + scheduled_arr TIMESTAMP +) WITH ( + KAFKA_TOPIC = 'flights', + FORMAT = 'JSON', + PARTITIONS = 6 +); -CREATE TABLE bookings (ID INT PRIMARY KEY - , CUSTOMER_ID INT - , FLIGHT_ID INT) - WITH (KAFKA_TOPIC='bookings' - , FORMAT='AVRO' - , PARTITIONS=6); +CREATE TABLE bookings ( + id INT PRIMARY KEY, + customer_id INT, + flight_id INT +) WITH ( + KAFKA_TOPIC = 'bookings', + FORMAT = 'JSON', + PARTITIONS = 6 +); diff --git a/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/c03.sql b/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/c03.sql index 04690581b2..0308d7eef1 100644 --- a/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/c03.sql +++ b/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/c03.sql @@ -1,8 +1,10 @@ -CREATE STREAM flight_updates (ID INT KEY - , FLIGHT_ID INT - , UPDATED_DEP TIMESTAMP - , REASON VARCHAR - ) - WITH (KAFKA_TOPIC='flight_updates' - , FORMAT='AVRO' - , PARTITIONS=6); +CREATE STREAM flight_updates ( + id INT KEY, + flight_id INT, + updated_dep TIMESTAMP, + reason VARCHAR +) WITH ( + KAFKA_TOPIC = 'flight_updates', + FORMAT = 'JSON', + PARTITIONS = 6 +); diff --git a/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/j01.sql b/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/j01.sql index d33b2cdb0a..282338ba01 100644 --- a/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/j01.sql +++ b/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/j01.sql @@ -1,5 +1,7 @@ -CREATE TABLE customer_bookings AS - SELECT C.*, B.ID, B.FLIGHT_ID - FROM bookings B - INNER JOIN customers C - ON B.CUSTOMER_ID = C.ID; +CREATE TABLE customer_bookings WITH (KAFKA_TOPIC = 'customer_bookings', KEY_FORMAT = 'KAFKA', VALUE_FORMAT = 'JSON') AS + SELECT C.*, + B.id, + B.flight_id + FROM bookings B + INNER JOIN customers C + ON B.customer_id = C.id; diff --git a/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/j02.sql b/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/j02.sql 
index 4f1ea645c9..48d7b0d610 100644 --- a/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/j02.sql +++ b/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/j02.sql @@ -1,6 +1,6 @@ -CREATE TABLE customer_flights - WITH (KAFKA_TOPIC='customer_flights') AS - SELECT CB.*, F.* - FROM customer_bookings CB - INNER JOIN flights F - ON CB.FLIGHT_ID=F.ID; +CREATE TABLE customer_flights WITH (KAFKA_TOPIC = 'customer_flights', KEY_FORMAT = 'KAFKA', VALUE_FORMAT = 'JSON') AS + SELECT CB.*, + F.* + FROM customer_bookings CB + INNER JOIN flights F + ON CB.flight_id = F.id; diff --git a/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/manual.sql b/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/manual.sql index 3e86dc90ab..6493ccf0de 100644 --- a/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/manual.sql +++ b/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/manual.sql @@ -1,20 +1,20 @@ -INSERT INTO customers (ID, NAME, ADDRESS, EMAIL, PHONE, LOYALTY_STATUS) VALUES (1, 'Gleda Lealle', '93 Express Point', 'glealle0@senate.gov', '+351 831 301 6746', 'Silver'); -INSERT INTO customers (ID, NAME, ADDRESS, EMAIL, PHONE, LOYALTY_STATUS) VALUES (2, 'Gilly Crocombe', '332 Blaine Avenue', 'gcrocombe1@homestead.com', '+33 203 565 3736', 'Silver'); -INSERT INTO customers (ID, NAME, ADDRESS, EMAIL, PHONE, LOYALTY_STATUS) VALUES (3, 'Astrix Aspall', '56 Randy Place', 'aaspall2@ebay.co.uk', '+33 679 296 6645', 'Gold'); -INSERT INTO customers (ID, NAME, ADDRESS, EMAIL, PHONE, LOYALTY_STATUS) VALUES (4, 'Ker Omond', '23255 Tennessee Court', 'komond3@usnews.com', '+33 515 323 0170', 'Silver'); -INSERT INTO customers (ID, NAME, ADDRESS, EMAIL, PHONE, LOYALTY_STATUS) VALUES (5, 'Arline Synnott', '144 Ramsey Avenue', 'asynnott4@theatlantic.com', '+62 953 759 8885', 'Bronze'); +INSERT INTO customers (id, name, address, email, phone, loyalty_status) VALUES (1, 'Gleda Lealle', '93 Express Point', 'glealle0@senate.gov', '+351 831 301 6746', 'Silver'); +INSERT INTO customers (id, name, address, email, phone, loyalty_status) VALUES (2, 'Gilly Crocombe', '332 Blaine Avenue', 'gcrocombe1@homestead.com', '+33 203 565 3736', 'Silver'); +INSERT INTO customers (id, name, address, email, phone, loyalty_status) VALUES (3, 'Astrix Aspall', '56 Randy Place', 'aaspall2@ebay.co.uk', '+33 679 296 6645', 'Gold'); +INSERT INTO customers (id, name, address, email, phone, loyalty_status) VALUES (4, 'Ker Omond', '23255 Tennessee Court', 'komond3@usnews.com', '+33 515 323 0170', 'Silver'); +INSERT INTO customers (id, name, address, email, phone, loyalty_status) VALUES (5, 'Arline Synnott', '144 Ramsey Avenue', 'asynnott4@theatlantic.com', '+62 953 759 8885', 'Bronze'); -INSERT INTO flights (ID, ORIGIN, DESTINATION, CODE, SCHEDULED_DEP, SCHEDULED_ARR) VALUES (1, 'LBA', 'AMS', '642', '2021-11-18T06:04:00', '2021-11-18T06:48:00'); -INSERT INTO flights (ID, ORIGIN, DESTINATION, CODE, SCHEDULED_DEP, SCHEDULED_ARR) VALUES (2, 'LBA', 'LHR', '9607', '2021-11-18T07:36:00', '2021-11-18T08:05:00'); -INSERT INTO flights (ID, ORIGIN, DESTINATION, CODE, SCHEDULED_DEP, SCHEDULED_ARR) VALUES (3, 'AMS', 'TXL', '7968', '2021-11-18T08:11:00', '2021-11-18T10:41:00'); -INSERT INTO flights (ID, ORIGIN, DESTINATION, CODE, SCHEDULED_DEP, SCHEDULED_ARR) VALUES (4, 'AMS', 'OSL', '496', '2021-11-18T11:20:00', '2021-11-18T13:25:00'); -INSERT INTO flights (ID, ORIGIN, DESTINATION, CODE, SCHEDULED_DEP, SCHEDULED_ARR) VALUES (5, 'LHR', 'JFK', '9230', '2021-11-18T10:36:00', '2021-11-18T19:07:00'); 
+INSERT INTO flights (id, origin, destination, code, scheduled_dep, scheduled_arr) VALUES (1, 'LBA', 'AMS', '642', '2021-11-18T06:04:00', '2021-11-18T06:48:00'); +INSERT INTO flights (id, origin, destination, code, scheduled_dep, scheduled_arr) VALUES (2, 'LBA', 'LHR', '9607', '2021-11-18T07:36:00', '2021-11-18T08:05:00'); +INSERT INTO flights (id, origin, destination, code, scheduled_dep, scheduled_arr) VALUES (3, 'AMS', 'TXL', '7968', '2021-11-18T08:11:00', '2021-11-18T10:41:00'); +INSERT INTO flights (id, origin, destination, code, scheduled_dep, scheduled_arr) VALUES (4, 'AMS', 'OSL', '496', '2021-11-18T11:20:00', '2021-11-18T13:25:00'); +INSERT INTO flights (id, origin, destination, code, scheduled_dep, scheduled_arr) VALUES (5, 'LHR', 'JFK', '9230', '2021-11-18T10:36:00', '2021-11-18T19:07:00'); -INSERT INTO bookings (ID, CUSTOMER_ID, FLIGHT_ID) VALUES (1,2,1); -INSERT INTO bookings (ID, CUSTOMER_ID, FLIGHT_ID) VALUES (2,1,1); -INSERT INTO bookings (ID, CUSTOMER_ID, FLIGHT_ID) VALUES (3,5,3); -INSERT INTO bookings (ID, CUSTOMER_ID, FLIGHT_ID) VALUES (4,4,2); +INSERT INTO bookings (id, customer_id, flight_id) VALUES (1,2,1); +INSERT INTO bookings (id, customer_id, flight_id) VALUES (2,1,1); +INSERT INTO bookings (id, customer_id, flight_id) VALUES (3,5,3); +INSERT INTO bookings (id, customer_id, flight_id) VALUES (4,4,2); -INSERT INTO flight_updates (ID, FLIGHT_ID, UPDATED_DEP, REASON) VALUES (1, 2, '2021-11-18T09:00:00.000', 'Cabin staff unavailable'); -INSERT INTO flight_updates (ID, FLIGHT_ID, UPDATED_DEP, REASON) VALUES (2, 3, '2021-11-19T14:00:00.000', 'Mechanical checks'); -INSERT INTO flight_updates (ID, FLIGHT_ID, UPDATED_DEP, REASON) VALUES (3, 1, '2021-11-19T08:10:09.000', 'Icy conditions'); +INSERT INTO flight_updates (id, flight_id, updated_dep, reason) VALUES (1, 2, '2021-11-18T09:00:00.000', 'Cabin staff unavailable'); +INSERT INTO flight_updates (id, flight_id, updated_dep, reason) VALUES (2, 3, '2021-11-19T14:00:00.000', 'Mechanical checks'); +INSERT INTO flight_updates (id, flight_id, updated_dep, reason) VALUES (3, 1, '2021-11-19T08:10:09.000', 'Icy conditions'); diff --git a/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/o01.sql b/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/o01.sql index d1ba61d77d..cbbce015a9 100644 --- a/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/o01.sql +++ b/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/o01.sql @@ -1 +1 @@ -SET 'auto.offset.reset' = 'earliest'; \ No newline at end of file +SET 'auto.offset.reset' = 'earliest'; diff --git a/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/p01.sql b/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/p01.sql index ea7d1f71b7..85afa1d7ca 100644 --- a/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/p01.sql +++ b/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/p01.sql @@ -1,12 +1,12 @@ -SELECT CUSTOMER_NAME - , FU.REASON AS FLIGHT_CHANGE_REASON - , FU.UPDATED_DEP AS FLIGHT_UPDATED_DEP - , FLIGHT_SCHEDULED_DEP - , CUSTOMER_EMAIL - , CUSTOMER_PHONE - , FLIGHT_DESTINATION - , FLIGHT_CODE +SELECT customer_name, + FU.reason AS flight_change_reason, + FU.updated_dep AS flight_updated_dep, + flight_scheduled_dep, + customer_email, + customer_phone, + flight_destination, + flight_code FROM flight_updates FU INNER JOIN customer_flights_rekeyed CB - ON FU.FLIGHT_ID = CB.FLIGHT_ID + ON FU.flight_id = CB.flight_id EMIT CHANGES; diff --git 
a/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/process.sql b/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/process.sql index b87a13eb79..836ef7dedb 100644 --- a/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/process.sql +++ b/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/process.sql @@ -1,90 +1,137 @@ SET 'auto.offset.reset' = 'earliest'; -CREATE TABLE customers (ID INT PRIMARY KEY - , NAME VARCHAR - , ADDRESS VARCHAR - , EMAIL VARCHAR - , PHONE VARCHAR - , LOYALTY_STATUS VARCHAR) - WITH (KAFKA_TOPIC = 'customers' - , FORMAT = 'AVRO' - , PARTITIONS = 6 +CREATE TABLE customers ( + id INT PRIMARY KEY, + name VARCHAR, + address VARCHAR, + email VARCHAR, + phone VARCHAR, + loyalty_status VARCHAR +) WITH ( + KAFKA_TOPIC = 'customers', + FORMAT = 'JSON', + PARTITIONS = 6 ); -CREATE TABLE flights (ID INT PRIMARY KEY - , ORIGIN VARCHAR - , DESTINATION VARCHAR - , CODE VARCHAR - , SCHEDULED_DEP TIMESTAMP - , SCHEDULED_ARR TIMESTAMP) - WITH (KAFKA_TOPIC = 'flights' - , FORMAT = 'AVRO' - , PARTITIONS = 6 +CREATE TABLE flights ( + id INT PRIMARY KEY, + origin VARCHAR, + destination VARCHAR, + code VARCHAR, + scheduled_dep TIMESTAMP, + scheduled_arr TIMESTAMP +) WITH ( + KAFKA_TOPIC = 'flights', + FORMAT = 'JSON', + PARTITIONS = 6 ); -CREATE TABLE bookings (ID INT PRIMARY KEY - , CUSTOMER_ID INT - , FLIGHT_ID INT) - WITH (KAFKA_TOPIC = 'bookings' - , FORMAT = 'AVRO' - , PARTITIONS = 6 +CREATE TABLE bookings ( + id INT PRIMARY KEY, + customer_id INT, + flight_id INT +) WITH ( + KAFKA_TOPIC = 'bookings', + FORMAT = 'JSON', + PARTITIONS = 6 ); -CREATE TABLE customer_bookings AS - SELECT C.*, B.ID, B.FLIGHT_ID - FROM bookings B - INNER JOIN customers C - ON B.CUSTOMER_ID = C.ID; +CREATE TABLE customer_bookings WITH (KAFKA_TOPIC = 'customer_bookings', KEY_FORMAT = 'KAFKA', VALUE_FORMAT = 'JSON') AS + SELECT C.*, + B.id, + B.flight_id + FROM bookings B + INNER JOIN customers C + ON B.customer_id = C.id; -CREATE TABLE customer_flights - WITH (KAFKA_TOPIC = 'customer_flights') AS - SELECT CB.*, F.* - FROM customer_bookings CB - INNER JOIN flights F - ON CB.FLIGHT_ID = F.ID; +CREATE TABLE customer_flights WITH (KAFKA_TOPIC = 'customer_flights', KEY_FORMAT = 'KAFKA', VALUE_FORMAT = 'JSON') AS + SELECT CB.*, + F.* + FROM customer_bookings CB + INNER JOIN flights F + ON CB.flight_id = F.id; -CREATE STREAM cf_stream WITH (KAFKA_TOPIC = 'customer_flights', FORMAT = 'AVRO'); +-- In preparation for joining customer flights with flight updates, need to first +-- rekey the customer_flights table by flight ID, which is currently a multi-step +-- process +CREATE STREAM cf_stream ( + cb_c_id INTEGER, + cb_c_name VARCHAR, + cb_c_address VARCHAR, + cb_c_email VARCHAR, + cb_c_phone VARCHAR, + cb_c_loyalty_status VARCHAR, + cb_flight_id INTEGER, + f_id INTEGER, + f_origin VARCHAR, + f_destination VARCHAR, + f_code VARCHAR, + f_scheduled_dep TIMESTAMP, + f_scheduled_arr TIMESTAMP +) WITH ( + KAFKA_TOPIC = 'customer_flights', + KEY_FORMAT = 'KAFKA', + VALUE_FORMAT = 'JSON' +); CREATE STREAM cf_rekey WITH (KAFKA_TOPIC = 'cf_rekey') AS - SELECT F_ID AS FLIGHT_ID - , CB_C_ID AS CUSTOMER_ID - , CB_C_NAME AS CUSTOMER_NAME - , CB_C_ADDRESS AS CUSTOMER_ADDRESS - , CB_C_EMAIL AS CUSTOMER_EMAIL - , CB_C_PHONE AS CUSTOMER_PHONE - , CB_C_LOYALTY_STATUS AS CUSTOMER_LOYALTY_STATUS - , F_ORIGIN AS FLIGHT_ORIGIN - , F_DESTINATION AS FLIGHT_DESTINATION - , F_CODE AS FLIGHT_CODE - , F_SCHEDULED_DEP AS FLIGHT_SCHEDULED_DEP - , F_SCHEDULED_ARR AS FLIGHT_SCHEDULED_ARR + SELECT f_id 
AS flight_id, + cb_c_id AS customer_id, + cb_c_name AS customer_name, + cb_c_address AS customer_address, + cb_c_email AS customer_email, + cb_c_phone AS customer_phone, + cb_c_loyalty_status AS customer_loyalty_status, + f_origin AS flight_origin, + f_destination AS flight_destination, + f_code AS flight_code, + f_scheduled_dep AS flight_scheduled_dep, + f_scheduled_arr AS flight_scheduled_arr FROM cf_stream - PARTITION BY F_ID; + PARTITION BY f_id; -CREATE TABLE customer_flights_rekeyed - (FLIGHT_ID INT PRIMARY KEY) - WITH (KAFKA_TOPIC = 'cf_rekey', FORMAT = 'AVRO'); +CREATE TABLE customer_flights_rekeyed ( + flight_id INT PRIMARY KEY, + customer_id VARCHAR, + customer_name VARCHAR, + customer_address VARCHAR, + customer_email VARCHAR, + customer_phone VARCHAR, + customer_loyalty_status VARCHAR, + flight_origin VARCHAR, + flight_destination VARCHAR, + flight_code VARCHAR, + flight_scheduled_dep TIMESTAMP, + flight_scheduled_arr TIMESTAMP +) WITH ( + KAFKA_TOPIC = 'cf_rekey', + KEY_FORMAT = 'KAFKA', + VALUE_FORMAT = 'JSON' +); -CREATE STREAM flight_updates (ID INT KEY - , FLIGHT_ID INT - , UPDATED_DEP TIMESTAMP - , REASON VARCHAR - ) - WITH (KAFKA_TOPIC = 'flight_updates' - , FORMAT = 'AVRO' - , PARTITIONS = 6 +CREATE STREAM flight_updates ( + id INT KEY, + flight_id INT, + updated_dep TIMESTAMP, + reason VARCHAR +) WITH ( + KAFKA_TOPIC = 'flight_updates', + KEY_FORMAT = 'KAFKA', + VALUE_FORMAT = 'JSON', + PARTITIONS = 6 ); -CREATE STREAM customer_flight_updates AS - SELECT CUSTOMER_NAME - , FU.REASON AS FLIGHT_CHANGE_REASON - , FU.UPDATED_DEP AS FLIGHT_UPDATED_DEP - , FLIGHT_SCHEDULED_DEP - , CUSTOMER_EMAIL - , CUSTOMER_PHONE - , FLIGHT_DESTINATION - , FLIGHT_CODE - FROM flight_updates FU - INNER JOIN customer_flights_rekeyed CB - ON FU.FLIGHT_ID = CB.FLIGHT_ID - EMIT CHANGES; +CREATE STREAM customer_flight_updates WITH (KAFKA_TOPIC = 'customer_flight_updates') AS +SELECT CB.flight_id, + customer_name, + FU.reason AS flight_change_reason, + FU.updated_dep AS flight_updated_dep, + flight_scheduled_dep, + customer_email, + customer_phone, + flight_destination, + flight_code +FROM flight_updates FU + INNER JOIN customer_flights_rekeyed CB + ON FU.flight_id = CB.flight_id +EMIT CHANGES; diff --git a/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/r01.sql b/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/r01.sql index 3a8b883dc4..796cee12a7 100644 --- a/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/r01.sql +++ b/_includes/tutorials/aviation/confluent/code/tutorial-steps/dev/r01.sql @@ -1,21 +1,54 @@ -CREATE STREAM cf_stream WITH (KAFKA_TOPIC='customer_flights', FORMAT='AVRO'); +CREATE STREAM cf_stream ( + cb_c_id INTEGER, + cb_c_name VARCHAR, + cb_c_address VARCHAR, + cb_c_email VARCHAR, + cb_c_phone VARCHAR, + cb_c_loyalty_status VARCHAR, + cb_flight_id INTEGER, + f_id INTEGER, + f_origin VARCHAR, + f_destination VARCHAR, + f_code VARCHAR, + f_scheduled_dep TIMESTAMP, + f_scheduled_arr TIMESTAMP +) WITH ( + KAFKA_TOPIC = 'customer_flights', + KEY_FORMAT = 'KAFKA', + VALUE_FORMAT = 'JSON' +); -CREATE STREAM cf_rekey WITH (KAFKA_TOPIC='cf_rekey') AS - SELECT F_ID AS FLIGHT_ID - , CB_C_ID AS CUSTOMER_ID - , CB_C_NAME AS CUSTOMER_NAME - , CB_C_ADDRESS AS CUSTOMER_ADDRESS - , CB_C_EMAIL AS CUSTOMER_EMAIL - , CB_C_PHONE AS CUSTOMER_PHONE - , CB_C_LOYALTY_STATUS AS CUSTOMER_LOYALTY_STATUS - , F_ORIGIN AS FLIGHT_ORIGIN - , F_DESTINATION AS FLIGHT_DESTINATION - , F_CODE AS FLIGHT_CODE - , F_SCHEDULED_DEP AS FLIGHT_SCHEDULED_DEP - , F_SCHEDULED_ARR AS 
FLIGHT_SCHEDULED_ARR +CREATE STREAM cf_rekey WITH (KAFKA_TOPIC = 'cf_rekey') AS + SELECT f_id AS flight_id, + cb_c_id AS customer_id, + cb_c_name AS customer_name, + cb_c_address AS customer_address, + cb_c_email AS customer_email, + cb_c_phone AS customer_phone, + cb_c_loyalty_status AS customer_loyalty_status, + f_origin AS flight_origin, + f_destination AS flight_destination, + f_code AS flight_code, + f_scheduled_dep AS flight_scheduled_dep, + f_scheduled_arr AS flight_scheduled_arr FROM cf_stream - PARTITION BY F_ID; + PARTITION BY f_id; -CREATE TABLE customer_flights_rekeyed - (FLIGHT_ID INT PRIMARY KEY) - WITH (KAFKA_TOPIC='cf_rekey', FORMAT='AVRO'); +CREATE TABLE customer_flights_rekeyed ( + flight_id INT PRIMARY KEY, + customer_id VARCHAR, + customer_name VARCHAR, + customer_address VARCHAR, + customer_email VARCHAR, + customer_phone VARCHAR, + customer_loyalty_status VARCHAR, + flight_origin VARCHAR, + flight_destination VARCHAR, + flight_code VARCHAR, + flight_scheduled_dep TIMESTAMP, + flight_scheduled_arr TIMESTAMP +) WITH ( + KAFKA_TOPIC = 'cf_rekey', + KEY_FORMAT = 'KAFKA', + VALUE_FORMAT = 'JSON' +); diff --git a/_includes/tutorials/aviation/confluent/markup/dev/explanation.adoc b/_includes/tutorials/aviation/confluent/markup/dev/explanation.adoc index 22dcca4d88..39130ada02 100644 --- a/_includes/tutorials/aviation/confluent/markup/dev/explanation.adoc +++ b/_includes/tutorials/aviation/confluent/markup/dev/explanation.adoc @@ -2,8 +2,6 @@ ksqlDB supports tables and streams as objects. Both are backed by Kafka topics. Here we're going to create three tables in a normalized data model to hold information about our customers, their bookings, and the flights. -[//]: # "`TODO: Simple ERD of the three tables`" - First off, let's create a table that will hold data about our customers: ++++ @@ -14,11 +12,11 @@ This will store the data in a Kafka topic. 
In practice, you would probably popul [source,sql] ---- -INSERT INTO customers (ID, NAME, ADDRESS, EMAIL, PHONE, LOYALTY_STATUS) VALUES (1, 'Gleda Lealle', '93 Express Point', 'glealle0@senate.gov', '+351 831 301 6746', 'Silver'); -INSERT INTO customers (ID, NAME, ADDRESS, EMAIL, PHONE, LOYALTY_STATUS) VALUES (2, 'Gilly Crocombe', '332 Blaine Avenue', 'gcrocombe1@homestead.com', '+33 203 565 3736', 'Silver'); -INSERT INTO customers (ID, NAME, ADDRESS, EMAIL, PHONE, LOYALTY_STATUS) VALUES (3, 'Astrix Aspall', '56 Randy Place', 'aaspall2@ebay.co.uk', '+33 679 296 6645', 'Gold'); -INSERT INTO customers (ID, NAME, ADDRESS, EMAIL, PHONE, LOYALTY_STATUS) VALUES (4, 'Ker Omond', '23255 Tennessee Court', 'komond3@usnews.com', '+33 515 323 0170', 'Silver'); -INSERT INTO customers (ID, NAME, ADDRESS, EMAIL, PHONE, LOYALTY_STATUS) VALUES (5, 'Arline Synnott', '144 Ramsey Avenue', 'asynnott4@theatlantic.com', '+62 953 759 8885', 'Bronze'); +INSERT INTO customers (id, name, address, email, phone, loyalty_status) VALUES (1, 'Gleda Lealle', '93 Express Point', 'glealle0@senate.gov', '+351 831 301 6746', 'Silver'); +INSERT INTO customers (id, name, address, email, phone, loyalty_status) VALUES (2, 'Gilly Crocombe', '332 Blaine Avenue', 'gcrocombe1@homestead.com', '+33 203 565 3736', 'Silver'); +INSERT INTO customers (id, name, address, email, phone, loyalty_status) VALUES (3, 'Astrix Aspall', '56 Randy Place', 'aaspall2@ebay.co.uk', '+33 679 296 6645', 'Gold'); +INSERT INTO customers (id, name, address, email, phone, loyalty_status) VALUES (4, 'Ker Omond', '23255 Tennessee Court', 'komond3@usnews.com', '+33 515 323 0170', 'Silver'); +INSERT INTO customers (id, name, address, email, phone, loyalty_status) VALUES (5, 'Arline Synnott', '144 Ramsey Avenue', 'asynnott4@theatlantic.com', '+62 953 759 8885', 'Bronze'); ---- Next, we'll create a table of flights and associated bookings for our customers. @@ -31,16 +29,16 @@ For these two tables, let's add some data. 
As before, this would usually come di [source,sql] ---- -INSERT INTO flights (ID, ORIGIN, DESTINATION, CODE, SCHEDULED_DEP, SCHEDULED_ARR) VALUES (1, 'LBA', 'AMS', '642', '2021-11-18T06:04:00', '2021-11-18T06:48:00'); -INSERT INTO flights (ID, ORIGIN, DESTINATION, CODE, SCHEDULED_DEP, SCHEDULED_ARR) VALUES (2, 'LBA', 'LHR', '9607', '2021-11-18T07:36:00', '2021-11-18T08:05:00'); -INSERT INTO flights (ID, ORIGIN, DESTINATION, CODE, SCHEDULED_DEP, SCHEDULED_ARR) VALUES (3, 'AMS', 'TXL', '7968', '2021-11-18T08:11:00', '2021-11-18T10:41:00'); -INSERT INTO flights (ID, ORIGIN, DESTINATION, CODE, SCHEDULED_DEP, SCHEDULED_ARR) VALUES (4, 'AMS', 'OSL', '496', '2021-11-18T11:20:00', '2021-11-18T13:25:00'); -INSERT INTO flights (ID, ORIGIN, DESTINATION, CODE, SCHEDULED_DEP, SCHEDULED_ARR) VALUES (5, 'LHR', 'JFK', '9230', '2021-11-18T10:36:00', '2021-11-18T19:07:00'); - -INSERT INTO bookings (ID, CUSTOMER_ID, FLIGHT_ID) VALUES (1,2,1); -INSERT INTO bookings (ID, CUSTOMER_ID, FLIGHT_ID) VALUES (2,1,1); -INSERT INTO bookings (ID, CUSTOMER_ID, FLIGHT_ID) VALUES (3,5,3); -INSERT INTO bookings (ID, CUSTOMER_ID, FLIGHT_ID) VALUES (4,4,2); +INSERT INTO flights (id, origin, destination, code, scheduled_dep, scheduled_arr) VALUES (1, 'LBA', 'AMS', '642', '2021-11-18T06:04:00', '2021-11-18T06:48:00'); +INSERT INTO flights (id, origin, destination, code, scheduled_dep, scheduled_arr) VALUES (2, 'LBA', 'LHR', '9607', '2021-11-18T07:36:00', '2021-11-18T08:05:00'); +INSERT INTO flights (id, origin, destination, code, scheduled_dep, scheduled_arr) VALUES (3, 'AMS', 'TXL', '7968', '2021-11-18T08:11:00', '2021-11-18T10:41:00'); +INSERT INTO flights (id, origin, destination, code, scheduled_dep, scheduled_arr) VALUES (4, 'AMS', 'OSL', '496', '2021-11-18T11:20:00', '2021-11-18T13:25:00'); +INSERT INTO flights (id, origin, destination, code, scheduled_dep, scheduled_arr) VALUES (5, 'LHR', 'JFK', '9230', '2021-11-18T10:36:00', '2021-11-18T19:07:00'); + +INSERT INTO bookings (id, customer_id, flight_id) VALUES (1,2,1); +INSERT INTO bookings (id, customer_id, flight_id) VALUES (2,1,1); +INSERT INTO bookings (id, customer_id, flight_id) VALUES (3,5,3); +INSERT INTO bookings (id, customer_id, flight_id) VALUES (4,4,2); ---- ### Denormalize the data @@ -48,21 +46,19 @@ INSERT INTO bookings (ID, CUSTOMER_ID, FLIGHT_ID) VALUES (4,4,2); To give us a single view of the passenger/flight data, we'll denormalize down the three tables into one. First, we join the customers to bookings that they've made and build a new table as a result: ++++ -
{% include_raw tutorials/aviation/confluent/code/tutorial-steps/dev/o01.sql %}
-++++
-
-++++
-{% include_raw tutorials/aviation/confluent/code/tutorial-steps/dev/j01.sql %}
+
+{% include_raw tutorials/aviation/confluent/code/tutorial-steps/dev/o01.sql %}
+{% include_raw tutorials/aviation/confluent/code/tutorial-steps/dev/j01.sql %}
+
++++
From here, we join to details of the flights themselves:
++++
-{% include_raw tutorials/aviation/confluent/code/tutorial-steps/dev/o01.sql %}
-++++
-
-++++
-{% include_raw tutorials/aviation/confluent/code/tutorial-steps/dev/j02.sql %}
+
+{% include_raw tutorials/aviation/confluent/code/tutorial-steps/dev/o01.sql %}
+{% include_raw tutorials/aviation/confluent/code/tutorial-steps/dev/j02.sql %}
+
++++
At this stage, we can query the data held in the tables to show which customers are booked on which flights:
@@ -73,13 +69,13 @@ At this stage, we can query the data held in the tables to show which customers
[source,sql]
----
-SELECT CB_C_NAME AS NAME
- , CB_C_EMAIL AS EMAIL
- , CB_C_LOYALTY_STATUS AS LOYALTY_STATUS
- , F_ORIGIN AS ORIGIN
- , F_DESTINATION AS DESTINATION
- , F_CODE AS CODE
- , F_SCHEDULED_DEP AS SCHEDULED_DEP
+SELECT cb_c_name AS name,
+ cb_c_email AS email,
+ cb_c_loyalty_status AS loyalty_status,
+ f_origin AS origin,
+ f_destination AS destination,
+ f_code AS code,
+ f_scheduled_dep AS scheduled_dep
FROM customer_flights
EMIT CHANGES;
----
@@ -87,7 +83,7 @@ EMIT CHANGES;
[source,text]
----
+---------------+------------------------+---------------+-------+------------+-----+------------------------+
-|NAME |EMAIL |LOYALTY_STATUS |ORIGIN |DESTINATION |CODE |SCHEDULED_DEP |
+|name |email |loyalty_status |origin |destination |code |scheduled_dep |
+---------------+------------------------+---------------+-------+------------+-----+------------------------+
|Gilly Crocombe |gcrocombe1@homestead.com|Silver |LBA |AMS |642 |2021-11-18T06:04:00.000 |
|Ker Omond |komond3@usnews.com |Silver |LBA |LHR |9607 |2021-11-18T07:36:00.000 |
@@ -95,17 +91,17 @@ EMIT CHANGES;
|Ker Omond |komond3@usnews.com |Silver |AMS |TXL |7968 |2021-11-18T08:11:00.000 |
----
-The last step in denormalizing the data is to set the key of the table to that of the flight ID so that it can be joined to the updates (which we'll get to below).
-
-++++
-{% include_raw tutorials/aviation/confluent/code/tutorial-steps/dev/o01.sql %}
-++++
+The last step in denormalizing the data is to set the key of the customer flights table to that of the flight ID so that it can be joined to the flight updates (which we'll get to below).
+This is currently a multi-step process; see link:https://github.com/confluentinc/ksql/issues/2356[details].
++++
-{% include_raw tutorials/aviation/confluent/code/tutorial-steps/dev/r01.sql %}
+
+{% include_raw tutorials/aviation/confluent/code/tutorial-steps/dev/o01.sql %}
+{% include_raw tutorials/aviation/confluent/code/tutorial-steps/dev/r01.sql %}
+
++++
-We now have the `customer_flights` table but keyed on `FLIGHT_ID`.
+We now have the customer flights table as before, but keyed on `flight_id`.
### Add a stream of flight updates
@@ -129,9 +125,9 @@ In another ksqlDB window, add some data to the flight update stream:
[source,sql]
----
-INSERT INTO flight_updates (ID, FLIGHT_ID, UPDATED_DEP, REASON) VALUES (1, 2, '2021-11-18T09:00:00.000', 'Cabin staff unavailable');
-INSERT INTO flight_updates (ID, FLIGHT_ID, UPDATED_DEP, REASON) VALUES (2, 3, '2021-11-19T14:00:00.000', 'Mechanical checks');
-INSERT INTO flight_updates (ID, FLIGHT_ID, UPDATED_DEP, REASON) VALUES (3, 1, '2021-11-19T08:10:09.000', 'Icy conditions');
+INSERT INTO flight_updates (id, flight_id, updated_dep, reason) VALUES (1, 2, '2021-11-18T09:00:00.000', 'Cabin staff unavailable');
+INSERT INTO flight_updates (id, flight_id, updated_dep, reason) VALUES (2, 3, '2021-11-19T14:00:00.000', 'Mechanical checks');
+INSERT INTO flight_updates (id, flight_id, updated_dep, reason) VALUES (3, 1, '2021-11-19T08:10:09.000', 'Icy conditions');
----
In the original window, you will see the details of which passengers are impacted by which flight changes:
@@ -139,7 +135,7 @@ In the original window, you will see the details of which passengers are impacte
[source,text]
----
+---------------+------------------------+--------------------+----------------------+---------------------------+------------------+-------------------+------------+
-|CUSTOMER_NAME |FLIGHT_CHANGE_REASON |FLIGHT_UPDATED_DEP |FLIGHT_SCHEDULED_DEP |CUSTOMER_EMAIL |CUSTOMER_PHONE |FLIGHT_DESTINATION |FLIGHT_CODE |
+|customer_name |flight_change_reason |flight_updated_dep |flight_scheduled_dep |customer_email |customer_phone |flight_destination |flight_code |
+---------------+------------------------+--------------------+----------------------+---------------------------+------------------+-------------------+------------+
|Gleda Lealle |Icy conditions |2021-11-19T08:10:09 |2021-11-18T06:04:00 |glealle0@senate.gov |+351 831 301 6746 |AMS |642 |
|Ker Omond |Cabin staff unavailable |2021-11-18T09:00:00 |2021-11-18T07:36:00 |komond3@usnews.com |+33 515 323 0170 |LHR |9607 |
diff --git a/_includes/tutorials/customer-journey/confluent/code/tutorial-steps/dev/process.sql b/_includes/tutorials/customer-journey/confluent/code/tutorial-steps/dev/process.sql
index 86e2d02a8c..3fdda3071c 100644
--- a/_includes/tutorials/customer-journey/confluent/code/tutorial-steps/dev/process.sql
+++ b/_includes/tutorials/customer-journey/confluent/code/tutorial-steps/dev/process.sql
@@ -4,8 +4,8 @@ SET 'auto.offset.reset' = 'earliest';
CREATE STREAM pages (
customer INTEGER,
time BIGINT,
- page_id STRING,
- page STRING
+ page_id VARCHAR,
+ page VARCHAR
) WITH (
VALUE_FORMAT = 'JSON',
KAFKA_TOPIC = 'pages',
diff --git a/_includes/tutorials/datacenter/confluent/code/tutorial-steps/dev/source.json b/_includes/tutorials/datacenter/confluent/code/tutorial-steps/dev/source.json
index 4ee37d22c1..db8e9e1c81 100644
--- a/_includes/tutorials/datacenter/confluent/code/tutorial-steps/dev/source.json
+++ b/_includes/tutorials/datacenter/confluent/code/tutorial-steps/dev/source.json
@@ -11,7 +11,7 @@
"database.whitelist" : "customer",
"table.includelist" : "customer.tenant",
"snapshot.mode" : "initial",
- "output.data.format" : "AVRO",
+ "output.data.format" : "JSON",
"tasks.max" : "1"
}
diff --git a/_includes/tutorials/datacenter/confluent/code/tutorial-steps/dev/source.sql b/_includes/tutorials/datacenter/confluent/code/tutorial-steps/dev/source.sql
index 92f830614a..d94343b945 100644
--- a/_includes/tutorials/datacenter/confluent/code/tutorial-steps/dev/source.sql
+++ b/_includes/tutorials/datacenter/confluent/code/tutorial-steps/dev/source.sql
@@ -11,7 +11,7 @@ CREATE SOURCE CONNECTOR customer WITH (
'database.whitelist' = 'customer',
'table.includelist' = 'customer.tenant',
'snapshot.mode' = 'initial',
- 'output.data.format' = 'AVRO',
+ 'output.data.format' = 'JSON',
'tasks.max' = '1'
);
diff --git a/_includes/tutorials/dynamic-pricing/confluent/code/tutorial-steps/dev/process.sql b/_includes/tutorials/dynamic-pricing/confluent/code/tutorial-steps/dev/process.sql
index bbd20ab466..4014b22787 100644
--- a/_includes/tutorials/dynamic-pricing/confluent/code/tutorial-steps/dev/process.sql
+++ b/_includes/tutorials/dynamic-pricing/confluent/code/tutorial-steps/dev/process.sql
@@ -3,7 +3,7 @@ SET 'auto.offset.reset' = 'earliest';
-- Create stream of sales
CREATE STREAM sales (
item_id INT KEY,
- seller_id STRING,
+ seller_id VARCHAR,
price DOUBLE
) WITH (
VALUE_FORMAT = 'JSON',
@@ -14,7 +14,7 @@ CREATE STREAM sales (
-- Create table of items
CREATE TABLE items (
item_id INT PRIMARY KEY,
- item_name STRING
+ item_name VARCHAR
) WITH (
VALUE_FORMAT = 'JSON',
KAFKA_TOPIC = 'items',
diff --git a/_includes/tutorials/internet-of-things/confluent/code/tutorial-steps/dev/process.sql b/_includes/tutorials/internet-of-things/confluent/code/tutorial-steps/dev/process.sql
index 0e708cac29..1eccc3c0b8 100644
--- a/_includes/tutorials/internet-of-things/confluent/code/tutorial-steps/dev/process.sql
+++ b/_includes/tutorials/internet-of-things/confluent/code/tutorial-steps/dev/process.sql
@@ -2,8 +2,8 @@ SET 'auto.offset.reset' = 'earliest';
-- Create table with latest state of alarms
CREATE TABLE alarms (
- device_id STRING PRIMARY KEY,
- alarm_name STRING,
+ device_id VARCHAR PRIMARY KEY,
+ alarm_name VARCHAR,
code INT
) WITH (
VALUE_FORMAT = 'JSON',
@@ -13,7 +13,7 @@ CREATE TABLE alarms (
-- Create stream of throughputs
CREATE STREAM throughputs (
- device_id STRING KEY,
+ device_id VARCHAR KEY,
throughput DOUBLE
) WITH (
VALUE_FORMAT = 'JSON',
@@ -21,7 +21,7 @@ CREATE STREAM throughputs (
PARTITIONS = 6
);
--- Create new stream of critial issues to investigate
+-- Create new stream of critical issues to investigate
-- where throughputs are below threshold 1000.0 and alarm code is not 0
CREATE STREAM critical_issues WITH (KAFKA_TOPIC = 'critical_issues') AS
SELECT
diff --git a/_includes/tutorials/inventory/confluent/code/tutorial-steps/dev/process.sql b/_includes/tutorials/inventory/confluent/code/tutorial-steps/dev/process.sql
index eb2fd0e6a8..366dca6def 100644
--- a/_includes/tutorials/inventory/confluent/code/tutorial-steps/dev/process.sql
+++ b/_includes/tutorials/inventory/confluent/code/tutorial-steps/dev/process.sql
@@ -2,8 +2,8 @@ SET 'auto.offset.reset' = 'earliest';
-- Create stream of inventory
CREATE STREAM inventory_stream (
- id STRING KEY,
- item STRING,
+ id VARCHAR KEY,
+ item VARCHAR,
quantity INTEGER
) WITH (
VALUE_FORMAT = 'JSON',
diff --git a/_includes/tutorials/location-based-alerting/confluent/code/tutorial-steps/dev/manual.sql b/_includes/tutorials/location-based-alerting/confluent/code/tutorial-steps/dev/manual.sql
new file mode 100644
index 0000000000..7b98631306
--- /dev/null
+++ b/_includes/tutorials/location-based-alerting/confluent/code/tutorial-steps/dev/manual.sql
@@ -0,0 +1,14 @@
+-- When testing this recipe by inserting records manually, a short pause
+-- between the two insert groups below is required. This allows the merchant
+-- location data to be processed into the merchants_by_geohash table before
+-- the user location data is joined in the alerts_raw stream.
+INSERT INTO MERCHANT_LOCATIONS (id, latitude, longitude, description, geohash) VALUES (1, 14.5486606, 121.0477211, '7-Eleven RCBC Center', 'wdw4f88206fx');
+INSERT INTO MERCHANT_LOCATIONS (id, latitude, longitude, description, geohash) VALUES (2, 14.5473328, 121.0516176, 'Jordan Manila', 'wdw4f87075kt');
+INSERT INTO MERCHANT_LOCATIONS (id, latitude, longitude, description, geohash) VALUES (3, 14.5529666, 121.0516716, 'Lawson Eco Tower', 'wdw4f971hmsv');
+
+-- Wait 10 seconds before inserting the records below
+
+INSERT INTO USER_LOCATIONS (id, latitude, longitude, geohash) VALUES (1, 14.5472791, 121.0475401, 'wdw4f820h17g');
+INSERT INTO USER_LOCATIONS (id, latitude, longitude, geohash) VALUES (2, 14.5486952, 121.0521851, 'wdw4f8e82376');
+INSERT INTO USER_LOCATIONS (id, latitude, longitude, geohash) VALUES (2, 14.5517401, 121.0518652, 'wdw4f9560buw');
+INSERT INTO USER_LOCATIONS (id, latitude, longitude, geohash) VALUES (2, 14.5500341, 121.0555802, 'wdw4f8vbp6yv');
diff --git a/_includes/tutorials/location-based-alerting/confluent/code/tutorial-steps/dev/process.sql b/_includes/tutorials/location-based-alerting/confluent/code/tutorial-steps/dev/process.sql
new file mode 100644
index 0000000000..a7b866b9f0
--- /dev/null
+++ b/_includes/tutorials/location-based-alerting/confluent/code/tutorial-steps/dev/process.sql
@@ -0,0 +1,85 @@
+SET 'auto.offset.reset' = 'earliest';
+
+-- Creates a table of merchant data including the calculated geohash
+CREATE TABLE merchant_locations (
+ id INT PRIMARY KEY,
+ description VARCHAR,
+ latitude DECIMAL(10,7),
+ longitude DECIMAL(10,7),
+ geohash VARCHAR
+) WITH (
+ KAFKA_TOPIC='merchant-locations',
+ VALUE_FORMAT='JSON',
+ PARTITIONS = 6
+);
+
+-- Creates a table to look up merchants based on a
+-- substring (precision) of the geohash
+CREATE TABLE merchants_by_geohash
+WITH (
+ KAFKA_TOPIC='merchant-geohash',
+ FORMAT='JSON',
+ PARTITIONS=6
+) AS
+SELECT
+ SUBSTRING(geohash, 1, 6) AS geohash,
+ COLLECT_LIST(id) as id_list
+FROM merchant_locations
+GROUP BY SUBSTRING(geohash, 1, 6);
+
+-- Creates a stream of user location data including the calculated geohash
+CREATE STREAM user_locations (
+ id INT,
+ latitude DECIMAL(10,7),
+ longitude DECIMAL(10,7),
+ geohash VARCHAR
+) WITH (
+ KAFKA_TOPIC='user-locations',
+ VALUE_FORMAT='JSON',
+ PARTITIONS=6
+);
+
+-- Creates a stream of alerts when a user's geohash-based location roughly
+-- intersects a collection of merchant locations from the
+-- merchants_by_geohash table.
+CREATE STREAM alerts_raw
+WITH (
+ KAFKA_TOPIC='alerts-raw',
+ VALUE_FORMAT='JSON',
+ PARTITIONS=6
+) AS
+SELECT
+ user_locations.id as user_id,
+ user_locations.latitude AS user_latitude,
+ user_locations.longitude AS user_longitude,
+ SUBSTRING(user_locations.geohash, 1, 6) AS user_geohash,
+ EXPLODE(merchants_by_geohash.id_list) AS merchant_id
+FROM user_locations
+LEFT JOIN merchants_by_geohash ON SUBSTRING(user_locations.geohash, 1, 6) =
+ merchants_by_geohash.geohash
+PARTITION BY null;
+
+-- Creates a stream of promotion alerts to send a user when their location
+-- intersects with a merchant within a specified distance (0.2 KM)
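+-- The 6-character geohash prefix join above is only a coarse pre-filter;
+-- GEO_DISTANCE provides the precise distance check here.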
+CREATE STREAM promo_alerts
+WITH (
+ KAFKA_TOPIC='promo-alerts',
+ VALUE_FORMAT='JSON',
+ PARTITIONS=6
+) AS
+SELECT
+ alerts_raw.user_id,
+ alerts_raw.user_geohash,
+ merchant_locations.description AS merchant_description,
+ CAST(
+ GEO_DISTANCE(alerts_raw.user_latitude, alerts_raw.user_longitude,
+ merchant_locations.latitude, merchant_locations.longitude,
+ 'KM') * 1000 AS INT) as distance_meters,
+ STRUCT(lat := CAST(alerts_raw.user_latitude AS DOUBLE), lon := CAST(alerts_raw.user_longitude AS DOUBLE)) AS geopoint
+FROM alerts_raw
+LEFT JOIN merchant_locations on alerts_raw.merchant_id = merchant_locations.id
+WHERE GEO_DISTANCE(
+ alerts_raw.user_latitude, alerts_raw.user_longitude,
+ merchant_locations.latitude, merchant_locations.longitude, 'KM') < 0.2
+PARTITION BY null
+EMIT CHANGES;
diff --git a/_includes/tutorials/location-based-alerting/confluent/code/tutorial-steps/dev/sink.json b/_includes/tutorials/location-based-alerting/confluent/code/tutorial-steps/dev/sink.json
new file mode 100644
index 0000000000..43937f2060
--- /dev/null
+++ b/_includes/tutorials/location-based-alerting/confluent/code/tutorial-steps/dev/sink.json
@@ -0,0 +1,15 @@
+{
+ "connector.class" : "ElasticsearchSink",
+ "name" : "promo-alerts-sink",
+ "input.data.format" : "JSON",
+ "kafka.api.key" : "{% include_raw tutorials/location-based-alerting/confluent/code/tutorial-steps/dev/manual.sql %}
+++++
diff --git a/_includes/tutorials/location-based-alerting/confluent/markup/dev/process.adoc b/_includes/tutorials/location-based-alerting/confluent/markup/dev/process.adoc
new file mode 100644
index 0000000000..9ef084757e
--- /dev/null
+++ b/_includes/tutorials/location-based-alerting/confluent/markup/dev/process.adoc
@@ -0,0 +1,3 @@
+++++
+{% include_raw tutorials/location-based-alerting/confluent/code/tutorial-steps/dev/process.sql %}
+++++
diff --git a/_includes/tutorials/location-based-alerting/confluent/markup/dev/sink.adoc b/_includes/tutorials/location-based-alerting/confluent/markup/dev/sink.adoc
new file mode 100644
index 0000000000..3f02f5578c
--- /dev/null
+++ b/_includes/tutorials/location-based-alerting/confluent/markup/dev/sink.adoc
@@ -0,0 +1,5 @@
+Sinking the promotion alerts out to Elasticsearch facilitates further search processing:
+
+++++
+{% include_raw tutorials/location-based-alerting/confluent/code/tutorial-steps/dev/sink.json %}
+++++
diff --git a/_includes/tutorials/location-based-alerting/confluent/markup/dev/source.adoc b/_includes/tutorials/location-based-alerting/confluent/markup/dev/source.adoc
new file mode 100644
index 0000000000..94868c8831
--- /dev/null
+++ b/_includes/tutorials/location-based-alerting/confluent/markup/dev/source.adoc
@@ -0,0 +1,5 @@
+This tutorial assumes that you have merchant data stored in a SQL database. The merchant data includes geolocation information, which will be matched against the stream of location data from a user's device. First, deploy a source connector that reads the merchant data into a Kafka topic for stream processing in ksqlDB.
+
+++++
+{% include_raw tutorials/location-based-alerting/confluent/code/tutorial-steps/dev/source.json %}
+++++
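+
+As a rough sketch only, the merchant table in the source database might look something like the following; the column names are assumptions chosen to line up with the ksqlDB schema created later in this tutorial, not taken from the connector configuration:
+
+[source,sql]
+----
+-- Hypothetical merchant table in the source relational database
+CREATE TABLE merchant_locations (
+  id          INT PRIMARY KEY,
+  description VARCHAR(255),
+  latitude    DECIMAL(10,7),
+  longitude   DECIMAL(10,7),
+  geohash     VARCHAR(12)
+);
+----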
diff --git a/_includes/tutorials/loyalty-rewards/confluent/code/tutorial-steps/dev/process.sql b/_includes/tutorials/loyalty-rewards/confluent/code/tutorial-steps/dev/process.sql
index e37ef2278f..5a485ee984 100644
--- a/_includes/tutorials/loyalty-rewards/confluent/code/tutorial-steps/dev/process.sql
+++ b/_includes/tutorials/loyalty-rewards/confluent/code/tutorial-steps/dev/process.sql
@@ -5,7 +5,7 @@ CREATE STREAM users (
name VARCHAR
) WITH (
KAFKA_TOPIC = 'USERS',
- VALUE_FORMAT = 'AVRO',
+ VALUE_FORMAT = 'JSON',
PARTITIONS = 6
);
@@ -15,7 +15,7 @@ CREATE STREAM products (
price DECIMAL(10,2)
) WITH (
KAFKA_TOPIC = 'products',
- VALUE_FORMAT = 'AVRO',
+ VALUE_FORMAT = 'JSON',
PARTITIONS = 6
);
@@ -24,7 +24,7 @@ CREATE STREAM purchases (
product_id VARCHAR
) WITH (
KAFKA_TOPIC = 'purchases',
- VALUE_FORMAT = 'AVRO',
+ VALUE_FORMAT = 'JSON',
PARTITIONS = 6
);
diff --git a/_includes/tutorials/mainframe-offload/confluent/code/tutorial-steps/dev/process.sql b/_includes/tutorials/mainframe-offload/confluent/code/tutorial-steps/dev/process.sql
index b85d8c37a5..640e267c6f 100644
--- a/_includes/tutorials/mainframe-offload/confluent/code/tutorial-steps/dev/process.sql
+++ b/_includes/tutorials/mainframe-offload/confluent/code/tutorial-steps/dev/process.sql
@@ -2,7 +2,7 @@ SET 'auto.offset.reset' = 'earliest';
-- Create stream of transactions from the Kafka topic
CREATE STREAM mq_transactions (
- dep_account_no STRING,
+ dep_account_no VARCHAR,
dep_balance_dollars BIGINT,
dep_balance_cents BIGINT,
timestamp BIGINT
@@ -25,7 +25,7 @@ PARTITION BY dep_account_no
EMIT CHANGES;
CREATE SOURCE TABLE mq_cache (
- dep_account_no STRING PRIMARY KEY,
+ dep_account_no VARCHAR PRIMARY KEY,
balance BIGINT,
ts_stream BIGINT,
ts_cache BIGINT,
diff --git a/_includes/tutorials/model-retraining/confluent/code/tutorial-steps/dev/manual.sql b/_includes/tutorials/model-retraining/confluent/code/tutorial-steps/dev/manual.sql
new file mode 100644
index 0000000000..7d4058b5a5
--- /dev/null
+++ b/_includes/tutorials/model-retraining/confluent/code/tutorial-steps/dev/manual.sql
@@ -0,0 +1,21 @@
+INSERT INTO predicted_weight VALUES ('101', 'Salmon', 17.33, 74.55, 3.78);
+INSERT INTO predicted_weight VALUES ('102', 'Salmon', 19.11, 82.19, 4.17);
+INSERT INTO predicted_weight VALUES ('103', 'Salmon', 21.07, 90.62, 4.6);
+INSERT INTO predicted_weight VALUES ('104', 'Bass', 15.44, 56.23, 2.54);
+INSERT INTO predicted_weight VALUES ('105', 'Bass', 17.02, 62, 2.8);
+INSERT INTO predicted_weight VALUES ('106', 'Bass', 18.76, 68.34, 3.09);
+INSERT INTO predicted_weight VALUES ('107', 'Trout', 13.34, 64.05, 1.47);
+INSERT INTO predicted_weight VALUES ('108', 'Trout', 14.71, 70.61, 1.62);
+INSERT INTO predicted_weight VALUES ('109', 'Trout', 16.22, 77.85, 1.79);
+INSERT INTO predicted_weight VALUES ('110', 'Trout', 17.03, 81.74, 1.88);
+
+INSERT INTO actual_weight VALUES ('101', 'Salmon', 4.38);
+INSERT INTO actual_weight VALUES ('102', 'Salmon', 3.17);
+INSERT INTO actual_weight VALUES ('103', 'Salmon', 5.6);
+INSERT INTO actual_weight VALUES ('104', 'Bass', 5.54);
+INSERT INTO actual_weight VALUES ('105', 'Bass', 1.8);
+INSERT INTO actual_weight VALUES ('106', 'Bass', 4.09);
+INSERT INTO actual_weight VALUES ('107', 'Trout', 2.47);
+INSERT INTO actual_weight VALUES ('108', 'Trout', 2.62);
+INSERT INTO actual_weight VALUES ('109', 'Trout', 2.79);
+INSERT INTO actual_weight VALUES ('110', 'Trout', 2.88);
\ No newline at end of file
diff --git a/_includes/tutorials/model-retraining/confluent/code/tutorial-steps/dev/process.sql b/_includes/tutorials/model-retraining/confluent/code/tutorial-steps/dev/process.sql
new file mode 100644
index 0000000000..c9d27b3f92
--- /dev/null
+++ b/_includes/tutorials/model-retraining/confluent/code/tutorial-steps/dev/process.sql
@@ -0,0 +1,56 @@
+SET 'auto.offset.reset' = 'earliest';
+
+-- Create stream of predictions
+CREATE STREAM predicted_weight(
+ fish_id VARCHAR KEY,
+ species VARCHAR,
+ height DOUBLE,
+ length DOUBLE,
+ prediction DOUBLE
+) WITH (
+ KAFKA_TOPIC = 'kt.mdb.weight-prediction',
+ VALUE_FORMAT = 'JSON',
+ PARTITIONS = 6
+);
+
+-- Create stream of actual weights
+CREATE STREAM actual_weight(
+ fish_id VARCHAR KEY,
+ species VARCHAR,
+ weight DOUBLE
+) WITH (
+ KAFKA_TOPIC = 'kt.mdb.machine-weight',
+ VALUE_FORMAT = 'JSON',
+ PARTITIONS = 6
+);
+
+-- Create stream joining predictions with actual weights
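+-- WITHIN 1 MINUTE joins a prediction and an actual weight only when they
+-- arrive within one minute of each other; GRACE PERIOD 1 MINUTE accepts
+-- out-of-order records that are up to one additional minute late.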
+CREATE STREAM diff_weight WITH (KAFKA_TOPIC = 'diff_weight') AS
+ SELECT
+ -- This fake key field will give us something to group by in the next step
+ 'key' AS key,
+ predicted_weight.fish_id AS fish_id,
+ predicted_weight.species AS species,
+ predicted_weight.length AS length,
+ predicted_weight.height AS height,
+ predicted_weight.prediction AS prediction,
+ actual_weight.weight AS actual,
+ ROUND(ABS(predicted_weight.prediction - actual_weight.weight) / actual_weight.weight, 3) AS Error
+FROM predicted_weight
+INNER JOIN actual_weight
+WITHIN 1 MINUTE
+GRACE PERIOD 1 MINUTE
+ON predicted_weight.fish_id = actual_weight.fish_id;
+
+-- Create table of one minute aggregates with over 15% error rate
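+-- Windows whose average prediction error is 15% or less are filtered out by
+-- the HAVING clause, so only windows that warrant retraining are emitted.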
+CREATE TABLE retrain_weight WITH (KAFKA_TOPIC = 'retrain_weight') AS
+ SELECT
+ key,
+ COLLECT_SET(species) AS species,
+ EARLIEST_BY_OFFSET(fish_id) AS fish_id_start,
+ LATEST_BY_OFFSET(fish_id) AS fish_id_end,
+ AVG(Error) AS ErrorAvg
+FROM diff_weight
+WINDOW TUMBLING (SIZE 1 MINUTE, GRACE PERIOD 1 MINUTE)
+GROUP BY key
+HAVING ROUND(AVG(diff_weight.Error), 2) > 0.15;
diff --git a/_includes/tutorials/model-retraining/confluent/code/tutorial-steps/dev/sink.json b/_includes/tutorials/model-retraining/confluent/code/tutorial-steps/dev/sink.json
new file mode 100644
index 0000000000..9573993db6
--- /dev/null
+++ b/_includes/tutorials/model-retraining/confluent/code/tutorial-steps/dev/sink.json
@@ -0,0 +1,31 @@
+{
+ "connector.class" : "MongoDbAtlasSink",
+ "name" : "weight-data",
+ "kafka.auth.mode" : "KAFKA_API_KEY",
+ "kafka.api.key" : "{% include_raw tutorials/model-retraining/confluent/code/tutorial-steps/dev/manual.sql %}
+++++
diff --git a/_includes/tutorials/model-retraining/confluent/markup/dev/process.adoc b/_includes/tutorials/model-retraining/confluent/markup/dev/process.adoc
new file mode 100644
index 0000000000..1715dc9aa5
--- /dev/null
+++ b/_includes/tutorials/model-retraining/confluent/markup/dev/process.adoc
@@ -0,0 +1,3 @@
+++++
+{% include_raw tutorials/model-retraining/confluent/code/tutorial-steps/dev/process.sql %}
+++++
diff --git a/_includes/tutorials/model-retraining/confluent/markup/dev/sink.adoc b/_includes/tutorials/model-retraining/confluent/markup/dev/sink.adoc
new file mode 100644
index 0000000000..d930f49a58
--- /dev/null
+++ b/_includes/tutorials/model-retraining/confluent/markup/dev/sink.adoc
@@ -0,0 +1,5 @@
+Now we'll use a MongoDB sink connector to send the combined predictions and actual weights to a database, and an HTTP sink connector to trigger the retraining process.
+
+++++
+{% include_raw tutorials/model-retraining/confluent/code/tutorial-steps/dev/sink.json %}
+++++
diff --git a/_includes/tutorials/model-retraining/confluent/markup/dev/source.adoc b/_includes/tutorials/model-retraining/confluent/markup/dev/source.adoc
new file mode 100644
index 0000000000..90c1d33b97
--- /dev/null
+++ b/_includes/tutorials/model-retraining/confluent/markup/dev/source.adoc
@@ -0,0 +1,5 @@
+The existing pipeline, which predicts the weight of fish based on size and species, stores its results in two MongoDB collections that are used by other processes downstream. One collection contains the data fed to the model, along with the resulting prediction. The other contains the actual weight as determined by a later step in the process. For this recipe, we'll use Connect to make this data available to our ksqlDB application.
+
+++++
+{% include_raw tutorials/model-retraining/confluent/code/tutorial-steps/dev/source.json %}
+++++
diff --git a/_includes/tutorials/next-best-offer/confluent/code/tutorial-steps/dev/source.json b/_includes/tutorials/next-best-offer/confluent/code/tutorial-steps/dev/source.json
index 78c4cce56b..7f00ff01ee 100644
--- a/_includes/tutorials/next-best-offer/confluent/code/tutorial-steps/dev/source.json
+++ b/_includes/tutorials/next-best-offer/confluent/code/tutorial-steps/dev/source.json
@@ -8,9 +8,9 @@
"connection.user" : "postgres",
"connection.password" : "{{ site.data.tutorials[page.static_data].introduction }}
+