diff --git a/solutions/iot/weather-stations/.gitignore b/solutions/iot/weather-stations/.gitignore new file mode 100644 index 0000000..ca9a98c --- /dev/null +++ b/solutions/iot/weather-stations/.gitignore @@ -0,0 +1,70 @@ +HELP.md +.gradle +build/ +**/bin/ +!gradle/wrapper/gradle-wrapper.jar +!**/src/main/**/build/ +!**/src/test/**/build/ +**/.DS_Store + +**/.metals +**/.bloop +**/.bsp +spark/project +spark/target + +observability/prometheus/manifests +observability/prometheus/vendor +observability/prometheus/jsonnetfile.* + +**/.ipynb_checkpoints +**/.python-version + +### Secrets ### +k8s/secrets/aiven/* +**/pgpassfile +**/*.p12 +**/*.jks +**/*.pem +**/token +**/secrets.tfvars + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr +out/ +!**/src/main/**/out/ +!**/src/test/**/out/ + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ + +### VS Code ### +.vscode/ + +### Terraform ### +# Local .terraform directories +**/.terraform/* + +# .tfstate files +**/*.tfstate +**/*.tfstate.* +**/.terraform.lock.hcl + +# Crash log files +crash.log diff --git a/solutions/iot/weather-stations/.gitmodules b/solutions/iot/weather-stations/.gitmodules new file mode 100644 index 0000000..7f086e2 --- /dev/null +++ b/solutions/iot/weather-stations/.gitmodules @@ -0,0 +1,4 @@ +[submodule "observability/highlander"] + path = observability/highlander + url = https://github.com/ssaily/highlander.git + branch = basic-auth-support diff --git a/solutions/iot/weather-stations/LICENSE b/solutions/iot/weather-stations/LICENSE new file mode 100644 index 0000000..261eeb9 --- /dev/null +++ b/solutions/iot/weather-stations/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). 
+ + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
diff --git a/solutions/iot/weather-stations/README.md b/solutions/iot/weather-stations/README.md new file mode 100644 index 0000000..2c2b5ea --- /dev/null +++ b/solutions/iot/weather-stations/README.md @@ -0,0 +1,53 @@ +# Data pipeline using Kafka and M3DB + +## Prerequisites +- Active Aiven account and project (https://aiven.io/) +- Aiven CLI (https://github.com/aiven/aiven-client) +- Terraform (https://learn.hashicorp.com/tutorials/terraform/install-cli) +- k8s cluster and kubectl command line tool +- jq command line tool (https://stedolan.github.io/jq/download/) +- kafkacat tool (https://github.com/edenhill/kafkacat) + +## Infrastructure +``` +cd infra +terraform apply +``` + +## Download secrets +``` +./get-demo-secrets.sh +``` + +## Import weather station metadata to PostgreSQL db +``` +cd database +./import-stations.sh +``` + +## Create k8s resources +``` +cd k8s +``` + +### Namespace +``` +kubectl create -f namespace.yaml +``` + +### Secrets +``` +./create-k8s-secrets.sh +``` + +### Deploy observability (Optional) +Follow the instructions [here](observability/README.md) + +### Deployments +``` +kubectl create -f deploy-ingest.yaml +kubectl create -f deploy-processing.yaml +kubectl create -f deploy-sink.yaml +kubectl create -f ksqldb.yaml +``` + diff --git a/solutions/iot/weather-stations/database/README.md b/solutions/iot/weather-stations/database/README.md new file mode 100644 index 0000000..d62af4a --- /dev/null +++ b/solutions/iot/weather-stations/database/README.md @@ -0,0 +1,3 @@ +# PostgreSQL database for weather station metadata + +Run the `import-stations.sh` script to download the weather station and sensor metadata from Digitraffic. The script creates the PostgreSQL tables and then loads the data from the downloaded JSON files, using jq for the transformations. diff --git a/solutions/iot/weather-stations/database/create.sql b/solutions/iot/weather-stations/database/create.sql new file mode 100644 index 0000000..b3015df --- /dev/null +++ b/solutions/iot/weather-stations/database/create.sql @@ -0,0 +1,3 @@ +CREATE TABLE IF NOT EXISTS weather_stations (roadstationid smallint primary key, name varchar(128), municipality varchar(64), province varchar(64), latitude float, longitude float); +CREATE TABLE IF NOT EXISTS weather_sensors (sensorid smallint primary key, name varchar(128), unit varchar(8), accuracy smallint); + diff --git a/solutions/iot/weather-stations/database/import-stations.sh b/solutions/iot/weather-stations/database/import-stations.sh new file mode 100755 index 0000000..529fe18 --- /dev/null +++ b/solutions/iot/weather-stations/database/import-stations.sh @@ -0,0 +1,7 @@ +psql "$(< pgpassfile)" -f create.sql + +curl -X GET "https://tie.digitraffic.fi/api/v3/metadata/weather-stations?lastUpdated=false" -H "accept: application/geo+json" -H "Accept-Encoding: gzip, deflate"|jq -r '.features|map([(.properties.roadStationId|tostring), .properties.name, .properties.municipality, .properties.province,(.geometry.coordinates[1]|tostring), (.geometry.coordinates[0]|tostring)]|join(","))|join("\n")' > weather_stations.csv +psql "$(< pgpassfile)" -c "\copy weather_stations (roadstationid, name, municipality, province, latitude, longitude) from 'weather_stations.csv' CSV DELIMITER ',';" + +curl -X GET "https://tie.digitraffic.fi/api/v3/metadata/weather-sensors?lastUpdated=false" -H "accept: application/json" -H "Accept-Encoding: gzip, deflate"|gzip -dc|jq -r '.roadStationSensors|map([(.id|tostring), .name, .unit, (.accuracy|tostring)]|join(","))|join("\n")' > sensors.csv +psql "$(< pgpassfile)" -c "\copy weather_sensors 
(sensorid, name, unit, accuracy) from 'sensors.csv' CSV DELIMITER ',';" diff --git a/solutions/iot/weather-stations/get-demo-secrets.sh b/solutions/iot/weather-stations/get-demo-secrets.sh new file mode 100755 index 0000000..68a17cf --- /dev/null +++ b/solutions/iot/weather-stations/get-demo-secrets.sh @@ -0,0 +1,48 @@ +#!/bin/sh +# Download Kafka service user certificates +[ ! -d "k8s/secrets/aiven/ingest" ] && avn service user-kafka-java-creds --project $1 --username tms-ingest-user -p supersecret -d k8s/secrets/aiven/ingest tms-demo-kafka +[ ! -d "k8s/secrets/aiven/processing" ] && avn service user-kafka-java-creds --project $1 --username tms-processing-user -p supersecret -d k8s/secrets/aiven/processing tms-demo-kafka +[ ! -d "k8s/secrets/aiven/sink" ] && avn service user-kafka-java-creds --project $1 --username tms-sink-user -p supersecret -d k8s/secrets/aiven/sink tms-demo-kafka +[ ! -d "k8s/secrets/aiven/admin" ] && avn service user-kafka-java-creds --project $1 --username avnadmin -p supersecret -d k8s/secrets/aiven/admin tms-demo-kafka + +# Generate pgpassfile for bootstrapping PostgreSQL tables +avn service get tms-demo-pg --json -v --project $1|jq -r '("host=" + .service_uri_params.host + " port=" + .service_uri_params.port + " dbname=" + .service_uri_params.dbname + " user=" + .service_uri_params.user + " password=" + .service_uri_params.password)' > database/pgpassfile + +# Extract endpoints and secrets from Aiven services +KAFKA_JSON=$(avn service get tms-demo-kafka --project $1 --json -v) +M3_OBS_JSON=$(avn service get tms-demo-obs-m3db --project $1 --json -v) +M3_IOT_JSON=$(avn service get tms-demo-iot-m3db --project $1 --json -v) +OS_JSON=$(avn service get tms-demo-os --project $1 --json -v) + +M3_PROM_URI=$(jq -r '.components[] | select(.component == "m3coordinator_prom_remote_write") |"https://\(.host):\(.port)\(.path)"' <<< $M3_OBS_JSON) +M3_PROM_USER=$(jq -r '.users[] | select(.type == "primary") |"\(.username)"' <<< $M3_OBS_JSON) +M3_PROM_PWD=$(jq -r '.users[] | select(.type == "primary") |"\(.password)"' <<< $M3_OBS_JSON) +M3_INFLUXDB_URI=$(jq -r '"https://" + (.service_uri_params.host + ":" + .service_uri_params.port + "/api/v1/influxdb/write")' <<< $M3_IOT_JSON) +M3_CREDENTIALS=$(jq -r '.users[] | select(.type == "primary") |"\(.username):\(.password)"' <<< $M3_IOT_JSON) + +SCHEMA_REGISTRY_HOST=$(jq -r '.components[] | select(.component == "schema_registry") |"\(.host):\(.port)"' <<< $KAFKA_JSON) +SCHEMA_REGISTRY_URI=$(jq -r .connection_info.schema_registry_uri <<< $KAFKA_JSON) +KAFKA_SERVICE_URI=$(jq -r .service_uri <<< $KAFKA_JSON) + +OS_HOST=$(jq -r '(.service_uri_params.host)' <<< $OS_JSON) +OS_PORT=$(jq -r '(.service_uri_params.port)' <<< $OS_JSON) +OS_USER=$(jq -r '(.service_uri_params.user)' <<< $OS_JSON) +OS_PASSWORD=$(jq -r '(.service_uri_params.password)' <<< $OS_JSON) + + +echo $SCHEMA_REGISTRY_URI > k8s/secrets/aiven/schema_registry_uri +echo $M3_INFLUXDB_URI > k8s/secrets/aiven/m3_influxdb_uri +echo $M3_CREDENTIALS > k8s/secrets/aiven/m3_credentials +echo $M3_PROM_USER > k8s/secrets/aiven/m3_prom_user +echo $M3_PROM_PWD > k8s/secrets/aiven/m3_prom_pwd +echo $M3_PROM_URI > k8s/secrets/aiven/m3_prom_uri +echo $KAFKA_SERVICE_URI > k8s/secrets/aiven/kafka_service_uri +echo $OS_HOST > k8s/secrets/aiven/os_host +echo $OS_PORT > k8s/secrets/aiven/os_port +echo $OS_USER > k8s/secrets/aiven/os_user +echo $OS_PASSWORD > k8s/secrets/aiven/os_password + +# Generate truststore for Schema Registry CA +openssl s_client -connect $SCHEMA_REGISTRY_HOST -showcerts < 
/dev/null 2>/dev/null | awk '/BEGIN CERT/{s=1}; s{t=t "\n" $0}; /END CERT/ {last=t; t=""; s=0}; END{print last}' > k8s/secrets/aiven/sr-ca.cert +keytool -import -file k8s/secrets/aiven/sr-ca.cert -alias CA -keystore k8s/secrets/aiven/schema_registry.truststore.jks -storepass supersecret -noprompt + diff --git a/solutions/iot/weather-stations/infra/README.md b/solutions/iot/weather-stations/infra/README.md new file mode 100644 index 0000000..21fdf4f --- /dev/null +++ b/solutions/iot/weather-stations/infra/README.md @@ -0,0 +1,15 @@ +# Terraform plan for Aiven resources + +This plan will create a bunch of services within your Aiven project +- Kafka +- Kafka Connect Cluster +- PostgreSQL source connector for Kafka Connect +- PostgreSQL database +- Topics, Users and ACLs for Kafka +- M3 Time Series database + +You should copy the ```secrets.tfvars.template``` file to ```secrets.tfvars``` and fill in your Aiven project name, token and cloud region. After that you can run + +``` +terraform apply -var-file=secrets.tfvars +``` diff --git a/solutions/iot/weather-stations/infra/connector.tf b/solutions/iot/weather-stations/infra/connector.tf new file mode 100644 index 0000000..48a4408 --- /dev/null +++ b/solutions/iot/weather-stations/infra/connector.tf @@ -0,0 +1,162 @@ +data "aiven_service_component" "schema_registry" { + project = var.avn_project_id + service_name = aiven_kafka.tms-demo-kafka.service_name + component = "schema_registry" + route = "dynamic" +} + +data "aiven_service_component" "tms_pg" { + project = var.avn_project_id + service_name = aiven_pg.tms-demo-pg.service_name + component = "pg" + route = "dynamic" +} + +locals { + schema_registry_uri = "https://${data.aiven_service_user.kafka_admin.username}:${data.aiven_service_user.kafka_admin.password}@${data.aiven_service_component.schema_registry.host}:${data.aiven_service_component.schema_registry.port}" +} + +resource "aiven_kafka_connector" "kafka-pg-cdc-stations" { + project = var.avn_project_id + service_name = aiven_kafka_connect.tms-demo-kafka-connect1.service_name + connector_name = "kafka-pg-cdc-stations" + + config = { + "_aiven.restart.on.failure": "true", + "key.converter" : "org.apache.kafka.connect.storage.StringConverter", + "key.converter.schemas.enable": "false", + "value.converter": "io.confluent.connect.avro.AvroConverter", + "value.converter.schema.registry.url": local.schema_registry_uri, + "value.converter.basic.auth.credentials.source": "URL", + "value.converter.schemas.enable": "true", + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "name": "kafka-pg-cdc-stations", + "slot.name": "weatherstations", + "database.hostname": data.aiven_service_component.tms_pg.host, + "database.port": data.aiven_service_component.tms_pg.port, + "database.user": data.aiven_service_user.pg_admin.username, + "database.password": data.aiven_service_user.pg_admin.password, + "database.dbname": "defaultdb", + "database.server.name": "tms-demo-pg", + "table.whitelist": "public.weather_stations", + "plugin.name": "wal2json", + "database.sslmode": "require", + "transforms": "unwrap,insertKey,extractKey", + "transforms.unwrap.type":"io.debezium.transforms.UnwrapFromEnvelope", + "transforms.unwrap.drop.tombstones":"false", + "transforms.insertKey.type":"org.apache.kafka.connect.transforms.ValueToKey", + "transforms.insertKey.fields":"roadstationid", + "transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key", + "transforms.extractKey.field":"roadstationid", + "include.schema.changes": 
"false" + } +} + +resource "aiven_kafka_connector" "kafka-pg-cdc-stations-2" { + project = var.avn_project_id + service_name = aiven_kafka_connect.tms-demo-kafka-connect1.service_name + connector_name = "kafka-pg-cdc-stations-2" + + config = { + "_aiven.restart.on.failure": "true", + "key.converter" : "io.confluent.connect.avro.AvroConverter", + "key.converter.schema.registry.url": local.schema_registry_uri, + "key.converter.basic.auth.credentials.source": "URL", + "key.converter.schemas.enable": "true", + "value.converter": "io.confluent.connect.avro.AvroConverter", + "value.converter.schema.registry.url": local.schema_registry_uri, + "value.converter.basic.auth.credentials.source": "URL", + "value.converter.schemas.enable": "true", + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "name": "kafka-pg-cdc-stations-2", + "slot.name": "weatherstations2", + "database.hostname": data.aiven_service_component.tms_pg.host, + "database.port": data.aiven_service_component.tms_pg.port, + "database.user": data.aiven_service_user.pg_admin.username, + "database.password": data.aiven_service_user.pg_admin.password, + "database.dbname": "defaultdb", + "database.server.name": "tms-demo-pg", + "table.whitelist": "public.weather_stations_2", + "plugin.name": "wal2json", + "database.sslmode": "require", + "transforms":"route", + "transforms.route.regex": "tms-demo-pg.public.weather_stations_2", + "transforms.route.replacement": "weather_stations", + "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", + "include.schema.changes": "false" + } +} + +resource "aiven_kafka_connector" "bq-sink" { + count = "${var.bq_project != "" ? 1 : 0}" + project = var.avn_project_id + service_name = aiven_kafka_connect.tms-demo-kafka-connect1.service_name + connector_name = "bq-sink" + config = { + "_aiven.restart.on.failure": "true", + "name": "bq-sink", + "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector", + "key.converter" : "io.confluent.connect.avro.AvroConverter", + "key.converter.schema.registry.url": local.schema_registry_uri, + "key.converter.basic.auth.credentials.source": "URL", + "key.converter.schemas.enable": "true", + "value.converter": "io.confluent.connect.avro.AvroConverter", + "value.converter.schema.registry.url": local.schema_registry_uri, + "value.converter.basic.auth.credentials.source": "URL", + "value.converter.schemas.enable": "true", + "topics": "weather_stations", + "project": var.bq_project, + "keySource": "JSON", + "keyfile": var.bq_key, + "defaultDataset": "weather_stations", + "kafkaKeyFieldName": "kafkakey", + "kafkaDataFieldName": "kafkavalue", + "allowNewBigQueryFields": "true", + "allowBigQueryRequiredFieldRelaxation": "true", + "allowSchemaUnionization": "true", + "deleteEnabled": "true", + "mergeIntervalMs": "5000", + "transforms": "unwrap,ReplaceField", + "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope", + "transforms.unwrap.drop.tombstones": "false", + "transforms.unwrap.delete.handling.mode": "none", + "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", + "transforms.ReplaceField.blacklist": "roadstationid" + } +} + +resource "aiven_kafka_connector" "kafka-pg-cdc-sensors" { + project = var.avn_project_id + service_name = aiven_kafka_connect.tms-demo-kafka-connect1.service_name + connector_name = "kafka-pg-cdc-sensors" + + config = { + "_aiven.restart.on.failure": "true", + "key.converter" : "org.apache.kafka.connect.storage.StringConverter", + 
"key.converter.schemas.enable": "false", + "value.converter": "io.confluent.connect.avro.AvroConverter", + "value.converter.schema.registry.url": local.schema_registry_uri, + "value.converter.basic.auth.credentials.source": "URL", + "value.converter.schemas.enable": "true", + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "name": "kafka-pg-cdc-sensors", + "slot.name": "weathersensors", + "database.hostname": data.aiven_service_component.tms_pg.host, + "database.port": data.aiven_service_component.tms_pg.port, + "database.user": data.aiven_service_user.pg_admin.username, + "database.password": data.aiven_service_user.pg_admin.password, + "database.dbname": "defaultdb", + "database.server.name": "tms-demo-pg", + "table.whitelist": "public.weather_sensors", + "plugin.name": "wal2json", + "database.sslmode": "require", + "transforms": "unwrap,insertKey,extractKey", + "transforms.unwrap.type":"io.debezium.transforms.UnwrapFromEnvelope", + "transforms.unwrap.drop.tombstones":"false", + "transforms.insertKey.type":"org.apache.kafka.connect.transforms.ValueToKey", + "transforms.insertKey.fields":"sensorid", + "transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key", + "transforms.extractKey.field":"sensorid", + "include.schema.changes": "false" + } +} \ No newline at end of file diff --git a/solutions/iot/weather-stations/infra/kafka_service_users.tf b/solutions/iot/weather-stations/infra/kafka_service_users.tf new file mode 100644 index 0000000..7d44249 --- /dev/null +++ b/solutions/iot/weather-stations/infra/kafka_service_users.tf @@ -0,0 +1,127 @@ +data "aiven_service_user" "kafka_admin" { + project = var.avn_project_id + service_name = aiven_kafka.tms-demo-kafka.service_name + + # default admin user that is automatically created each Aiven service + username = "avnadmin" + + depends_on = [ + aiven_kafka.tms-demo-kafka + ] +} + +resource "aiven_service_user" "tms-ingest-user" { + project = var.avn_project_id + service_name = aiven_kafka.tms-demo-kafka.service_name + + username = "tms-ingest-user" + + depends_on = [ + aiven_kafka.tms-demo-kafka + ] +} + +resource "aiven_service_user" "tms-processing-user" { + project = var.avn_project_id + service_name = aiven_kafka.tms-demo-kafka.service_name + + username = "tms-processing-user" + + depends_on = [ + aiven_kafka.tms-demo-kafka + ] +} + +resource "aiven_service_user" "tms-sink-user" { + project = var.avn_project_id + service_name = aiven_kafka.tms-demo-kafka.service_name + + username = "tms-sink-user" + + depends_on = [ + aiven_kafka.tms-demo-kafka + ] +} + +resource "aiven_kafka_acl" "tms-ingest-acl" { + project = var.avn_project_id + service_name = aiven_kafka.tms-demo-kafka.service_name + permission = "write" + username = aiven_service_user.tms-ingest-user.username + topic = aiven_kafka_topic.observations-weather-raw.topic_name + depends_on = [ + aiven_kafka.tms-demo-kafka + ] +} + +# read-write access for Kafka Streams topologies +resource "aiven_kafka_acl" "tms-processing-acl" { + project = var.avn_project_id + service_name = aiven_kafka.tms-demo-kafka.service_name + permission = "readwrite" + username = aiven_service_user.tms-processing-user.username + topic = "observations.*" + depends_on = [ + aiven_kafka.tms-demo-kafka + ] +} + +# read access to PostgeSQL CDC topics +resource "aiven_kafka_acl" "tms-processing-read-acl" { + project = var.avn_project_id + service_name = aiven_kafka.tms-demo-kafka.service_name + permission = "read" + username = 
aiven_service_user.tms-processing-user.username + topic = "tms-demo-pg.public.*" + depends_on = [ + aiven_kafka.tms-demo-kafka + ] +} + +# adming access for intermediate Kafka Streams topics (changelog) +resource "aiven_kafka_acl" "tms-processing-admin-acl" { + project = var.avn_project_id + service_name = aiven_kafka.tms-demo-kafka.service_name + permission = "admin" + username = aiven_service_user.tms-processing-user.username + topic = "tms-streams-demo-*" + depends_on = [ + aiven_kafka.tms-demo-kafka + ] +} + +# adming access for intermediate Kafka Streams topics (changelog) +resource "aiven_kafka_acl" "tms-processing-admin-acl-2" { + project = var.avn_project_id + service_name = aiven_kafka.tms-demo-kafka.service_name + permission = "admin" + username = aiven_service_user.tms-processing-user.username + topic = "tms-microservice-demo-*" + depends_on = [ + aiven_kafka.tms-demo-kafka + ] +} + +# adming access for KSQLDB topics +resource "aiven_kafka_acl" "tms-processing-ksql-acl" { + project = var.avn_project_id + service_name = aiven_kafka.tms-demo-kafka.service_name + permission = "admin" + username = aiven_service_user.tms-processing-user.username + topic = "_confluent-ksql-tms_demo_ksqldb_*" + depends_on = [ + aiven_kafka.tms-demo-kafka + ] +} + +#read access for M3 sink service +resource "aiven_kafka_acl" "tms-sink-acl" { + project = var.avn_project_id + service_name = aiven_kafka.tms-demo-kafka.service_name + permission = "read" + username = aiven_service_user.tms-sink-user.username + topic = "observations.weather.*" + depends_on = [ + aiven_kafka.tms-demo-kafka + ] +} diff --git a/solutions/iot/weather-stations/infra/kafka_services.tf b/solutions/iot/weather-stations/infra/kafka_services.tf new file mode 100644 index 0000000..b9bb4f2 --- /dev/null +++ b/solutions/iot/weather-stations/infra/kafka_services.tf @@ -0,0 +1,71 @@ +# Kafka service +resource "aiven_kafka" "tms-demo-kafka" { + project = var.avn_project_id + cloud_name = var.cloud_name + plan = "startup-2" + service_name = "tms-demo-kafka" + maintenance_window_dow = "monday" + maintenance_window_time = "10:00:00" + default_acl = false + + kafka_user_config { + // Enables Kafka Schemas + schema_registry = true + kafka_version = "2.8" + kafka { + group_max_session_timeout_ms = 70000 + log_retention_bytes = 1000000000 + } + } +} + +// Kafka connect service +resource "aiven_kafka_connect" "tms-demo-kafka-connect1" { + project = var.avn_project_id + cloud_name = var.cloud_name + plan = "startup-4" + service_name = "tms-demo-kafka-connect1" + maintenance_window_dow = "monday" + maintenance_window_time = "10:00:00" + + kafka_connect_user_config { + kafka_connect { + consumer_isolation_level = "read_committed" + } + + public_access { + kafka_connect = true + } + } +} + + +// Kafka connect service integration +resource "aiven_service_integration" "tms-demo-connect-integr" { + project = var.avn_project_id + integration_type = "kafka_connect" + source_service_name = aiven_kafka.tms-demo-kafka.service_name + destination_service_name = aiven_kafka_connect.tms-demo-kafka-connect1.service_name + + kafka_connect_user_config { + kafka_connect { + group_id = "connect_1" + status_storage_topic = "__connect_1_status" + offset_storage_topic = "__connect_1_offsets" + config_storage_topic = "__connect_1_configs" + } + } + + depends_on = [ + aiven_kafka.tms-demo-kafka, + aiven_kafka_connect.tms-demo-kafka-connect1 + ] +} + +resource "aiven_service_integration" "tms-demo-obs-kafka-integr" { + project = var.avn_project_id + integration_type = 
"metrics" + source_service_name = aiven_kafka.tms-demo-kafka.service_name + destination_service_name = aiven_m3db.tms-demo-obs-m3db.service_name +} + diff --git a/solutions/iot/weather-stations/infra/kafka_topic.tf b/solutions/iot/weather-stations/infra/kafka_topic.tf new file mode 100644 index 0000000..6151e35 --- /dev/null +++ b/solutions/iot/weather-stations/infra/kafka_topic.tf @@ -0,0 +1,128 @@ +# Kafka topics + +resource "aiven_kafka_topic" "observations-weather-raw" { + project = var.avn_project_id + service_name = aiven_kafka.tms-demo-kafka.service_name + topic_name = "observations.weather.raw" + partitions = 20 + replication = 2 + config { + retention_ms = 259200000 + cleanup_policy = "delete" + min_insync_replicas = 1 + } + depends_on = [ + aiven_kafka.tms-demo-kafka + ] + +} + +resource "aiven_kafka_topic" "observations-weather-processed" { + project = var.avn_project_id + service_name = aiven_kafka.tms-demo-kafka.service_name + topic_name = "observations.weather.processed" + partitions = 20 + replication = 2 + config { + retention_ms = 259200000 + cleanup_policy = "delete" + min_insync_replicas = 1 + } + depends_on = [ + aiven_kafka.tms-demo-kafka + ] +} + +resource "aiven_kafka_topic" "observations-weather-multivariate" { + project = var.avn_project_id + service_name = aiven_kafka.tms-demo-kafka.service_name + topic_name = "observations.weather.multivariate" + partitions = 20 + replication = 2 + config { + retention_ms = 259200000 + cleanup_policy = "delete" + min_insync_replicas = 1 + } + depends_on = [ + aiven_kafka.tms-demo-kafka + ] +} + +resource "aiven_kafka_topic" "observations-weather-municipality" { + project = var.avn_project_id + service_name = aiven_kafka.tms-demo-kafka.service_name + topic_name = "observations.weather.municipality" + partitions = 20 + replication = 2 + config { + retention_ms = 1814400000 + cleanup_policy = "delete" + min_insync_replicas = 1 + } + depends_on = [ + aiven_kafka.tms-demo-kafka + ] +} + +resource "aiven_kafka_topic" "observations-weather-avg-air-temperature" { + project = var.avn_project_id + service_name = aiven_kafka.tms-demo-kafka.service_name + topic_name = "observations.weather.avg-air-temperature" + partitions = 20 + replication = 2 + config { + retention_ms = 1814400000 + cleanup_policy = "delete" + min_insync_replicas = 1 + } + depends_on = [ + aiven_kafka.tms-demo-kafka + ] +} + +resource "aiven_kafka_topic" "stations-weather" { + project = var.avn_project_id + service_name = aiven_kafka.tms-demo-kafka.service_name + topic_name = "tms-demo-pg.public.weather_stations" + partitions = 20 + replication = 2 + config { + cleanup_policy = "compact" + min_insync_replicas = 1 + } + depends_on = [ + aiven_kafka.tms-demo-kafka + ] +} + +resource "aiven_kafka_topic" "stations-weather-2" { + project = var.avn_project_id + service_name = aiven_kafka.tms-demo-kafka.service_name + topic_name = "weather_stations" + partitions = 20 + replication = 2 + config { + cleanup_policy = "compact" + min_insync_replicas = 1 + } + depends_on = [ + aiven_kafka.tms-demo-kafka + ] +} + +resource "aiven_kafka_topic" "sensors-weather" { + project = var.avn_project_id + service_name = aiven_kafka.tms-demo-kafka.service_name + topic_name = "tms-demo-pg.public.weather_sensors" + partitions = 20 + replication = 2 + config { + cleanup_policy = "compact" + min_insync_replicas = 1 + } + depends_on = [ + aiven_kafka.tms-demo-kafka + ] +} + diff --git a/solutions/iot/weather-stations/infra/m3.tf b/solutions/iot/weather-stations/infra/m3.tf new file mode 100644 index 
0000000..0f5240f --- /dev/null +++ b/solutions/iot/weather-stations/infra/m3.tf @@ -0,0 +1,75 @@ +resource "aiven_m3db" "tms-demo-iot-m3db" { + project = var.avn_project_id + cloud_name = var.cloud_name + plan = "startup-8" + service_name = "tms-demo-iot-m3db" + + m3db_user_config { + + namespaces { + name = "observations" + type = "unaggregated" + options { + retention_options { + retention_period_duration = "1h" + } + } + } + } +} + +resource "aiven_m3db" "tms-demo-obs-m3db" { + project = var.avn_project_id + cloud_name = var.cloud_name + plan = "business-8" + service_name = "tms-demo-obs-m3db" + + m3db_user_config { + + namespaces { + name = "metrics" + type = "unaggregated" + options { + retention_options { + retention_period_duration = "30d" + } + } + } + + namespaces { + name = "metrics_1h" + type = "aggregated" + resolution = "1h" + options { + retention_options { + retention_period_duration = "356d" + } + } + } + } +} + +resource "aiven_m3aggregator" "tms-demo-m3a" { + project = var.avn_project_id + cloud_name = var.cloud_name + plan = "business-8" + service_name = "tms-demo-m3a" + maintenance_window_dow = "monday" + maintenance_window_time = "10:00:00" + +} + +resource "aiven_service_integration" "tms-demo-obs-m3-integr" { + project = var.avn_project_id + integration_type = "m3aggregator" + source_service_name = aiven_m3db.tms-demo-obs-m3db.service_name + destination_service_name = aiven_m3aggregator.tms-demo-m3a.service_name +} + +output "m3db_iot_host" { + value = aiven_m3db.tms-demo-iot-m3db.service_host +} + +output "m3db_obs_host" { + value = aiven_m3db.tms-demo-obs-m3db.service_host +} \ No newline at end of file diff --git a/solutions/iot/weather-stations/infra/main.tf b/solutions/iot/weather-stations/infra/main.tf new file mode 100644 index 0000000..5a6d8b2 --- /dev/null +++ b/solutions/iot/weather-stations/infra/main.tf @@ -0,0 +1,13 @@ + +terraform { + required_providers { + aiven = { + source = "aiven/aiven" + version = "2.3.2" + } + } +} + +provider "aiven" { + api_token = var.avn_api_token +} diff --git a/solutions/iot/weather-stations/infra/opensearch.tf b/solutions/iot/weather-stations/infra/opensearch.tf new file mode 100644 index 0000000..fc6ea6a --- /dev/null +++ b/solutions/iot/weather-stations/infra/opensearch.tf @@ -0,0 +1,16 @@ +# Opensearch service +resource "aiven_opensearch" "tms-demo-os" { + project = var.avn_project_id + cloud_name = var.cloud_name + plan = "startup-4" + service_name = "tms-demo-os" + maintenance_window_dow = "monday" + maintenance_window_time = "10:00:00" +} + +# Opensearch user +resource "aiven_service_user" "os-user" { + project = var.avn_project_id + service_name = aiven_opensearch.tms-demo-os.service_name + username = "test-user1" +} diff --git a/solutions/iot/weather-stations/infra/pg.tf b/solutions/iot/weather-stations/infra/pg.tf new file mode 100644 index 0000000..5952326 --- /dev/null +++ b/solutions/iot/weather-stations/infra/pg.tf @@ -0,0 +1,18 @@ +resource "aiven_pg" "tms-demo-pg" { + project = var.avn_project_id + cloud_name = var.cloud_name + plan = "startup-4" + service_name = "tms-demo-pg" +} + +data "aiven_service_user" "pg_admin" { + project = var.avn_project_id + service_name = aiven_pg.tms-demo-pg.service_name + + # default admin user that is automatically created each Aiven service + username = "avnadmin" + + depends_on = [ + aiven_pg.tms-demo-pg + ] +} \ No newline at end of file diff --git a/solutions/iot/weather-stations/infra/secrets.tfvars.template b/solutions/iot/weather-stations/infra/secrets.tfvars.template 
new file mode 100644 index 0000000..caa6d3c --- /dev/null +++ b/solutions/iot/weather-stations/infra/secrets.tfvars.template @@ -0,0 +1,5 @@ +avn_project_id = "" +avn_api_token = "" +cloud_name = "google-europe-north1" +bq_project = "" +bq_key = "" diff --git a/solutions/iot/weather-stations/infra/variables.tf b/solutions/iot/weather-stations/infra/variables.tf new file mode 100644 index 0000000..a7321df --- /dev/null +++ b/solutions/iot/weather-stations/infra/variables.tf @@ -0,0 +1,7 @@ +variable "avn_project_id" {} +variable "avn_api_token" {} +variable "cloud_name" {} + +variable "bq_project" {} + +variable "bq_key" {} \ No newline at end of file diff --git a/solutions/iot/weather-stations/k8s/create-k8s-secrets.sh b/solutions/iot/weather-stations/k8s/create-k8s-secrets.sh new file mode 100755 index 0000000..940c8b3 --- /dev/null +++ b/solutions/iot/weather-stations/k8s/create-k8s-secrets.sh @@ -0,0 +1,36 @@ +#!/bin/sh +kubectl create secret generic tms-ingest-cert \ +--from-file=secrets/aiven/ingest/service.cert \ +--from-file=secrets/aiven/ingest/service.key \ +--from-file=secrets/aiven/ingest/ca.pem \ +--from-file=secrets/aiven/ingest/client.keystore.p12 \ +--from-file=secrets/aiven/ingest/client.truststore.jks \ +-n tms-demo +kubectl create secret generic tms-processing-cert \ +--from-file=secrets/aiven/processing/service.cert \ +--from-file=secrets/aiven/processing/service.key \ +--from-file=secrets/aiven/processing/ca.pem \ +--from-file=secrets/aiven/processing/client.keystore.p12 \ +--from-file=secrets/aiven/processing/client.truststore.jks \ +--from-file=secrets/aiven/schema_registry.truststore.jks \ +-n tms-demo +kubectl create secret generic tms-sink-cert \ +--from-file=secrets/aiven/sink/service.cert \ +--from-file=secrets/aiven/sink/service.key \ +--from-file=secrets/aiven/sink/ca.pem \ +--from-file=secrets/aiven/sink/client.keystore.p12 \ +--from-file=secrets/aiven/sink/client.truststore.jks \ +-n tms-demo +kubectl create secret generic tms-service-endpoint \ +--from-file=BOOTSTRAP_SERVERS=secrets/aiven/kafka_service_uri \ +--from-file=SCHEMA_REGISTRY=secrets/aiven/schema_registry_uri \ +--from-file=M3_INFLUXDB_URL=secrets/aiven/m3_influxdb_uri \ +--from-file=M3_INFLUXDB_CREDENTIALS=secrets/aiven/m3_credentials \ +-n tms-demo +kubectl create secret generic tms-os-service \ +--from-file=OPENSEARCH_HOST=secrets/aiven/os_host \ +--from-file=OPENSEARCH_PORT=secrets/aiven/os_port \ +--from-file=OPENSEARCH_USER=secrets/aiven/os_user \ +--from-file=OPENSEARCH_PASSWORD=secrets/aiven/os_password \ +-n tms-demo +kubectl create -f secrets.yaml diff --git a/solutions/iot/weather-stations/k8s/deploy-ingest.yaml b/solutions/iot/weather-stations/k8s/deploy-ingest.yaml new file mode 100644 index 0000000..3cfdbb3 --- /dev/null +++ b/solutions/iot/weather-stations/k8s/deploy-ingest.yaml @@ -0,0 +1,68 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: tms-demo-ingest + namespace: tms-demo + labels: + app: tms-demo-ingest +spec: + replicas: 1 + selector: + matchLabels: + app: tms-demo-ingest + template: + metadata: + labels: + app: tms-demo-ingest + spec: + containers: + - name: tms-demo-ingest + image: ssaily/tms-demo-ingest:0.1.2 + env: + - name: MQTT_USER + value: digitraffic + - name: MQTT_PASSWORD + value: digitrafficPassword + - name: PYTHONUNBUFFERED + value: "1" + envFrom: + - secretRef: + name: tms-service-endpoint + volumeMounts: + - mountPath: /etc/streams/tms-ingest-cert + name: tms-ingest-cert + resources: + limits: + memory: 512Mi + cpu: "1" + requests: + memory: 256Mi 
+ cpu: "100m" + imagePullPolicy: Always + volumes: + - name: tms-ingest-cert + secret: + secretName: tms-ingest-cert + items: + - key: service.cert + path: service.cert + - key: service.key + path: service.key + - key: ca.pem + path: ca.pem +--- +apiVersion: v1 +kind: Service +metadata: + name: tms-demo-ingest-svc + namespace: tms-demo + labels: + app: tms-demo-ingest +spec: + ports: + - name: metrics + port: 9091 + targetPort: 9091 + protocol: TCP + selector: + app: tms-demo-ingest diff --git a/solutions/iot/weather-stations/k8s/deploy-processing.yaml b/solutions/iot/weather-stations/k8s/deploy-processing.yaml new file mode 100644 index 0000000..b46ef49 --- /dev/null +++ b/solutions/iot/weather-stations/k8s/deploy-processing.yaml @@ -0,0 +1,201 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: tms-demo-enrichment + namespace: tms-demo + labels: + app: tms-demo-enrichment +spec: + replicas: 1 + selector: + matchLabels: + app: tms-demo-enrichment + template: + metadata: + labels: + app: tms-demo-enrichment + spec: + containers: + - name: tms-demo-enrichment + image: ssaily/tms-demo-processing:0.4.5 + env: + - name: SPRING_PROFILES_ACTIVE + value: enrichment + - name: JAVA_OPTS + value: "-javaagent:./jmx_prometheus_javaagent-0.16.1.jar=9091:kafka-client.yaml" + envFrom: + - secretRef: + name: kafka-secrets + - secretRef: + name: tms-service-endpoint + volumeMounts: + - mountPath: /etc/streams/tms-processing-cert + name: cert-volume + resources: + limits: + memory: 512Mi + cpu: "1" + requests: + memory: 256Mi + cpu: "100m" + imagePullPolicy: Always + ports: + - name: metrics + containerPort: 9091 + volumes: + - name: cert-volume + secret: + secretName: tms-processing-cert + items: + - key: client.keystore.p12 + path: client.keystore.p12 + - key: client.truststore.jks + path: client.truststore.jks +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: tms-demo-calculations + namespace: tms-demo + labels: + app: tms-demo-calculations +spec: + replicas: 1 + selector: + matchLabels: + app: tms-demo-calculations + template: + metadata: + labels: + app: tms-demo-calculations + spec: + containers: + - name: tms-demo-calculations + image: ssaily/tms-demo-processing:0.4.5 + env: + - name: SPRING_PROFILES_ACTIVE + value: calculations + - name: JAVA_OPTS + value: "-javaagent:./jmx_prometheus_javaagent-0.16.1.jar=9091:kafka-client.yaml" + envFrom: + - secretRef: + name: kafka-secrets + - secretRef: + name: tms-service-endpoint + volumeMounts: + - mountPath: /etc/streams/tms-processing-cert + name: tms-processing-cert + resources: + limits: + memory: 512Mi + cpu: "1" + requests: + memory: 256Mi + cpu: "100m" + imagePullPolicy: Always + volumes: + - name: tms-processing-cert + secret: + secretName: tms-processing-cert + items: + - key: client.keystore.p12 + path: client.keystore.p12 + - key: client.truststore.jks + path: client.truststore.jks +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: tms-demo-multivariate + namespace: tms-demo + labels: + app: tms-demo-multivariate +spec: + replicas: 1 + selector: + matchLabels: + app: tms-demo-multivariate + template: + metadata: + labels: + app: tms-demo-multivariate + spec: + containers: + - name: tms-demo-multivariate + image: ssaily/tms-demo-processing:0.4.5 + env: + - name: SPRING_PROFILES_ACTIVE + value: multivariate + - name: JAVA_OPTS + value: "-javaagent:./jmx_prometheus_javaagent-0.16.1.jar=9091:kafka-client.yaml" + envFrom: + - secretRef: + name: kafka-secrets + - secretRef: + name: tms-service-endpoint + volumeMounts: + 
- mountPath: /etc/streams/tms-processing-cert + name: tms-processing-cert + resources: + limits: + memory: 1024Mi + cpu: "2" + requests: + memory: 256Mi + cpu: "100m" + imagePullPolicy: Always + volumes: + - name: tms-processing-cert + secret: + secretName: tms-processing-cert + items: + - key: client.keystore.p12 + path: client.keystore.p12 + - key: client.truststore.jks + path: client.truststore.jks +--- +apiVersion: v1 +kind: Service +metadata: + name: tms-demo-enrichment-svc + namespace: tms-demo + labels: + app: tms-demo-enrichment +spec: + ports: + - name: metrics + port: 9091 + selector: + app: tms-demo-enrichment +--- +apiVersion: v1 +kind: Service +metadata: + name: tms-demo-calculations-svc + namespace: tms-demo + labels: + app: tms-demo-calculations +spec: + ports: + - name: metrics + port: 9091 + targetPort: 9091 + protocol: TCP + selector: + app: tms-demo-calculations +--- +apiVersion: v1 +kind: Service +metadata: + name: tms-demo-multivariate-svc + namespace: tms-demo + labels: + app: tms-demo-multivariate +spec: + ports: + - name: metrics + port: 9091 + targetPort: 9091 + protocol: TCP + selector: + app: tms-demo-multivariate diff --git a/solutions/iot/weather-stations/k8s/deploy-sink.yaml b/solutions/iot/weather-stations/k8s/deploy-sink.yaml new file mode 100644 index 0000000..7b60c30 --- /dev/null +++ b/solutions/iot/weather-stations/k8s/deploy-sink.yaml @@ -0,0 +1,66 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: tms-demo-sink + namespace: tms-demo + labels: + app: tms-demo-sink +spec: + replicas: 1 + selector: + matchLabels: + app: tms-demo-sink + template: + metadata: + labels: + app: tms-demo-sink + spec: + containers: + - name: tms-demo-sink + image: ssaily/tms-demo-sink:0.1.3 + env: + - name: PYTHONUNBUFFERED + value: "1" + envFrom: + - secretRef: + name: kafka-secrets + - secretRef: + name: tms-service-endpoint + volumeMounts: + - mountPath: /etc/streams/tms-sink-cert + name: tms-sink-cert + resources: + limits: + memory: 512Mi + cpu: "1" + requests: + memory: 256Mi + cpu: "100m" + imagePullPolicy: Always + volumes: + - name: tms-sink-cert + secret: + secretName: tms-sink-cert + items: + - key: service.cert + path: service.cert + - key: service.key + path: service.key + - key: ca.pem + path: ca.pem +--- +apiVersion: v1 +kind: Service +metadata: + name: tms-demo-sink-svc + namespace: tms-demo + labels: + app: tms-demo-sink +spec: + ports: + - name: metrics + port: 9091 + targetPort: 9091 + protocol: TCP + selector: + app: tms-demo-sink diff --git a/solutions/iot/weather-stations/k8s/ksqldb.yaml b/solutions/iot/weather-stations/k8s/ksqldb.yaml new file mode 100644 index 0000000..81e9501 --- /dev/null +++ b/solutions/iot/weather-stations/k8s/ksqldb.yaml @@ -0,0 +1,99 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: tms-demo-ksqldb + namespace: tms-demo + labels: + app: tms-demo-ksqldb +spec: + replicas: 3 + selector: + matchLabels: + app: tms-demo-ksqldb + template: + metadata: + labels: + app: tms-demo-ksqldb + spec: + containers: + - name: tms-demo-ksqldb + image: confluentinc/ksqldb-server:0.20.0 + envFrom: + - secretRef: + name: kafka-secrets + - secretRef: + name: tms-service-endpoint + env: + - name: KSQL_BOOTSTRAP_SERVERS + value: $(BOOTSTRAP_SERVERS) + - name: KSQL_LISTENERS + value: http://0.0.0.0:8088/ + - name: KSQL_KSQL_SERVICE_ID + value: tms_demo_ksqldb_ + - name: KSQL_KSQL_SINK_REPLICAS + value: "3" + - name: KSQL_KSQL_STREAMS_REPLICATION_FACTOR + value: "3" + - name: KSQL_KSQL_INTERNAL_TOPIC_REPLICAS + value: "3" + - name: 
KSQL_SECURITY_PROTOCOL + value: SSL + - name: KSQL_SSL_KEYSTORE_TYPE + value: PKCS12 + - name: KSQL_SSL_KEYSTORE_LOCATION + value: /etc/streams/tms-processing-cert/client.keystore.p12 + - name: KSQL_SSL_KEYSTORE_PASSWORD + value: $(keystore-password) + - name: KSQL_SSL_KEY_PASSWORD + value: $(key-password) + - name: KSQL_SSL_TRUSTSTORE_TYPE + value: JKS + - name: KSQL_SSL_TRUSTSTORE_LOCATION + value: /etc/streams/tms-processing-cert/client.truststore.jks + - name: KSQL_SSL_TRUSTSTORE_PASSWORD + value: $(truststore-password) + - name: KSQL_KSQL_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE + value: URL + - name: KSQL_KSQL_SCHEMA_REGISTRY_URL + value: $(SCHEMA_REGISTRY) + - name: KSQL_CONFLUENT_SUPPORT_METRICS_ENABLE + value: "false" + - name: KSQL_KSQL_SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION + value: /etc/streams/tms-processing-cert/schema_registry.truststore.jks + - name: KSQL_KSQL_SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD + value: $(truststore-password) + volumeMounts: + - mountPath: /etc/streams/tms-processing-cert + name: tms-processing-cert + resources: + limits: + memory: 512Mi + cpu: "1" + requests: + memory: 256Mi + cpu: "100m" + imagePullPolicy: Always + volumes: + - name: tms-processing-cert + secret: + secretName: tms-processing-cert + items: + - key: client.keystore.p12 + path: client.keystore.p12 + - key: client.truststore.jks + path: client.truststore.jks + - key: schema_registry.truststore.jks + path: schema_registry.truststore.jks +--- +apiVersion: v1 +kind: Service +metadata: + name: ksqldb-server + namespace: tms-demo +spec: + selector: + app: tms-demo-ksqldb + ports: + - protocol: TCP + port: 8088 + targetPort: 8088 diff --git a/solutions/iot/weather-stations/k8s/kustomization.yaml b/solutions/iot/weather-stations/k8s/kustomization.yaml new file mode 100644 index 0000000..c40590a --- /dev/null +++ b/solutions/iot/weather-stations/k8s/kustomization.yaml @@ -0,0 +1,27 @@ +resources: +- deploy-ingest.yaml +- deploy-processing.yaml +- deploy-sink.yaml +secretGenerator: +- name: tms-ingest-cert + namespace: tms-demo + files: + - secrets/aiven/ingest/service.cert + - secrets/aiven/ingest/service.key + - secrets/aiven/ingest/ca.pem + - secrets/aiven/ingest/client.keystore.p12 + - secrets/aiven/ingest/client.truststore.jks +- name: tms-processing-cert + files: + - secrets/aiven/processing/service.cert + - secrets/aiven/processing/service.key + - secrets/aiven/processing/ca.pem + - secrets/aiven/processing/client.keystore.p12 + - secrets/aiven/processing/client.truststore.jks +- name: tms-sink-cert + files: + - secrets/aiven/sink/service.cert + - secrets/aiven/sink/service.key + - secrets/aiven/sink/ca.pem + - secrets/aiven/sink/client.keystore.p12 + - secrets/aiven/sink/client.truststore.jks diff --git a/solutions/iot/weather-stations/k8s/namespace.yaml b/solutions/iot/weather-stations/k8s/namespace.yaml new file mode 100644 index 0000000..f2b2026 --- /dev/null +++ b/solutions/iot/weather-stations/k8s/namespace.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: tms-demo diff --git a/solutions/iot/weather-stations/k8s/secrets.yaml b/solutions/iot/weather-stations/k8s/secrets.yaml new file mode 100644 index 0000000..07a5c4a --- /dev/null +++ b/solutions/iot/weather-stations/k8s/secrets.yaml @@ -0,0 +1,9 @@ +apiVersion: v1 +kind: Secret +metadata: + name: kafka-secrets + namespace: tms-demo +data: + keystore-password: c3VwZXJzZWNyZXQ= + truststore-password: c3VwZXJzZWNyZXQ= + key-password: c3VwZXJzZWNyZXQ= diff --git 
a/solutions/iot/weather-stations/m3-sink/Dockerfile b/solutions/iot/weather-stations/m3-sink/Dockerfile new file mode 100644 index 0000000..aad43f4 --- /dev/null +++ b/solutions/iot/weather-stations/m3-sink/Dockerfile @@ -0,0 +1,17 @@ +# set base image (host OS) +FROM python:3.8 + +# set the working directory in the container +WORKDIR /code + +# copy the dependencies file to the working directory +COPY requirements.txt . + +# install dependencies +RUN pip install -r requirements.txt + +# copy the content of the local src directory to the working directory +COPY src/ . + +# command to run on container start +CMD [ "python", "./sink.py" ] \ No newline at end of file diff --git a/solutions/iot/weather-stations/m3-sink/README.md b/solutions/iot/weather-stations/m3-sink/README.md new file mode 100644 index 0000000..f7f5241 --- /dev/null +++ b/solutions/iot/weather-stations/m3-sink/README.md @@ -0,0 +1,3 @@ +# M3DB Sink + +This is a small Python Kafka consumer that consumes messages from Kafka and builds InfluxDB line protocol payloads. Each payload batch is then sent to the InfluxDB-compatible HTTP endpoint of M3DB. \ No newline at end of file diff --git a/solutions/iot/weather-stations/m3-sink/requirements.txt b/solutions/iot/weather-stations/m3-sink/requirements.txt new file mode 100644 index 0000000..566a438 --- /dev/null +++ b/solutions/iot/weather-stations/m3-sink/requirements.txt @@ -0,0 +1,4 @@ +confluent_kafka[avro]==1.4.0 +requests +prometheus-kafka-metrics +prometheus-client diff --git a/solutions/iot/weather-stations/m3-sink/src/sink.py b/solutions/iot/weather-stations/m3-sink/src/sink.py new file mode 100644 index 0000000..3bb84c2 --- /dev/null +++ b/solutions/iot/weather-stations/m3-sink/src/sink.py @@ -0,0 +1,114 @@ +from confluent_kafka import avro +from confluent_kafka import Consumer +from confluent_kafka.avro import AvroConsumer, CachedSchemaRegistryClient +from confluent_kafka.avro.serializer.message_serializer import MessageSerializer as AvroSerde +from prometheus_kafka_consumer.metrics_manager import ConsumerMetricsManager +from prometheus_client import start_http_server + +import requests +import unicodedata +import os + +metric_manager = ConsumerMetricsManager() +influxdb_client = os.getenv("M3_INFLUXDB_URL").rstrip() +influxdb_cred = os.getenv("M3_INFLUXDB_CREDENTIALS").rstrip() +group_name = "tms-demo-m3-sink" + +consumer_config = {"bootstrap.servers": os.getenv("BOOTSTRAP_SERVERS"), + "statistics.interval.ms": 10000, + "group.id": group_name, + "client.id": group_name, + "stats_cb": metric_manager.send, + "max.poll.interval.ms": 30000, + "session.timeout.ms": 20000, + "default.topic.config": {"auto.offset.reset": "latest"}, + "security.protocol": "SSL", + "ssl.ca.location": "/etc/streams/tms-sink-cert/ca.pem", + "ssl.certificate.location": "/etc/streams/tms-sink-cert/service.cert", + "ssl.key.location": "/etc/streams/tms-sink-cert/service.key" + } + +schema_registry_config = {"url": os.getenv("SCHEMA_REGISTRY")} +schema_registry = CachedSchemaRegistryClient(schema_registry_config) +avro_serde = AvroSerde(schema_registry) +deserialize_avro = avro_serde.decode_message + +def get_name_or_default(name): + if not name: + return str("-") + else: + name = unicodedata.normalize('NFD', name) + name = name.encode('ascii', 'ignore').decode('ascii') + name = name.replace(" ", "_") + return name + +def to_buffer(buffer: list, message): + try: + deserialized_message = deserialize_avro(message=message.value(), is_key=False) + except Exception as e: + print(f"Failed to deserialize Avro payload: 
{message.value()}\n{e}") + else: + values = [] + sensor_name_str = '' + values_str = '' + measurement_name = '' + if message.topic() == 'observations.weather.multivariate': + sensor_name_str = 'all' + measurement_name = 'observations_mv' + for key, value in deserialized_message['measurements'].items(): + values.append(key + '=' + str(value)) + values_str = ','.join(values) + else: + measurement_name = 'observations' + sensor_name_str = get_name_or_default(deserialized_message["name"]) + values_str = f'sensorValue={deserialized_message["sensorValue"]}' + + buffer.append("{measurement},service=m3-sink,roadStationId={road_station_id},municipality={municipality},province={province},geohash={geohash},name={sensor_name} {sensor_values} {timestamp}" + .format(measurement=measurement_name, + road_station_id=deserialized_message["roadStationId"], + municipality=get_name_or_default(deserialized_message["municipality"]), + province=get_name_or_default(deserialized_message["province"]), + geohash=deserialized_message["geohash"], + sensor_name=sensor_name_str, + sensor_values=values_str, + timestamp=deserialized_message["measuredTime"] * 1000 * 1000)) + +def flush_buffer(buffer: list): + print(f"Flushing {len(buffer)} records to M3") + payload = str("\n".join(buffer)) + response = requests.post(influxdb_client, data=payload, + auth=(influxdb_cred.split(":")[0],influxdb_cred.split(":")[1]), + headers={'Content-Type': 'application/x-www-form-urlencoded'}) + if response.status_code == 400: + print(f"{response.text} Skipping too old records") + buffer.clear() + return True + if response.status_code != 204: + print(f"Failed to store to M3 {response.status_code}\n{response.text}") + return False + + buffer.clear() + return True + +def consume_record(lines: list): + consumer = Consumer(consumer_config) + consumer.subscribe(["observations.weather.multivariate", "observations.weather.municipality"]) + + while True: + try: + message = consumer.poll(1) + except Exception as e: + print(f"Exception while trying to poll messages - {e}") + exit(-1) + else: + if message: + to_buffer(lines, message) + + if (len(lines) > 1000 and flush_buffer(lines) == True): + consumer.commit() + +if __name__ == "__main__": + start_http_server(9091) + lines = [] + consume_record(lines) + diff --git a/solutions/iot/weather-stations/machinelearning/multi-step-models.ipynb b/solutions/iot/weather-stations/machinelearning/multi-step-models.ipynb new file mode 100644 index 0000000..d78ca7b --- /dev/null +++ b/solutions/iot/weather-stations/machinelearning/multi-step-models.ipynb @@ -0,0 +1,748 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "continent-republican", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import datetime\n", + "from pathlib import Path\n", + "\n", + "import IPython\n", + "import IPython.display\n", + "\n", + "import pandas as pd\n", + "import seaborn as sns\n", + "import tensorflow as tf\n", + "\n", + "import matplotlib as mpl\n", + "import matplotlib.pyplot as plt\n", + "\n", + "import numpy as np\n", + "\n", + "from tqdm import tqdm" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "historical-citizen", + "metadata": {}, + "outputs": [], + "source": [ + "def wind_vector(self, velocity, max_velocity, direction):\n", + " # Convert to radians.\n", + " wd_rad = direction*np.pi / 180\n", + " self['Wx'] = velocity*np.cos(wd_rad)\n", + " self['Wy'] = velocity*np.sin(wd_rad)\n", + " self['max Wx'] = max_velocity*np.cos(wd_rad)\n", + " self['max Wy'] 
= max_velocity*np.sin(wd_rad)\n", + "pd.DataFrame.wind_vector = wind_vector" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "informed-perspective", + "metadata": {}, + "outputs": [], + "source": [ + "def tod_signal(self, date_time):\n", + " day = 24*60*60\n", + " year = (365.2425)*day\n", + " timestamp_s = date_time.map(datetime.datetime.timestamp)\n", + " self['Day sin'] = np.sin(timestamp_s * (2 * np.pi / day))\n", + " self['Day cos'] = np.cos(timestamp_s * (2 * np.pi / day))\n", + " self['Year sin'] = np.sin(timestamp_s * (2 * np.pi / year))\n", + " self['Year cos'] = np.cos(timestamp_s * (2 * np.pi / year))\n", + "pd.DataFrame.tod_signal = tod_signal" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "sacred-crime", + "metadata": {}, + "outputs": [], + "source": [ + "# Training set from history file\n", + "df = pd.read_csv('vantaa-2017-2020.csv.gz', compression='gzip')\n", + "df['time'] = pd.to_datetime(df[['Pv','Kk','Vuosi', 'Klo']]\n", + " .astype(str).apply(' '.join, 1), format='%d %m %Y %H:%M')\n", + "df.drop(columns=['Pv','Kk','Vuosi', 'Klo', 'Aikavyöhyke'], inplace=True)\n", + "df.index = df['time']\n", + "df.info()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "breathing-murray", + "metadata": {}, + "outputs": [], + "source": [ + "interpo = df.resample('600s').mean().interpolate()\n", + "interpo.head(2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "subsequent-perspective", + "metadata": {}, + "outputs": [], + "source": [ + "wind_direction = interpo['Tuulen suunta (deg)']\n", + "wind_v = interpo['Tuulen nopeus (m/s)']\n", + "wind_v_max = interpo['Puuskanopeus (m/s)']" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "greek-worcester", + "metadata": {}, + "outputs": [], + "source": [ + "plt.hist2d(wind_direction, wind_v, bins=(50, 50), vmax=400)\n", + "plt.colorbar()\n", + "plt.xlabel('Wind Direction [deg]')\n", + "plt.ylabel('Wind Velocity [m/s]')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "applicable-delivery", + "metadata": {}, + "outputs": [], + "source": [ + "# Convert to wind vector\n", + "interpo.wind_vector(interpo.pop('Tuulen nopeus (m/s)'), interpo.pop('Puuskanopeus (m/s)'), interpo.pop('Tuulen suunta (deg)'))\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "pointed-soldier", + "metadata": {}, + "outputs": [], + "source": [ + "plt.hist2d(interpo['Wx'], interpo['Wy'], bins=(50, 50), vmax=400)\n", + "plt.colorbar()\n", + "plt.xlabel('Wind X [m/s]')\n", + "plt.ylabel('Wind Y [m/s]')\n", + "ax = plt.gca()\n", + "ax.axis('tight')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "french-verse", + "metadata": {}, + "outputs": [], + "source": [ + "# look for seasonality although we know that year and day will be important features for weather data\n", + "fft = tf.signal.rfft(interpo['Ilman lämpötila (degC)'])\n", + "f_per_dataset = np.arange(0, len(fft))\n", + "\n", + "n_samples_h = len(interpo['Ilman lämpötila (degC)'])\n", + "samples_per_year = 6*24*365.2524 # 10m sampling rate\n", + "years_per_dataset = n_samples_h/(samples_per_year)\n", + "\n", + "f_per_year = f_per_dataset/years_per_dataset\n", + "plt.step(f_per_year, np.abs(fft))\n", + "plt.xscale('log')\n", + "plt.ylim(0, 2000000)\n", + "plt.xlim([0.1, max(plt.xlim())])\n", + "plt.xticks([1, 365.2524], labels=['1/Year', '1/day'])\n", + "_ = plt.xlabel('Frequency (log scale)')" + ] + }, + { + "cell_type": "code", + 
"execution_count": null, + "id": "center-gothic", + "metadata": {}, + "outputs": [], + "source": [ + "# calculate time of day signal from time index\n", + "interpo.tod_signal(interpo.index.get_level_values('time'))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "pleasant-battle", + "metadata": {}, + "outputs": [], + "source": [ + "plt.plot(np.array(interpo['Day sin'])[:25*6])\n", + "plt.plot(np.array(interpo['Day cos'])[:25*6])\n", + "plt.xlabel('Time step')\n", + "plt.title('Time of day signal')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "considerable-turtle", + "metadata": {}, + "outputs": [], + "source": [ + "# Split dataset\n", + "column_indices = {name: i for i, name in enumerate(interpo.columns)}\n", + "\n", + "n = len(interpo)\n", + "train_df = interpo[0:int(n*0.7)]\n", + "val_df = interpo[int(n*0.7):int(n*0.9)]\n", + "test_df = interpo[int(n*0.9):]\n", + "\n", + "num_features = interpo.shape[1]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "necessary-provision", + "metadata": {}, + "outputs": [], + "source": [ + "# Normalize\n", + "train_mean = train_df.mean()\n", + "train_std = train_df.std()\n", + "train_df = (train_df - train_mean) / train_std\n", + "val_df = (val_df - train_mean) / train_std\n", + "test_df = (test_df - train_mean) / train_std" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "controversial-advance", + "metadata": {}, + "outputs": [], + "source": [ + "# store values used for normalization so that we can use them on predicitions\n", + "out_path = Path('predict')\n", + "out_path.mkdir(parents=True, exist_ok=True)\n", + "train_mean.to_pickle('predict/train_mean.pkl')\n", + "train_std.to_pickle('predict/train_std.pkl')\n", + "pd.DataFrame(train_df.columns).to_csv('predict/trainset_columns.csv')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "latter-playing", + "metadata": {}, + "outputs": [], + "source": [ + "df_std = (interpo - train_mean) / train_std\n", + "df_std = df_std.melt(var_name='Column', value_name='Normalized')\n", + "plt.figure(figsize=(12, 6))\n", + "ax = sns.violinplot(x='Column', y='Normalized', data=df_std)\n", + "_ = ax.set_xticklabels(interpo.keys(), rotation=90)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "quarterly-interview", + "metadata": {}, + "outputs": [], + "source": [ + "class WindowGenerator():\n", + " def __init__(self, input_width, label_width, shift,\n", + " train_df=train_df, val_df=val_df, test_df=test_df,\n", + " label_columns=None):\n", + " # Store the raw data.\n", + " self.train_df = train_df\n", + " self.val_df = val_df\n", + " self.test_df = test_df\n", + "\n", + " # Work out the label column indices.\n", + " self.label_columns = label_columns\n", + " if label_columns is not None:\n", + " self.label_columns_indices = {name: i for i, name in\n", + " enumerate(label_columns)}\n", + " self.column_indices = {name: i for i, name in\n", + " enumerate(train_df.columns)}\n", + "\n", + " # Work out the window parameters.\n", + " self.input_width = input_width\n", + " self.label_width = label_width\n", + " self.shift = shift\n", + "\n", + " self.total_window_size = input_width + shift\n", + "\n", + " self.input_slice = slice(0, input_width)\n", + " self.input_indices = np.arange(self.total_window_size)[self.input_slice]\n", + "\n", + " self.label_start = self.total_window_size - self.label_width\n", + " self.labels_slice = slice(self.label_start, None)\n", + " self.label_indices = 
np.arange(self.total_window_size)[self.labels_slice]\n", + "\n", + " def __repr__(self):\n", + " return '\\n'.join([\n", + " f'Total window size: {self.total_window_size}',\n", + " f'Input indices: {self.input_indices}',\n", + " f'Label indices: {self.label_indices}',\n", + " f'Label column name(s): {self.label_columns}'])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "physical-mozambique", + "metadata": {}, + "outputs": [], + "source": [ + "def split_window(self, features):\n", + " inputs = features[:, self.input_slice, :]\n", + " labels = features[:, self.labels_slice, :]\n", + " if self.label_columns is not None:\n", + " labels = tf.stack(\n", + " [labels[:, :, self.column_indices[name]] for name in self.label_columns],\n", + " axis=-1)\n", + "\n", + " # Slicing doesn't preserve static shape information, so set the shapes\n", + " # manually. This way the `tf.data.Datasets` are easier to inspect.\n", + " inputs.set_shape([None, self.input_width, None])\n", + " labels.set_shape([None, self.label_width, None])\n", + "\n", + " return inputs, labels\n", + "\n", + "WindowGenerator.split_window = split_window" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "mighty-simpson", + "metadata": {}, + "outputs": [], + "source": [ + "def plot(self, model=None, plot_col='Ilman lämpötila (degC)', max_subplots=3):\n", + " inputs, labels = self.example\n", + " plt.figure(figsize=(12, 8))\n", + " plot_col_index = self.column_indices[plot_col]\n", + " max_n = min(max_subplots, len(inputs))\n", + " for n in range(max_n):\n", + " plt.subplot(3, 1, n+1)\n", + " plt.ylabel(f'{plot_col} [normed]')\n", + " plt.plot(self.input_indices, inputs[n, :, plot_col_index],\n", + " label='Inputs', marker='.', zorder=-10)\n", + "\n", + " if self.label_columns:\n", + " label_col_index = self.label_columns_indices.get(plot_col, None)\n", + " else:\n", + " label_col_index = plot_col_index\n", + "\n", + " if label_col_index is None:\n", + " continue\n", + "\n", + " plt.scatter(self.label_indices, labels[n, :, label_col_index],\n", + " edgecolors='k', label='Labels', c='#2ca02c', s=64)\n", + " if model is not None:\n", + " predictions = model(inputs)\n", + " plt.scatter(self.label_indices, predictions[n, :, label_col_index],\n", + " marker='X', edgecolors='k', label='Predictions',\n", + " c='#ff7f0e', s=64)\n", + "\n", + " if n == 0:\n", + " plt.legend()\n", + "\n", + " plt.xlabel('Time steps')\n", + "\n", + "WindowGenerator.plot = plot" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "framed-integral", + "metadata": {}, + "outputs": [], + "source": [ + "def make_dataset(self, data):\n", + " data = np.array(data, dtype=np.float32)\n", + " ds = tf.keras.preprocessing.timeseries_dataset_from_array(\n", + " data=data,\n", + " targets=None,\n", + " sequence_length=self.total_window_size,\n", + " sequence_stride=1,\n", + " shuffle=True,\n", + " batch_size=32,)\n", + "\n", + " ds = ds.map(self.split_window)\n", + "\n", + " return ds\n", + "\n", + "WindowGenerator.make_dataset = make_dataset" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "known-contribution", + "metadata": {}, + "outputs": [], + "source": [ + "@property\n", + "def train(self):\n", + " return self.make_dataset(self.train_df)\n", + "\n", + "@property\n", + "def val(self):\n", + " return self.make_dataset(self.val_df)\n", + "\n", + "@property\n", + "def test(self):\n", + " return self.make_dataset(self.test_df)\n", + "\n", + "@property\n", + "def example(self):\n", + " 
\"\"\"Get and cache an example batch of `inputs, labels` for plotting.\"\"\"\n", + " result = getattr(self, '_example', None)\n", + " if result is None:\n", + " # No example batch was found, so get one from the `.train` dataset\n", + " result = next(iter(self.train))\n", + " # And cache it for next time\n", + " self._example = result\n", + " return result\n", + "\n", + "WindowGenerator.train = train\n", + "WindowGenerator.val = val\n", + "WindowGenerator.test = test\n", + "WindowGenerator.example = example" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "certain-georgia", + "metadata": {}, + "outputs": [], + "source": [ + "MAX_EPOCHS = 20\n", + "\n", + "def compile_and_fit(model, window, patience=2):\n", + " early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss',\n", + " patience=patience,\n", + " mode='min')\n", + "\n", + " model.compile(loss=tf.losses.MeanSquaredError(),\n", + " optimizer=tf.optimizers.Adam(),\n", + " metrics=[tf.metrics.MeanAbsoluteError()])\n", + "\n", + " history = model.fit(window.train, epochs=MAX_EPOCHS,\n", + " validation_data=window.val,\n", + " callbacks=[early_stopping])\n", + " return history" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "small-mechanics", + "metadata": {}, + "outputs": [], + "source": [ + "# Now that we have defined the functions let's try multi step models to predict next 24h using on 24h history\n", + "val_performance = {}\n", + "performance = {}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "stupid-medium", + "metadata": {}, + "outputs": [], + "source": [ + "OUT_STEPS = 6*24\n", + "multi_window = WindowGenerator(input_width=6*24,\n", + " label_width=OUT_STEPS,\n", + " shift=OUT_STEPS)\n", + "\n", + "multi_window.plot()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "linear-registration", + "metadata": {}, + "outputs": [], + "source": [ + "class MultiStepLastBaseline(tf.keras.Model):\n", + " def call(self, inputs):\n", + " return tf.tile(inputs[:, -1:, :], [1, OUT_STEPS, 1])\n", + "\n", + "last_baseline = MultiStepLastBaseline()\n", + "last_baseline.compile(loss=tf.losses.MeanSquaredError(),\n", + " metrics=[tf.metrics.MeanAbsoluteError()])\n", + "\n", + "multi_val_performance = {}\n", + "multi_performance = {}\n", + "\n", + "multi_val_performance['Last'] = last_baseline.evaluate(multi_window.val)\n", + "multi_performance['Last'] = last_baseline.evaluate(multi_window.test, verbose=0)\n", + "multi_window.plot(last_baseline)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "functional-communication", + "metadata": {}, + "outputs": [], + "source": [ + "# this baseline repeats the previous day assuming tomorrow is same as today\n", + "class RepeatBaseline(tf.keras.Model):\n", + " def call(self, inputs):\n", + " return inputs\n", + "\n", + "repeat_baseline = RepeatBaseline()\n", + "repeat_baseline.compile(loss=tf.losses.MeanSquaredError(),\n", + " metrics=[tf.metrics.MeanAbsoluteError()])\n", + "\n", + "multi_val_performance['Repeat'] = repeat_baseline.evaluate(multi_window.val)\n", + "multi_performance['Repeat'] = repeat_baseline.evaluate(multi_window.test, verbose=0)\n", + "multi_window.plot(repeat_baseline)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "smaller-parker", + "metadata": {}, + "outputs": [], + "source": [ + "multi_linear_model = tf.keras.Sequential([\n", + " # Take the last time-step.\n", + " # Shape [batch, time, features] => [batch, 1, features]\n", + " 
tf.keras.layers.Lambda(lambda x: x[:, -1:, :]),\n", + " # Shape => [batch, 1, out_steps*features]\n", + " tf.keras.layers.Dense(OUT_STEPS*num_features,\n", + " kernel_initializer=tf.initializers.zeros),\n", + " # Shape => [batch, out_steps, features]\n", + " tf.keras.layers.Reshape([OUT_STEPS, num_features])\n", + "])\n", + "\n", + "history = compile_and_fit(multi_linear_model, multi_window)\n", + "\n", + "IPython.display.clear_output()\n", + "multi_val_performance['Linear'] = multi_linear_model.evaluate(multi_window.val)\n", + "multi_performance['Linear'] = multi_linear_model.evaluate(multi_window.test, verbose=0)\n", + "multi_window.plot(multi_linear_model)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "informative-cambridge", + "metadata": {}, + "outputs": [], + "source": [ + "multi_lstm_model = tf.keras.Sequential([\n", + " # Shape [batch, time, features] => [batch, lstm_units]\n", + " # Adding more `lstm_units` just overfits more quickly.\n", + " tf.keras.layers.LSTM(8, return_sequences=False),\n", + " # Shape => [batch, out_steps*features]\n", + " tf.keras.layers.Dense(OUT_STEPS*num_features),\n", + " #kernel_initializer=tf.initializers.zeros),\n", + " # Shape => [batch, out_steps, features]\n", + " tf.keras.layers.Reshape([OUT_STEPS, num_features])\n", + "])\n", + "\n", + "history = compile_and_fit(multi_lstm_model, multi_window)\n", + "\n", + "IPython.display.clear_output()\n", + "\n", + "multi_val_performance['LSTM'] = multi_lstm_model.evaluate(multi_window.val)\n", + "multi_performance['LSTM'] = multi_lstm_model.evaluate(multi_window.test, verbose=0)\n", + "multi_window.plot(multi_lstm_model)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "annoying-delhi", + "metadata": {}, + "outputs": [], + "source": [ + "multi_lstm_model.save('multi_lstm')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "published-cincinnati", + "metadata": {}, + "outputs": [], + "source": [ + "class FeedBack(tf.keras.Model):\n", + " def __init__(self, units, out_steps):\n", + " super().__init__()\n", + " self.out_steps = out_steps\n", + " self.units = units\n", + " self.lstm_cell = tf.keras.layers.LSTMCell(units)\n", + " # Also wrap the LSTMCell in an RNN to simplify the `warmup` method.\n", + " self.lstm_rnn = tf.keras.layers.RNN(self.lstm_cell, return_state=True)\n", + " self.dense = tf.keras.layers.Dense(num_features)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "significant-gravity", + "metadata": {}, + "outputs": [], + "source": [ + "feedback_model = FeedBack(units=32, out_steps=OUT_STEPS)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "qualified-champion", + "metadata": {}, + "outputs": [], + "source": [ + "def warmup(self, inputs):\n", + " # inputs.shape => (batch, time, features)\n", + " # x.shape => (batch, lstm_units)\n", + " x, *state = self.lstm_rnn(inputs)\n", + "\n", + " # predictions.shape => (batch, features)\n", + " prediction = self.dense(x)\n", + " return prediction, state\n", + "\n", + "FeedBack.warmup = warmup" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "broad-patent", + "metadata": {}, + "outputs": [], + "source": [ + "prediction, state = feedback_model.warmup(multi_window.example[0])\n", + "prediction.shape" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "simplified-jordan", + "metadata": {}, + "outputs": [], + "source": [ + "def call(self, inputs, training=None):\n", + " # Use a TensorArray to capture 
dynamically unrolled outputs.\n", + " predictions = []\n", + " # Initialize the lstm state\n", + " prediction, state = self.warmup(inputs)\n", + "\n", + " # Insert the first prediction\n", + " predictions.append(prediction)\n", + "\n", + " # Run the rest of the prediction steps\n", + " for n in range(1, self.out_steps):\n", + " # Use the last prediction as input.\n", + " x = prediction\n", + " # Execute one lstm step.\n", + " x, state = self.lstm_cell(x, states=state,\n", + " training=training)\n", + " # Convert the lstm output to a prediction.\n", + " prediction = self.dense(x)\n", + " # Add the prediction to the output\n", + " predictions.append(prediction)\n", + "\n", + " # predictions.shape => (time, batch, features)\n", + " predictions = tf.stack(predictions)\n", + " # predictions.shape => (batch, time, features)\n", + " predictions = tf.transpose(predictions, [1, 0, 2])\n", + " return predictions\n", + "\n", + "FeedBack.call = call" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "identical-scoop", + "metadata": {}, + "outputs": [], + "source": [ + "history = compile_and_fit(feedback_model, multi_window)\n", + "\n", + "IPython.display.clear_output()\n", + "\n", + "multi_val_performance['AR LSTM'] = feedback_model.evaluate(multi_window.val)\n", + "multi_performance['AR LSTM'] = feedback_model.evaluate(multi_window.test, verbose=0)\n", + "multi_window.plot(feedback_model)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "immediate-kentucky", + "metadata": {}, + "outputs": [], + "source": [ + "x = np.arange(len(multi_performance))\n", + "width = 0.3\n", + "\n", + "\n", + "metric_name = 'mean_absolute_error'\n", + "metric_index = multi_lstm_model.metrics_names.index('mean_absolute_error')\n", + "val_mae = [v[metric_index] for v in multi_val_performance.values()]\n", + "test_mae = [v[metric_index] for v in multi_performance.values()]\n", + "\n", + "plt.bar(x - 0.17, val_mae, width, label='Validation')\n", + "plt.bar(x + 0.17, test_mae, width, label='Test')\n", + "plt.xticks(ticks=x, labels=multi_performance.keys(),\n", + " rotation=45)\n", + "plt.ylabel(f'MAE (average over all times and outputs)')\n", + "_ = plt.legend()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "parental-holiday", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "pyenv-tms-demo", + "language": "python", + "name": "tms-demo" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.2" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/solutions/iot/weather-stations/machinelearning/predict-kafka-dataset.ipynb b/solutions/iot/weather-stations/machinelearning/predict-kafka-dataset.ipynb new file mode 100644 index 0000000..0aba4ff --- /dev/null +++ b/solutions/iot/weather-stations/machinelearning/predict-kafka-dataset.ipynb @@ -0,0 +1,351 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "computational-hamburg", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import datetime\n", + "\n", + "import pandas as pd\n", + "import seaborn as sns\n", + "import tensorflow as tf\n", + "\n", + "import matplotlib as mpl\n", + "import matplotlib.pyplot as plt\n", + "\n", + "import numpy as np\n", + "\n", + "from tqdm import tqdm\n", + "\n", + "from 
confluent_kafka import avro, Consumer, KafkaError, KafkaException\n", + "from confluent_kafka.avro import CachedSchemaRegistryClient\n", + "from confluent_kafka.avro.serializer.message_serializer import MessageSerializer as AvroSerde\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "frozen-belfast", + "metadata": {}, + "outputs": [], + "source": [ + "# load the model first before we start filling GPU mem with other stuff\n", + "# NOTE! If running on notebook env make sure you don't have other kernels consuming GPU mem\n", + "saved_model = tf.keras.models.load_model('multi_lstm')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "italic-offense", + "metadata": {}, + "outputs": [], + "source": [ + "def wind_vector(self, velocity, max_velocity, direction):\n", + " # Convert to radians.\n", + " wd_rad = direction*np.pi / 180\n", + " self['Wx'] = velocity*np.cos(wd_rad)\n", + " self['Wy'] = velocity*np.sin(wd_rad)\n", + " self['max Wx'] = max_velocity*np.cos(wd_rad)\n", + " self['max Wy'] = max_velocity*np.sin(wd_rad)\n", + "pd.DataFrame.wind_vector = wind_vector" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "casual-ticket", + "metadata": {}, + "outputs": [], + "source": [ + "def tod_signal(self, date_time):\n", + " day = 24*60*60\n", + " year = (365.2425)*day\n", + " timestamp_s = date_time.map(datetime.datetime.timestamp)\n", + " self['Day sin'] = np.sin(timestamp_s * (2 * np.pi / day))\n", + " self['Day cos'] = np.cos(timestamp_s * (2 * np.pi / day))\n", + " self['Year sin'] = np.sin(timestamp_s * (2 * np.pi / year))\n", + " self['Year cos'] = np.cos(timestamp_s * (2 * np.pi / year))\n", + "pd.DataFrame.tod_signal = tod_signal" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "offshore-utility", + "metadata": {}, + "outputs": [], + "source": [ + "schema_registry_config = {}\n", + "with open('../tms-secrets/schema_registry_uri') as f:\n", + " schema_registry_config['url'] = f.read().rstrip('\\n')\n", + " \n", + "schema_registry = CachedSchemaRegistryClient(schema_registry_config)\n", + "avro_serde = AvroSerde(schema_registry)\n", + "deserialize_avro = avro_serde.decode_message" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "temporal-prerequisite", + "metadata": {}, + "outputs": [], + "source": [ + "def create_client(): \n", + " \n", + " consumer_config = { \"group.id\": \"jhub-mac-5\",\n", + " \"max.poll.interval.ms\": 20000,\n", + " \"session.timeout.ms\": 10000,\n", + " \"default.topic.config\": {\"auto.offset.reset\": \"earliest\"},\n", + " \"security.protocol\": \"SSL\",\n", + " \"ssl.ca.location\": \"../tms-secrets/processing/ca.pem\",\n", + " \"ssl.certificate.location\": \"../tms-secrets/processing/service.cert\",\n", + " \"ssl.key.location\": \"../tms-secrets/processing/service.key\"\n", + " }\n", + " with open('../tms-secrets/kafka_service_uri') as f:\n", + " consumer_config['bootstrap.servers'] = f.read().rstrip('\\n')\n", + " \n", + " return Consumer(consumer_config) " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "through-express", + "metadata": {}, + "outputs": [], + "source": [ + "dataset_dict = []\n", + "def consume_records():\n", + " client = create_client()\n", + " client.subscribe([\"observations.weather.multivariate\"])\n", + " i = 0\n", + " for i in tqdm(range(400000)):\n", + " msg = client.poll(15)\n", + " if msg is None: \n", + " continue\n", + " \n", + " if msg.error():\n", + " if msg.error().code() == 
KafkaError._PARTITION_EOF:\n", + " # End of partition event\n", + " sys.stderr.write('%% %s [%d] reached end at offset %d\\n' %\n", + " (msg.topic(), msg.partition(), msg.offset()))\n", + " elif msg.error():\n", + " raise KafkaException(msg.error())\n", + " else:\n", + " value = deserialize_avro(message=msg.value(), is_key=False)\n", + " dataset_dict.append(value)\n", + " pass\n", + " client.close()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "leading-somerset", + "metadata": {}, + "outputs": [], + "source": [ + "# input dateset from Kafka\n", + "#consume_records()\n", + "kafka_df = pd.json_normalize(dataset_dict)\n", + "kafka_df['measuredTime'] = pd.to_datetime(kafka_df['measuredTime'] * 1000 * 1000)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b714bac4-4d00-40eb-97df-a0c4fc1c5c14", + "metadata": {}, + "outputs": [], + "source": [ + "kafka_df.describe()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "egyptian-dressing", + "metadata": {}, + "outputs": [], + "source": [ + "# select features\n", + "columns = ['roadStationId', 'measuredTime', 'measurements.ILMANPAINE', 'measurements.ILMAN_KOSTEUS', 'measurements.ILMA', 'measurements.TUULENSUUNTA', 'measurements.MAKSIMITUULI', 'measurements.KESKITUULI']\n", + "kafka_df = kafka_df[columns]\n", + "kafka_df.index = kafka_df['measuredTime']\n", + "del kafka_df['measuredTime']\n", + "kafka_df.info()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "direct-burke", + "metadata": {}, + "outputs": [], + "source": [ + "# fill gaps\n", + "kafka_interpo = kafka_df.groupby('roadStationId').resample('600s').mean().interpolate()\n", + "del kafka_interpo['roadStationId']\n", + "len(kafka_interpo.index.unique(level='roadStationId'))\n", + "# drop weather stations that can't provide all needed features\n", + "kafka_interpo = kafka_interpo.dropna()\n", + "kafka_interpo = kafka_interpo.iloc[kafka_interpo.index.get_level_values(0) == 2052]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "introductory-somalia", + "metadata": {}, + "outputs": [], + "source": [ + "# create wind vectors from velocity and direction\n", + "kafka_interpo.wind_vector(kafka_interpo.pop('measurements.TUULENSUUNTA'), kafka_interpo.pop('measurements.MAKSIMITUULI'), kafka_interpo.pop('measurements.KESKITUULI'))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "british-mainland", + "metadata": {}, + "outputs": [], + "source": [ + "plt.hist2d(kafka_interpo['max Wx'], kafka_interpo['max Wy'], bins=(50, 50), vmax=10)\n", + "plt.colorbar()\n", + "plt.xlabel('Wind X [m/s]')\n", + "plt.ylabel('Wind Y [m/s]')\n", + "ax = plt.gca()\n", + "ax.axis('tight')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "maritime-willow", + "metadata": {}, + "outputs": [], + "source": [ + "# calculate time of day signal from time index\n", + "kafka_interpo.tod_signal(kafka_interpo.index.get_level_values('measuredTime'))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "educational-moses", + "metadata": {}, + "outputs": [], + "source": [ + "column_names = pd.read_csv('predict/trainset_columns.csv', index_col=0)\n", + "kafka_interpo.columns = column_names['0'].values" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bacterial-ukraine", + "metadata": {}, + "outputs": [], + "source": [ + "# Normalize\n", + "#train_mean = pd.read_pickle('predict/train_mean.pkl')\n", + "#train_std = 
pd.read_pickle('predict/train_std.pkl')\n", + "#kafka_norm = (kafka_interpo - train_mean) / train_std\n", + "#kafka_norm.shape\n", + "\n", + "# Normalize\n", + "input_mean = kafka_interpo.mean()\n", + "input_std = kafka_interpo.std()\n", + "kafka_norm = (kafka_interpo - input_mean) / input_std\n", + "kafka_norm.shape" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "smart-state", + "metadata": {}, + "outputs": [], + "source": [ + "kafka_norm.head(10)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "controlling-supervisor", + "metadata": {}, + "outputs": [], + "source": [ + "HOURS = 24\n", + "#inpslic = slice(-6 * HOURS,None)\n", + "inputdf = kafka_norm[:6 * HOURS]\n", + "data = np.array(inputdf, dtype=np.float32)\n", + "input = tf.keras.preprocessing.timeseries_dataset_from_array(\n", + " data=data,\n", + " targets=None,\n", + " sequence_length=len(inputdf),\n", + " sequence_stride=1,\n", + " shuffle=True,\n", + " batch_size=32,)\n", + "y = saved_model.predict(input);\n", + "#y = repeat_baseline.predict(input);\n", + "\n", + "result = pd.DataFrame(y[0,:], columns=kafka_norm.columns)\n", + "result = result[:6 * HOURS]\n", + "result = input_std * result + input_mean\n", + "#result = train_std * result + train_mean\n", + "\n", + "result['date'] = inputdf.index.get_level_values(1) + pd.Timedelta(value = HOURS, unit = 'h')\n", + "result = result.set_index('date').add_prefix('pred_')\n", + "all = pd.concat([result, kafka_interpo.droplevel('roadStationId')], axis=1)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fleet-praise", + "metadata": {}, + "outputs": [], + "source": [ + "mask = all.columns.str.contains('.*lämpötila.*|.*Suhteellinen.*')\n", + "all.sort_index().loc[:,mask].plot()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3bc2caac-7890-4637-8fdf-b432a1f02be5", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "pyenv-tms-demo", + "language": "python", + "name": "tms-demo" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.2" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/solutions/iot/weather-stations/machinelearning/vantaa-2017-2020.csv.gz b/solutions/iot/weather-stations/machinelearning/vantaa-2017-2020.csv.gz new file mode 100644 index 0000000..9fef7e0 Binary files /dev/null and b/solutions/iot/weather-stations/machinelearning/vantaa-2017-2020.csv.gz differ diff --git a/solutions/iot/weather-stations/observability/Dockerfile.highlander b/solutions/iot/weather-stations/observability/Dockerfile.highlander new file mode 100644 index 0000000..ad00de2 --- /dev/null +++ b/solutions/iot/weather-stations/observability/Dockerfile.highlander @@ -0,0 +1,19 @@ +FROM golang:1.15 as builder +ENV APP_USER highlander +ENV APP_HOME /go/src/highlander +RUN groupadd $APP_USER && useradd -m -g $APP_USER -l $APP_USER +RUN mkdir -p $APP_HOME && chown -R $APP_USER:$APP_USER $APP_HOME +WORKDIR $APP_HOME +USER $APP_USER +COPY --chown=$APP_USER:$APP_USER highlander/ highlander +RUN cd highlander && make +FROM golang:1.15 +ENV APP_USER highlander +ENV APP_HOME /go/src/highlander +RUN groupadd $APP_USER && useradd -m -g $APP_USER -l $APP_USER +RUN mkdir -p $APP_HOME +WORKDIR $APP_HOME +COPY --chown=0:0 --from=builder 
$APP_HOME/highlander/build/highlander $APP_HOME +EXPOSE 9092 +USER $APP_USER +ENTRYPOINT exec ./highlander -r $M3_URL -u $M3_USER -p $M3_PASSWORD -l 0.0.0.0:9092
diff --git a/solutions/iot/weather-stations/observability/README.md b/solutions/iot/weather-stations/observability/README.md new file mode 100644 index 0000000..e1be7aa --- /dev/null +++ b/solutions/iot/weather-stations/observability/README.md @@ -0,0 +1,49 @@ +# Observability stack for managed Aiven services, Kubernetes and Kafka Streams with Prometheus, M3 and OpenSearch + +This directory contains K8s manifests for deploying observability capabilities. There is also a git submodule for the Highlander reverse proxy, which is used for de-duplicating metrics from the HA Prometheus setup. + +# Prometheus + +We use Jsonnet and Jsonnet-bundler for creating the Prometheus Kubernetes manifests. + +Install gojsontoyaml: +```` +go install github.com/brancz/gojsontoyaml@latest +```` + +Now we can build the k8s manifests for Prometheus: + +```` +cd prometheus +jb init +jb install github.com/prometheus-operator/kube-prometheus/jsonnet/kube-prometheus@release-0.9 +./build.sh example-0.9.jsonnet +printf " remoteWrite:\n - url: \"http://highlander:9092/api/v1/prom/remote/write\"\n" >> manifests/prometheus-prometheus.yaml +```` + +Next we will create the Prometheus Operator CRDs and the Aiven for M3 secrets: + +```` +kubectl create -f manifests/setup +./create-m3-secret.sh +```` + +Highlander is a reverse proxy on the Prometheus remote write path. It only allows a single client to write to the target (M3), which effectively deduplicates the datapoints written by a replicated (HA) Prometheus deployment. + +Now that the M3 secrets are in place, it's time to deploy the Highlander proxy: + +```` +kubectl create -f ../k8s/highlander.yaml +```` + +Everything is now in place for deploying the actual Prometheus instances: + +```` +kubectl create -f manifests + +```` + +With Prometheus running, we can deploy ServiceMonitors for all our Kafka clients: +```` +kubectl create -f ../k8s/prometheus-servicemonitor.yaml +````
diff --git a/solutions/iot/weather-stations/observability/k8s/highlander.yaml b/solutions/iot/weather-stations/observability/k8s/highlander.yaml new file mode 100644 index 0000000..b6ecd06 --- /dev/null +++ b/solutions/iot/weather-stations/observability/k8s/highlander.yaml @@ -0,0 +1,47 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: highlander + namespace: monitoring + labels: + app: highlander +spec: + replicas: 1 + selector: + matchLabels: + app: highlander + template: + metadata: + labels: + app: highlander + spec: + containers: + - name: highlander + image: ssaily/highlander:0.0.4 + envFrom: + - secretRef: + name: m3-prom + resources: + limits: + memory: 512Mi + cpu: "1" + requests: + memory: 256Mi + cpu: "100m" + imagePullPolicy: Always +--- +apiVersion: v1 +kind: Service +metadata: + name: highlander + namespace: monitoring + labels: + app: highlander +spec: + ports: + - name: highlander + port: 9092 + targetPort: 9092 + protocol: TCP + selector: + app: highlander
diff --git a/solutions/iot/weather-stations/observability/k8s/prometheus-servicemonitor.yaml b/solutions/iot/weather-stations/observability/k8s/prometheus-servicemonitor.yaml new file mode 100644 index 0000000..28b2574 --- /dev/null +++ b/solutions/iot/weather-stations/observability/k8s/prometheus-servicemonitor.yaml @@ -0,0 +1,64 @@ +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: servicemonitor-enrichment + namespace: tms-demo +spec: + selector: + matchLabels: + app: tms-demo-enrichment + endpoints: + - 
interval: 10s + port: metrics +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: servicemonitor-calculations + namespace: tms-demo +spec: + selector: + matchLabels: + app: tms-demo-calculations + endpoints: + - interval: 10s + port: metrics +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: servicemonitor-multivariate + namespace: tms-demo +spec: + selector: + matchLabels: + app: tms-demo-multivariate + endpoints: + - interval: 10s + port: metrics +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: servicemonitor-m3sink + namespace: tms-demo +spec: + selector: + matchLabels: + app: tms-demo-sink + endpoints: + - interval: 10s + port: metrics +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: servicemonitor-ingest + namespace: tms-demo +spec: + selector: + matchLabels: + app: tms-demo-ingest + endpoints: + - interval: 10s + port: metrics diff --git a/solutions/iot/weather-stations/observability/opensearch/fluentbit-configmap.yaml b/solutions/iot/weather-stations/observability/opensearch/fluentbit-configmap.yaml new file mode 100644 index 0000000..b3672f6 --- /dev/null +++ b/solutions/iot/weather-stations/observability/opensearch/fluentbit-configmap.yaml @@ -0,0 +1,135 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: fluent-bit-config + namespace: logging + labels: + k8s-app: fluent-bit +data: + # Configuration files: server, input, filters, and output + # ======================================================= + fluent-bit.conf: | + [SERVICE] + Flush 1 + Log_Level info + Daemon off + Parsers_File parsers.conf + HTTP_Server On + HTTP_Listen 0.0.0.0 + HTTP_Port 2020 + # Plugins_File /fluent-bit/etc/plugins.conf + + @INCLUDE input-kubernetes.conf + @INCLUDE filter-kubernetes.conf + @INCLUDE output-opensearch.conf + + input-kubernetes.conf: | + [INPUT] + Name tail + Path /var/log/containers/*.log + Tag kube.* + Parser docker + DB /var/log/flb_kube.db + Mem_Buf_Limit 20MB + Skip_Long_Lines On + Refresh_Interval 10 + # Control the log line length + Buffer_Chunk_Size 256k + Buffer_Max_Size 10240k + # Using the docker mode to deal with multiline messages emitted by docker + Docker_Mode On + + replace_info.lua: | + function replace_sensitive_info(tag, timestamp, record) + -- mask social security number + record["log"] = string.gsub(record["log"], "%d%d%d%-*%d%d%-*%d%d%d%d", "xxx-xx-xxxx") + -- mask credit card number + record["log"] = string.gsub(record["log"], "%d%d%d%d *%d%d%d%d *%d%d%d%d *%d%d%d%d", "xxxx xxxx xxxx xxxx") + -- mask email address + record["log"] = string.gsub(record["log"], "[%w+%.%-_]+@[%w+%.%-_]+%.%a%a+", "user@email.tld") + return 1, timestamp, record + end + + filter-kubernetes.conf: | + [FILTER] + Name kubernetes + Match kube.* + Kube_URL https://kubernetes.default.svc:443 + Kube_CA_File /var/run/secrets/kubernetes.io/serviceaccount/ca.crt + Kube_Token_File /var/run/secrets/kubernetes.io/serviceaccount/token + Kube_Tag_Prefix kube.var.log.containers. 
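+ # The kubernetes filter looks up pod metadata (namespace, pod name, labels) from the API server at Kube_URL, authenticating with the ServiceAccount token from Kube_Token_File.
+ # Kube_Tag_Prefix must match the tag prefix generated by the tail input (kube.var.log.containers.) so the filter can recover the pod name and namespace from the record tag.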
+ # Try to merge the log messages + Merge_Log On + Merge_Log_Key log_processed + K8S-Logging.Parser On + K8S-Logging.Exclude Off + # ### sample log scrubbing filters + #[FILTER] + # Name lua + # Match kube.* + # # lua script to redact sensitive data in log messages + # script replace_info.lua + # call replace_sensitive_info + # ### end sample log scrubbing + + output-opensearch.conf: | + + [OUTPUT] + # write the log records that still have the 'kube.*' tags to OpenSearch + Name es + Match kube.* + Host ${OPENSEARCH_HOST} + Port ${OPENSEARCH_PORT} + Index kubernetes_cluster + tls On + Type doc + HTTP_User ${OPENSEARCH_USER} + HTTP_Passwd ${OPENSEARCH_PASSWORD} + + parsers.conf: | + [PARSER] + Name apache + Format regex + Regex ^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?