diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 98d5b5af..a31e0aa8 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -27,7 +27,7 @@ repos: - id: terraform_validate # Exclude modules to work around # https://github.com/hashicorp/terraform/issues/28490 - exclude: "terraform/[^/]+/modules/[^/]+/[^/]+$" + exclude: "terraform/modules/[^/]+/[^/]+$" - id: terraform_tflint - repo: https://github.com/charliermarsh/ruff-pre-commit rev: v0.1.6 diff --git a/poetry.lock b/poetry.lock index 78aa4f3b..dc2e3b07 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1161,14 +1161,14 @@ files = [ [[package]] name = "dbt-semantic-interfaces" -version = "0.2.2" +version = "0.2.3" description = "The shared semantic layer definitions that dbt-core and MetricFlow use" category = "main" optional = false python-versions = ">=3.8" files = [ - {file = "dbt_semantic_interfaces-0.2.2-py3-none-any.whl", hash = "sha256:64eeed377f2c8fe25092ff5598ac79a3b46a3c521d2ec40e26adcedaf0fcb999"}, - {file = "dbt_semantic_interfaces-0.2.2.tar.gz", hash = "sha256:7daf7d4eaaa2600985ecf3946c5fe43027bf64aee41a7ce4045a8f036a52596b"}, + {file = "dbt_semantic_interfaces-0.2.3-py3-none-any.whl", hash = "sha256:69235a6b261c45d1501c29a9b2a9496935b0c764e7fd25a2f83471ef53dae15f"}, + {file = "dbt_semantic_interfaces-0.2.3.tar.gz", hash = "sha256:628fd65ce01bbfbf9115866d2bb11616d0e1987b65fb4fabfdf9bb807177c2ee"}, ] [package.dependencies] @@ -1180,7 +1180,7 @@ more-itertools = ">=8.0,<9.0" pydantic = ">=1.10,<2.0" python-dateutil = ">=2.0,<3.0" pyyaml = ">=6.0,<7.0" -typing-extensions = ">=4.0,<5.0" +typing-extensions = ">=4.4,<5.0" [[package]] name = "dbt-snowflake" diff --git a/scripts/setup_clearinghouse.sql b/scripts/setup_clearinghouse.sql new file mode 100644 index 00000000..9494d786 --- /dev/null +++ b/scripts/setup_clearinghouse.sql @@ -0,0 +1,77 @@ +CREATE SCHEMA IF NOT EXISTS CLEARINGHOUSE; + +CREATE OR REPLACE FILE FORMAT CLEARINGHOUSE.STATION_RAW +TYPE = csv +PARSE_HEADER = 
false +FIELD_DELIMITER = ',' +SKIP_HEADER = 0 +COMPRESSION='gzip'; + +CREATE OR REPLACE FILE FORMAT CLEARINGHOUSE.STATION_META +TYPE = csv +PARSE_HEADER = false +FIELD_DELIMITER = '\t' +SKIP_HEADER = 1; + +CREATE OR REPLACE FILE FORMAT CLEARINGHOUSE.STATION_STATUS +TYPE = XML +STRIP_OUTER_ELEMENT = TRUE; + +CREATE TABLE IF NOT EXISTS CLEARINGHOUSE.STATION_RAW ( + FILENAME TEXT, + SAMPLE_TIMESTAMP TIMESTAMP_NTZ, + SAMPLE_DATE DATE, + ID TEXT, + FLOW_1 INT, + OCCUPANCY_1 FLOAT, + SPEED_1 FLOAT, + FLOW_2 INT, + OCCUPANCY_2 FLOAT, + SPEED_2 FLOAT, + FLOW_3 INT, + OCCUPANCY_3 FLOAT, + SPEED_3 FLOAT, + FLOW_4 INT, + OCCUPANCY_4 FLOAT, + SPEED_4 FLOAT, + FLOW_5 INT, + OCCUPANCY_5 FLOAT, + SPEED_5 FLOAT, + FLOW_6 INT, + OCCUPANCY_6 FLOAT, + SPEED_6 FLOAT, + FLOW_7 INT, + OCCUPANCY_7 FLOAT, + SPEED_7 FLOAT, + FLOW_8 INT, + OCCUPANCY_8 FLOAT, + SPEED_8 FLOAT +) +CLUSTER BY (SAMPLE_DATE); + +CREATE TABLE IF NOT EXISTS CLEARINGHOUSE.STATION_META ( + FILENAME TEXT, + ID TEXT, + FWY TEXT, + DIR TEXT, + DISTRICT TEXT, + COUNTY TEXT, + CITY TEXT, + STATE_PM TEXT, + ABS_PM TEXT, + LATITUDE FLOAT, + LONGITUDE FLOAT, + LENGTH FLOAT, + TYPE TEXT, + LANES INT, + NAME TEXT, + USER_ID_1 TEXT, + USER_ID_2 TEXT, + USER_ID_3 TEXT, + USER_ID_4 TEXT +); + +CREATE TABLE IF NOT EXISTS CLEARINGHOUSE.STATION_STATUS ( + FILENAME TEXT, + CONTENT VARIANT +); diff --git a/terraform/environments/dev/.terraform.lock.hcl b/terraform/environments/dev/.terraform.lock.hcl index 0d26c020..92e599b7 100644 --- a/terraform/environments/dev/.terraform.lock.hcl +++ b/terraform/environments/dev/.terraform.lock.hcl @@ -27,20 +27,23 @@ provider "registry.terraform.io/hashicorp/aws" { } provider "registry.terraform.io/snowflake-labs/snowflake" { - version = "0.69.0" - constraints = "~> 0.61, 0.69.0" + version = "0.71.0" + constraints = "~> 0.61, 0.71.0" hashes = [ - "h1:0CSDES330nTqimmmNwQxxp24dDkRGYTrz46wjJDBG9M=", - "h1:S57eACfdn03/8yUFRcSA52BsBsuEIJ0KwP5jhdoKMuY=", - 
"h1:rB052GXnpNuUR9ZeGgiY8imkQqPWkI32PtiqWTORJXQ=", - "zh:05c38443ecce2a74568a182eb8a796db5d333846552d75c16b158f5034e824a3", - "zh:38b549a2e09b911709c85340222533cff9fde9c0de3e83cb906891822b340279", - "zh:3d4468d3be703c0545db94f6f1861ffd0f6e6e40cbdad6cf22506fcf368d33cd", - "zh:43ae1d8dd68545923ad460593b3f9c6d0a010b66c8ece7c62e4c2c716e3bb848", - "zh:4f4bb057e411138f876cc87b676269f23d62ffeafd4bbcf7d96521ee8abb2dff", - "zh:51c3f30ec1ede2c002d103e0dfeee187ef69be77c7a948355f6e5a5c505745f2", - "zh:74e7fd7d960a2e7e069941fb665e2873c4ced8cf84d5e1f8aded98ad9bb742c8", - "zh:97330c429ba7c17eec8ee325695bdc81093e1c70a275fd70ffeb00719ffc73ce", + "h1:OzG5BB2GnRFN5Z9UlprM1QKabM0BchFxCNbAGFZK7ug=", + "h1:gGn8oiljlcX7AvdPpoFzQGVZ5NPJ51NycPJUQxF3US4=", + "h1:lr1cAHKmrg6pe4J/5EbjTpkP9C2BEfneEILC7007xn4=", + "zh:16d905e911a699693cea3b8d95b7dfbecbb74779cb78fbf98532fd910350cc3b", + "zh:2d099d606a315f699100b9259078a87905f1760c69a4002c210c0d40401110a4", + "zh:317115adad369ff94bacc2f64be8276c6c4bb99d254855f038d8881fb49036f8", + "zh:594ac3bbdfb0e97d0d59cdb83ced06a6671dad3ee09bce11494ef6189a39210b", + "zh:5df6cb55cea1319889350cd9cda3fa5c6c9dca0e7aae83e3056a360f2bef1282", + "zh:77a9f007d0f69feeb9c707000f347cd465b1bedfdc3ee25e355c2474e23a848d", + "zh:9966c72c5be444386967b0194fbe67a256f101abb2333ce13f9a9ae73b7854aa", + "zh:aa745c0f0562aa3153d3584d6857d23d72ef15612be7cb49c54e682fe27b6599", + "zh:c54c3ce2e84c499e4063fe82ed952ccc5617bd46774f44f408926726a776b2b3", + "zh:cb2a3d9c60f690140fbf2f53369965f51b5d6736e194835db7018c2700a0fc47", "zh:f569b65999264a9416862bca5cd2a6177d94ccb0424f3a4ef424428912b9cb3c", + "zh:f6f50842a41a0675f8cef964892a8804e53a07a680869ab0142d909a5c786f73", ] } diff --git a/terraform/environments/dev/main.tf b/terraform/environments/dev/main.tf index e280daf6..7528bcc1 100644 --- a/terraform/environments/dev/main.tf +++ b/terraform/environments/dev/main.tf @@ -12,7 +12,7 @@ terraform { } snowflake = { source = "Snowflake-Labs/snowflake" - version = "0.69" + version = "0.71" } } @@ 
-26,6 +26,13 @@ locals { project = "pems" region = "us-west-2" locator = "NGB13288" + + # These are circular dependencies on the outputs. Unfortunate, but + # necessary, as we don't know them until we've created the storage + # integration, which itself depends on the assume role policy. + storage_aws_external_id = "NGB13288_SFCRole=2_P94CCaZYR9XFUzpMIGN6HOit/zQ=" + storage_aws_iam_user_arn = "arn:aws:iam::946158320428:user/uunc0000-s" + pipe_sqs_queue_arn = "arn:aws:sqs:us-west-2:946158320428:sf-snowpipe-AIDA5YS3OHMWCVTR5XHEE-YZjsweK3loK4rXlOJBWF_g" } provider "aws" { @@ -49,6 +56,13 @@ provider "snowflake" { role = "PUBLIC" } +# Snowflake provider for account administration (to be used only when necessary). +provider "snowflake" { + alias = "accountadmin" + account = local.locator + role = "ACCOUNTADMIN" +} + # Snowflake provider for creating databases, warehouses, etc. provider "snowflake" { alias = "sysadmin" @@ -80,8 +94,11 @@ module "s3_lake" { aws = aws } - prefix = "${local.owner}-${local.project}-${local.environment}" - region = local.region + prefix = "${local.owner}-${local.project}-${local.environment}" + region = local.region + snowflake_raw_storage_integration_iam_user_arn = local.storage_aws_iam_user_arn + snowflake_raw_storage_integration_external_id = local.storage_aws_external_id + snowflake_pipe_sqs_queue_arn = local.pipe_sqs_queue_arn } data "aws_iam_role" "mwaa_execution_role" { @@ -97,6 +114,7 @@ resource "aws_iam_role_policy_attachment" "mwaa_execution_role" { # Snowflake Infrastructure # ############################ +# Main ELT architecture module "elt" { source = "github.com/cagov/data-infrastructure.git//terraform/snowflake/modules/elt?ref=74a522f" providers = { @@ -107,3 +125,25 @@ module "elt" { environment = upper(local.environment) } + +module "snowflake_clearinghouse" { + source = "../../modules/snowflake-clearinghouse" + providers = { + snowflake.accountadmin = snowflake.accountadmin, + snowflake.securityadmin = 
snowflake.securityadmin, + snowflake.sysadmin = snowflake.sysadmin, + snowflake.useradmin = snowflake.useradmin, + } + + environment = upper(local.environment) + s3_url = "s3://${module.s3_lake.pems_raw_bucket.name}" + storage_aws_role_arn = module.s3_lake.snowflake_storage_integration_role.arn +} + +output "pems_raw_stage" { + value = module.snowflake_clearinghouse.pems_raw_stage +} + +output "notification_channel" { + value = module.snowflake_clearinghouse.notification_channel +} diff --git a/terraform/environments/prd/.terraform.lock.hcl b/terraform/environments/prd/.terraform.lock.hcl index 0d26c020..95839688 100644 --- a/terraform/environments/prd/.terraform.lock.hcl +++ b/terraform/environments/prd/.terraform.lock.hcl @@ -27,20 +27,23 @@ provider "registry.terraform.io/hashicorp/aws" { } provider "registry.terraform.io/snowflake-labs/snowflake" { - version = "0.69.0" - constraints = "~> 0.61, 0.69.0" + version = "0.71.0" + constraints = "~> 0.61, 0.71.0, ~> 0.71" hashes = [ - "h1:0CSDES330nTqimmmNwQxxp24dDkRGYTrz46wjJDBG9M=", - "h1:S57eACfdn03/8yUFRcSA52BsBsuEIJ0KwP5jhdoKMuY=", - "h1:rB052GXnpNuUR9ZeGgiY8imkQqPWkI32PtiqWTORJXQ=", - "zh:05c38443ecce2a74568a182eb8a796db5d333846552d75c16b158f5034e824a3", - "zh:38b549a2e09b911709c85340222533cff9fde9c0de3e83cb906891822b340279", - "zh:3d4468d3be703c0545db94f6f1861ffd0f6e6e40cbdad6cf22506fcf368d33cd", - "zh:43ae1d8dd68545923ad460593b3f9c6d0a010b66c8ece7c62e4c2c716e3bb848", - "zh:4f4bb057e411138f876cc87b676269f23d62ffeafd4bbcf7d96521ee8abb2dff", - "zh:51c3f30ec1ede2c002d103e0dfeee187ef69be77c7a948355f6e5a5c505745f2", - "zh:74e7fd7d960a2e7e069941fb665e2873c4ced8cf84d5e1f8aded98ad9bb742c8", - "zh:97330c429ba7c17eec8ee325695bdc81093e1c70a275fd70ffeb00719ffc73ce", + "h1:OzG5BB2GnRFN5Z9UlprM1QKabM0BchFxCNbAGFZK7ug=", + "h1:gGn8oiljlcX7AvdPpoFzQGVZ5NPJ51NycPJUQxF3US4=", + "h1:lr1cAHKmrg6pe4J/5EbjTpkP9C2BEfneEILC7007xn4=", + "zh:16d905e911a699693cea3b8d95b7dfbecbb74779cb78fbf98532fd910350cc3b", + 
"zh:2d099d606a315f699100b9259078a87905f1760c69a4002c210c0d40401110a4", + "zh:317115adad369ff94bacc2f64be8276c6c4bb99d254855f038d8881fb49036f8", + "zh:594ac3bbdfb0e97d0d59cdb83ced06a6671dad3ee09bce11494ef6189a39210b", + "zh:5df6cb55cea1319889350cd9cda3fa5c6c9dca0e7aae83e3056a360f2bef1282", + "zh:77a9f007d0f69feeb9c707000f347cd465b1bedfdc3ee25e355c2474e23a848d", + "zh:9966c72c5be444386967b0194fbe67a256f101abb2333ce13f9a9ae73b7854aa", + "zh:aa745c0f0562aa3153d3584d6857d23d72ef15612be7cb49c54e682fe27b6599", + "zh:c54c3ce2e84c499e4063fe82ed952ccc5617bd46774f44f408926726a776b2b3", + "zh:cb2a3d9c60f690140fbf2f53369965f51b5d6736e194835db7018c2700a0fc47", "zh:f569b65999264a9416862bca5cd2a6177d94ccb0424f3a4ef424428912b9cb3c", + "zh:f6f50842a41a0675f8cef964892a8804e53a07a680869ab0142d909a5c786f73", ] } diff --git a/terraform/environments/prd/main.tf b/terraform/environments/prd/main.tf index 92c9579e..b4024b10 100644 --- a/terraform/environments/prd/main.tf +++ b/terraform/environments/prd/main.tf @@ -12,7 +12,7 @@ terraform { } snowflake = { source = "Snowflake-Labs/snowflake" - version = "0.69" + version = "0.71" } } @@ -26,6 +26,13 @@ locals { project = "pems" region = "us-west-2" locator = "NGB13288" + + # These are circular dependencies on the outputs. Unfortunate, but + # necessary, as we don't know them until we've created the storage + # integration, which itself depends on the assume role policy. + storage_aws_external_id = "NGB13288_SFCRole=2_GwkZr+HZcrKfUqvsrvBBu6gcqRs=" + storage_aws_iam_user_arn = "arn:aws:iam::946158320428:user/uunc0000-s" + pipe_sqs_queue_arn = "arn:aws:sqs:us-west-2:946158320428:sf-snowpipe-AIDA5YS3OHMWCVTR5XHEE-YZjsweK3loK4rXlOJBWF_g" } provider "aws" { @@ -49,6 +56,13 @@ provider "snowflake" { role = "PUBLIC" } +# Snowflake provider for account administration (to be used only when necessary). 
+provider "snowflake" { + alias = "accountadmin" + account = local.locator + role = "ACCOUNTADMIN" +} + # Snowflake provider for creating databases, warehouses, etc. provider "snowflake" { alias = "sysadmin" @@ -80,14 +94,27 @@ module "s3_lake" { aws = aws } - prefix = "${local.owner}-${local.project}-${local.environment}" - region = local.region + prefix = "${local.owner}-${local.project}-${local.environment}" + region = local.region + snowflake_raw_storage_integration_iam_user_arn = local.storage_aws_iam_user_arn + snowflake_raw_storage_integration_external_id = local.storage_aws_external_id + snowflake_pipe_sqs_queue_arn = local.pipe_sqs_queue_arn +} + +data "aws_iam_role" "mwaa_execution_role" { + name = "dse-infra-dev-us-west-2-mwaa-execution-role" +} + +resource "aws_iam_role_policy_attachment" "mwaa_execution_role" { + role = data.aws_iam_role.mwaa_execution_role.name + policy_arn = module.s3_lake.pems_raw_read_write_policy.arn } ############################ # Snowflake Infrastructure # ############################ +# Main ELT architecture module "elt" { source = "github.com/cagov/data-infrastructure.git//terraform/snowflake/modules/elt?ref=74a522f" providers = { @@ -98,3 +125,25 @@ module "elt" { environment = upper(local.environment) } + +module "snowflake_clearinghouse" { + source = "../../modules/snowflake-clearinghouse" + providers = { + snowflake.accountadmin = snowflake.accountadmin, + snowflake.securityadmin = snowflake.securityadmin, + snowflake.sysadmin = snowflake.sysadmin, + snowflake.useradmin = snowflake.useradmin, + } + + environment = upper(local.environment) + s3_url = "s3://${module.s3_lake.pems_raw_bucket.name}" + storage_aws_role_arn = module.s3_lake.snowflake_storage_integration_role.arn +} + +output "pems_raw_stage" { + value = module.snowflake_clearinghouse.pems_raw_stage +} + +output "notification_channel" { + value = module.snowflake_clearinghouse.notification_channel +} diff --git a/terraform/modules/.gitignore 
b/terraform/modules/.gitignore new file mode 100644 index 00000000..3f0336e2 --- /dev/null +++ b/terraform/modules/.gitignore @@ -0,0 +1 @@ +.terraform.lock.hcl diff --git a/terraform/modules/s3-lake/iam.tf b/terraform/modules/s3-lake/iam.tf index 23032d49..034846a0 100644 --- a/terraform/modules/s3-lake/iam.tf +++ b/terraform/modules/s3-lake/iam.tf @@ -14,3 +14,33 @@ resource "aws_iam_user_policy_attachment" "airflow_s3_writer_policy_attachment" user = aws_iam_user.airflow_s3_writer.name policy_arn = aws_iam_policy.pems_raw_read_write.arn } + +# IAM role for Snowflake to assume when reading from the bucket +resource "aws_iam_role" "snowflake_storage_integration" { + name = "${var.prefix}-snowflake-storage-integration" + + # https://docs.snowflake.com/user-guide/data-load-snowpipe-auto-s3#step-5-grant-the-iam-user-permissions-to-access-bucket-objects + assume_role_policy = jsonencode({ + "Version" : "2012-10-17", + "Statement" : [ + { + "Effect" : "Allow", + "Principal" : { + "AWS" : var.snowflake_raw_storage_integration_iam_user_arn + }, + "Action" : "sts:AssumeRole", + "Condition" : { + "StringEquals" : { + "sts:ExternalId" : var.snowflake_raw_storage_integration_external_id + } + } + } + ] + } + ) +} + +resource "aws_iam_role_policy_attachment" "snowflake_storage_integration" { + role = aws_iam_role.snowflake_storage_integration.name + policy_arn = aws_iam_policy.pems_raw_external_stage_policy.arn +} diff --git a/terraform/modules/s3-lake/outputs.tf b/terraform/modules/s3-lake/outputs.tf index 6ba9453a..35390db5 100644 --- a/terraform/modules/s3-lake/outputs.tf +++ b/terraform/modules/s3-lake/outputs.tf @@ -13,3 +13,11 @@ output "pems_raw_read_write_policy" { arn = aws_iam_policy.pems_raw_read_write.arn } } + +output "snowflake_storage_integration_role" { + description = "IAM role for Snowflake to assume when reading from the bucket" + value = { + name = aws_iam_role.snowflake_storage_integration.name + arn = aws_iam_role.snowflake_storage_integration.arn + } +} 
diff --git a/terraform/modules/s3-lake/s3.tf b/terraform/modules/s3-lake/s3.tf index 7c30960c..a1ca8c7d 100644 --- a/terraform/modules/s3-lake/s3.tf +++ b/terraform/modules/s3-lake/s3.tf @@ -50,3 +50,44 @@ resource "aws_s3_bucket_public_access_block" "pems_raw" { ignore_public_acls = true restrict_public_buckets = true } + +# External stage policy +# From https://docs.snowflake.com/user-guide/data-load-snowpipe-auto-s3#creating-an-iam-policy +data "aws_iam_policy_document" "pems_raw_external_stage_policy" { + statement { + actions = [ + "s3:ListBucket", + "s3:GetBucketLocation", + ] + resources = [aws_s3_bucket.pems_raw.arn] + condition { + test = "StringLike" + variable = "s3:prefix" + values = ["*"] + } + + } + statement { + actions = [ + "s3:GetObject", + "s3:GetObjectVersion", + ] + resources = ["${aws_s3_bucket.pems_raw.arn}/*"] + } +} + +resource "aws_iam_policy" "pems_raw_external_stage_policy" { + name = "${var.prefix}-${var.region}-pems-raw-external-stage-policy" + description = "Policy allowing Snowflake external stage read access to the PeMS raw bucket" + policy = data.aws_iam_policy_document.pems_raw_external_stage_policy.json +} + +# Snowpipe notifications +resource "aws_s3_bucket_notification" "snowflake_pipe_notifications" { + count = var.snowflake_pipe_sqs_queue_arn == null ?
0 : 1 + bucket = aws_s3_bucket.pems_raw.id + queue { + queue_arn = var.snowflake_pipe_sqs_queue_arn + events = ["s3:ObjectCreated:*"] + } +} diff --git a/terraform/modules/s3-lake/variables.tf b/terraform/modules/s3-lake/variables.tf index e7284503..a42f7c1f 100644 --- a/terraform/modules/s3-lake/variables.tf +++ b/terraform/modules/s3-lake/variables.tf @@ -8,3 +8,20 @@ variable "region" { type = string default = "us-west-2" } + +variable "snowflake_raw_storage_integration_iam_user_arn" { + description = "ARN for service account created by Snowflake to access external stage" + type = string +} + +variable "snowflake_raw_storage_integration_external_id" { + description = "External ID for Snowflake storage integration" + type = string + default = "0000" +} + +variable "snowflake_pipe_sqs_queue_arn" { + description = "SQS Queue ARN for Snowpipe notification channel" + type = string + default = null +} diff --git a/terraform/modules/snowflake-clearinghouse/main.tf b/terraform/modules/snowflake-clearinghouse/main.tf new file mode 100644 index 00000000..d496cbf5 --- /dev/null +++ b/terraform/modules/snowflake-clearinghouse/main.tf @@ -0,0 +1,123 @@ +###################################### +# Terraform # +###################################### + +terraform { + required_providers { + snowflake = { + source = "Snowflake-Labs/snowflake" + version = "~> 0.71" + configuration_aliases = [ + snowflake.accountadmin, + snowflake.securityadmin, + snowflake.sysadmin, + snowflake.useradmin, + ] + } + } + required_version = ">= 1.0" +} + +# Schema for raw PeMS data +resource "snowflake_schema" "pems_raw" { + provider = snowflake.sysadmin + database = "RAW_${var.environment}" + name = "CLEARINGHOUSE" + data_retention_days = 14 +} + +# External stage +resource "snowflake_storage_integration" "pems_raw" { + provider = snowflake.accountadmin + name = "PEMS_RAW_${var.environment}" + type = "EXTERNAL_STAGE" + storage_provider = "S3" + storage_aws_role_arn = var.storage_aws_role_arn + 
storage_allowed_locations = [var.s3_url] +} + +resource "snowflake_integration_grant" "pems_raw_to_sysadmin" { + provider = snowflake.accountadmin + integration_name = snowflake_storage_integration.pems_raw.name + privilege = "USAGE" + roles = ["SYSADMIN"] + enable_multiple_grants = true +} + + +resource "snowflake_stage" "pems_raw" { + provider = snowflake.sysadmin + name = "PEMS_RAW_${var.environment}" + url = var.s3_url + database = snowflake_schema.pems_raw.database + schema = snowflake_schema.pems_raw.name + storage_integration = snowflake_storage_integration.pems_raw.name +} + +resource "snowflake_stage_grant" "pems_raw" { + provider = snowflake.sysadmin + database_name = snowflake_stage.pems_raw.database + schema_name = snowflake_stage.pems_raw.schema + roles = ["LOADER_${var.environment}"] + privilege = "USAGE" + stage_name = snowflake_stage.pems_raw.name + enable_multiple_grants = true +} + +# Pipes +resource "snowflake_pipe" "station_raw_pipe" { + provider = snowflake.sysadmin + database = snowflake_schema.pems_raw.database + schema = snowflake_schema.pems_raw.name + name = "STATION_RAW" + auto_ingest = true + + copy_statement = templatefile( + "${path.module}/raw_pipe.sql.tplfile", + { + database = snowflake_schema.pems_raw.database + schema = snowflake_schema.pems_raw.name + table = "STATION_RAW" + stage = snowflake_stage.pems_raw.name + file_format = "STATION_RAW" + }, + ) +} + +resource "snowflake_pipe" "station_meta_pipe" { + provider = snowflake.sysadmin + database = snowflake_schema.pems_raw.database + schema = snowflake_schema.pems_raw.name + name = "STATION_META" + auto_ingest = true + + copy_statement = templatefile( + "${path.module}/meta_pipe.sql.tplfile", + { + database = snowflake_schema.pems_raw.database + schema = snowflake_schema.pems_raw.name + table = "STATION_META" + stage = snowflake_stage.pems_raw.name + file_format = "STATION_META" + }, + ) +} + +resource "snowflake_pipe" "station_status_pipe" { + provider = snowflake.sysadmin + 
database = snowflake_schema.pems_raw.database + schema = snowflake_schema.pems_raw.name + name = "STATION_STATUS" + auto_ingest = true + + copy_statement = templatefile( + "${path.module}/status_pipe.sql.tplfile", + { + database = snowflake_schema.pems_raw.database + schema = snowflake_schema.pems_raw.name + table = "STATION_STATUS" + stage = snowflake_stage.pems_raw.name + file_format = "STATION_STATUS" + }, + ) +} diff --git a/terraform/modules/snowflake-clearinghouse/meta_pipe.sql.tplfile b/terraform/modules/snowflake-clearinghouse/meta_pipe.sql.tplfile new file mode 100644 index 00000000..d30c2108 --- /dev/null +++ b/terraform/modules/snowflake-clearinghouse/meta_pipe.sql.tplfile @@ -0,0 +1,26 @@ +copy into ${database}.${schema}.${table} +from ( + select + metadata$filename, + $1, + $2, + $3, + $4, + $5, + $6, + $7, + $8, + $9, + $10, + $11, + $12, + $13, + $14, + $15, + $16, + $17, + $18 + FROM @${database}.${schema}.${stage}/clhouse/meta/ + ) +file_format = ${database}.${schema}.${file_format} +on_error = continue diff --git a/terraform/modules/snowflake-clearinghouse/outputs.tf b/terraform/modules/snowflake-clearinghouse/outputs.tf new file mode 100644 index 00000000..3480e6dc --- /dev/null +++ b/terraform/modules/snowflake-clearinghouse/outputs.tf @@ -0,0 +1,13 @@ +# Outputs +output "pems_raw_stage" { + value = { + storage_aws_external_id = snowflake_storage_integration.pems_raw.storage_aws_external_id + storage_aws_iam_user_arn = snowflake_storage_integration.pems_raw.storage_aws_iam_user_arn + } +} + +output "notification_channel" { + description = "ARN of the notification channel for pipes" + # All notification channels for the same bucket are the same. 
+ value = snowflake_pipe.station_raw_pipe.notification_channel +} diff --git a/terraform/modules/snowflake-clearinghouse/raw_pipe.sql.tplfile b/terraform/modules/snowflake-clearinghouse/raw_pipe.sql.tplfile new file mode 100644 index 00000000..2f3c3c6f --- /dev/null +++ b/terraform/modules/snowflake-clearinghouse/raw_pipe.sql.tplfile @@ -0,0 +1,35 @@ +copy into ${database}.${schema}.${table} +from ( + select + metadata$filename, + try_to_timestamp_ntz($1, 'MM/DD/YYYY HH24:MI:SS'), + try_to_date($1, 'MM/DD/YYYY HH24:MI:SS'), + $2, + $3, + $4, + $5, + $6, + $7, + $8, + $9, + $10, + $11, + $12, + $13, + $14, + $15, + $16, + $17, + $18, + $19, + $20, + $21, + $22, + $23, + $24, + $25, + $26 + FROM @${database}.${schema}.${stage}/clhouse/raw/ + ) +file_format = ${database}.${schema}.${file_format} +on_error = continue diff --git a/terraform/modules/snowflake-clearinghouse/status_pipe.sql.tplfile b/terraform/modules/snowflake-clearinghouse/status_pipe.sql.tplfile new file mode 100644 index 00000000..4295c5ce --- /dev/null +++ b/terraform/modules/snowflake-clearinghouse/status_pipe.sql.tplfile @@ -0,0 +1,9 @@ +copy into ${database}.${schema}.${table} +from ( + select + metadata$filename, + $1 + FROM @${database}.${schema}.${stage}/clhouse/status/ + ) +file_format = ${database}.${schema}.${file_format} +on_error = continue diff --git a/terraform/modules/snowflake-clearinghouse/variables.tf b/terraform/modules/snowflake-clearinghouse/variables.tf new file mode 100644 index 00000000..181b39c4 --- /dev/null +++ b/terraform/modules/snowflake-clearinghouse/variables.tf @@ -0,0 +1,14 @@ +variable "environment" { + description = "Environment suffix" + type = string +} + +variable "s3_url" { + description = "S3 URL for the storage integration" + type = string +} + +variable "storage_aws_role_arn" { + description = "ARN of IAM role for Snowflake to assume with access to s3 storage" + type = string +}