diff --git a/Ingesting Data for LLM Functions and Iceberg/Ingesting data for data pipelines.ipynb b/Ingesting Data for LLM Functions and Iceberg/Ingesting data for data pipelines.ipynb new file mode 100644 index 0000000..2e5c0d5 --- /dev/null +++ b/Ingesting Data for LLM Functions and Iceberg/Ingesting data for data pipelines.ipynb @@ -0,0 +1,635 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "13f35857-7833-4c7a-820b-421f7156fc94", + "metadata": { + "collapsed": false, + "name": "cell1" + }, + "source": [ + "# How to ingest files from Snowflake Stages, infer the schemas, use Cortex LLM functions, and leverage Iceberg tables in Coalesce\n", + "\n", + "In this notebook, you can view the SQL generated to perform all of the actions to build an LLM pipeline in Coalesce. This SQL is automatically generated and managed by Coalesce. Variations of this code are possible depending on the version of the Coalesce packages you are using. \n", + "\n", + "The dataset is a call transcripts dataset that contains calls in multiple different languages. This walkthrough will teach you to ingest the data, use Coalesce Cortex Functions to translate the data, and expose the data to your organization using Iceberg nodes. \n", + "\n", + "First, we will use an Inferschema node to create a blank table in Snowflake which contains the schema of the file(s) in a stage. 
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4babf2c9-2d53-48dc-9b2e-07cda9bcc03c", + "metadata": { + "codeCollapsed": false, + "collapsed": false, + "language": "python", + "name": "cell2" + }, + "outputs": [], + "source": [ + "-- Anonymous stored procedure block\n", + "DECLARE\n", + "-- Variables\n", + "col_name VARCHAR;\n", + "\n", + "col_type VARCHAR;\n", + "\n", + "create_sql VARCHAR;\n", + "\n", + "final_sql VARCHAR;\n", + "\n", + "-- Cursor to loop through each row in GROUP_LOAD table\n", + "res RESULTSET;\n", + "\n", + "select_statement:= 'CREATE OR REPLACE TABLE \"DATABASE_NAME\".\"SCHEMA_NAME\".CALL_TRANSCRIPTS\n", + " USING TEMPLATE (SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))\n", + " FROM TABLE(\n", + " INFER_SCHEMA(\n", + " LOCATION=>''@\"PC_COALESCE_DB\".\"CALLS\".call_transcripts_data_stage'',\n", + " FILES =>''call_transcripts.csv'',\n", + " FILE_FORMAT=>''\"PC_COALESCE_DB\".\"CALLS\".csvformat''\n", + " )\n", + " ))';\n", + "\n", + "BEGIN\n", + "EXECUTE IMMEDIATE:select_statement;\n", + "\n", + "-- Return a success message\n", + "RETURN 'CALL_TRANSCRIPTS table created successfully.';\n", + "\n", + "END;" + ] + }, + { + "cell_type": "markdown", + "id": "b8151396-3ae3-4991-8ef0-be82fc33f363", + "metadata": { + "collapsed": false, + "name": "cell3" + }, + "source": [ + "Next, we need to load the data from the file(s) in the stage, into the table containing the schema we just created. 
We can use a Copy Into node for this, which automatically generates the SQL below" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f7d7f866-a698-457f-8bd0-4deff26ba329", + "metadata": { + "codeCollapsed": false, + "collapsed": false, + "language": "sql", + "name": "cell4", + "vscode": { + "languageId": "sql" + } + }, + "outputs": [], + "source": [ + "CREATE OR REPLACE table\n", + " \"DATABASE_NAME\".\"SCHEMA_NAME\".\"CALL_TRANSCRIPTS_LOAD\" (\n", + " \"DATE_CREATED\" DATE,\n", + " \"LANGUAGE\" VARCHAR (16777216),\n", + " \"COUNTRY\" VARCHAR (16777216),\n", + " \"PRODUCT\" VARCHAR (16777216),\n", + " \"CATEGORY\" VARCHAR (16777216),\n", + " \"DAMAGE_TYPE\" VARCHAR (16777216),\n", + " \"TRANSCRIPT\" VARCHAR (16777216),\n", + " \"SRC\" VARCHAR,\n", + " \"LOAD_TIMESTAMP\" TIMESTAMP_NTZ,\n", + " \"FILENAME\" STRING,\n", + " \"FILE_ROW_NUMBER\" NUMBER,\n", + " \"FILE_LAST_MODIFIED\" TIMESTAMP_NTZ,\n", + " \"SCAN_TIME\" TIMESTAMP_NTZ\n", + " );\n", + "\n", + "COPY INTO \"DATABASE_NAME\".\"SCHEMA_NAME\".\"CALL_TRANSCRIPTS_LOAD\" (\n", + " \"DATE_CREATED\",\n", + " \"LANGUAGE\",\n", + " \"COUNTRY\",\n", + " \"PRODUCT\",\n", + " \"CATEGORY\",\n", + " \"DAMAGE_TYPE\",\n", + " \"TRANSCRIPT\",\n", + " \"SRC\",\n", + " \"LOAD_TIMESTAMP\",\n", + " \"FILENAME\",\n", + " \"FILE_ROW_NUMBER\",\n", + " \"FILE_LAST_MODIFIED\",\n", + " \"SCAN_TIME\"\n", + ")\n", + "FROM\n", + " (\n", + " SELECT\n", + " $1::DATE AS \"DATE_CREATED\",\n", + " $2::VARCHAR (16777216) AS \"LANGUAGE\",\n", + " $3::VARCHAR (16777216) AS \"COUNTRY\",\n", + " $4::VARCHAR (16777216) AS \"PRODUCT\",\n", + " $5::VARCHAR (16777216) AS \"CATEGORY\",\n", + " $6::VARCHAR (16777216) AS \"DAMAGE_TYPE\",\n", + " $7::VARCHAR (16777216) AS \"TRANSCRIPT\",\n", + " $1 AS \"SRC\",\n", + " current_timestamp()::timestamp_ntz AS \"LOAD_TIMESTAMP\",\n", + " METADATA$FILENAME AS \"FILENAME\",\n", + " METADATA$FILE_ROW_NUMBER AS \"FILE_ROW_NUMBER\",\n", + " METADATA$FILE_LAST_MODIFIED AS 
\"FILE_LAST_MODIFIED\",\n", + " METADATA$START_SCAN_TIME AS \"SCAN_TIME\"\n", + " FROM\n", + " '@PC_COALESCE_DB.CALLS.call_transcripts_data_stage'\n", + " ) FILES = ('call_transcripts.csv') FILE_FORMAT = (\n", + " TYPE = CSV RECORD_DELIMITER = '\n", + "' FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = '\\042' SKIP_HEADER = 1\n", + " );" + ] + }, + { + "cell_type": "markdown", + "id": "614a9f59-b202-4102-81e8-192b66b656fd", + "metadata": { + "collapsed": false, + "name": "cell5" + }, + "source": [ + "Now that we have a table that is populated in Snowflake, we can start building out a data pipeline and transforming this data. The first thing we'll do is split the German and French languages from the dataset into their own tables. The following SQL is generated by stage nodes in Coalesce, and represents both stages used to split the French and German languages into their own tables. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "18fdb36a-f3f6-46b0-92db-e06a28b14867", + "metadata": { + "codeCollapsed": false, + "collapsed": false, + "language": "sql", + "name": "cell6", + "vscode": { + "languageId": "sql" + } + }, + "outputs": [], + "source": [ + "CREATE OR REPLACE TABLE\n", + " \"DATABASE_NAME\".\"SCHEMA_NAME\".\"STG_CALL_TRANSCRIPTS_GERMAN\" (\n", + " \"DATE_CREATED\" DATE,\n", + " \"LANGUAGE\" VARCHAR (16777216),\n", + " \"COUNTRY\" VARCHAR (16777216),\n", + " \"PRODUCT\" VARCHAR (16777216),\n", + " \"CATEGORY\" VARCHAR (16777216),\n", + " \"DAMAGE_TYPE\" VARCHAR (16777216),\n", + " \"TRANSCRIPT\" VARCHAR (16777216),\n", + " \"SRC\" VARCHAR,\n", + " \"LOAD_TIMESTAMP\" TIMESTAMP_NTZ,\n", + " \"FILENAME\" STRING,\n", + " \"FILE_ROW_NUMBER\" NUMBER,\n", + " \"FILE_LAST_MODIFIED\" TIMESTAMP_NTZ,\n", + " \"SCAN_TIME\" TIMESTAMP_NTZ\n", + " );\n", + "\n", + "INSERT INTO\n", + " \"DATABASE_NAME\".\"SCHEMA_NAME\".\"STG_CALL_TRANSCRIPTS_GERMAN\" (\n", + " \"DATE_CREATED\",\n", + " \"LANGUAGE\",\n", + " \"COUNTRY\",\n", + " \"PRODUCT\",\n", + " 
\"CATEGORY\",\n", + " \"DAMAGE_TYPE\",\n", + " \"TRANSCRIPT\",\n", + " \"SRC\",\n", + " \"LOAD_TIMESTAMP\",\n", + " \"FILENAME\",\n", + " \"FILE_ROW_NUMBER\",\n", + " \"FILE_LAST_MODIFIED\",\n", + " \"SCAN_TIME\"\n", + " )\n", + "SELECT\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"DATE_CREATED\" AS \"DATE_CREATED\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"LANGUAGE\" AS \"LANGUAGE\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"COUNTRY\" AS \"COUNTRY\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"PRODUCT\" AS \"PRODUCT\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"CATEGORY\" AS \"CATEGORY\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"DAMAGE_TYPE\" AS \"DAMAGE_TYPE\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"TRANSCRIPT\" AS \"TRANSCRIPT\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"SRC\" AS \"SRC\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"LOAD_TIMESTAMP\" AS \"LOAD_TIMESTAMP\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"FILENAME\" AS \"FILENAME\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"FILE_ROW_NUMBER\" AS \"FILE_ROW_NUMBER\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"FILE_LAST_MODIFIED\" AS \"FILE_LAST_MODIFIED\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"SCAN_TIME\" AS \"SCAN_TIME\"\n", + "FROM\n", + " \"DATABASE_NAME\".\"SCHEMA_NAME\".\"CALL_TRANSCRIPTS_LOAD\" \"CALL_TRANSCRIPTS_LOAD\"\n", + "WHERE\n", + " \"LANGUAGE\" = 'German';\n", + "\n", + "CREATE OR REPLACE TABLE\n", + " \"DATABASE_NAME\".\"SCHEMA_NAME\".\"STG_CALL_TRANSCRIPTS_FRENCH\" (\n", + " \"DATE_CREATED\" DATE,\n", + " \"LANGUAGE\" VARCHAR (16777216),\n", + " \"COUNTRY\" VARCHAR (16777216),\n", + " \"PRODUCT\" VARCHAR (16777216),\n", + " \"CATEGORY\" VARCHAR (16777216),\n", + " \"DAMAGE_TYPE\" VARCHAR (16777216),\n", + " \"TRANSCRIPT\" VARCHAR (16777216),\n", + " \"SRC\" VARCHAR,\n", + " \"LOAD_TIMESTAMP\" TIMESTAMP_NTZ,\n", + " \"FILENAME\" STRING,\n", + " \"FILE_ROW_NUMBER\" NUMBER,\n", + " \"FILE_LAST_MODIFIED\" TIMESTAMP_NTZ,\n", + " \"SCAN_TIME\" TIMESTAMP_NTZ\n", + " );\n", + "\n", + "\n", + "INSERT INTO\n", + " 
\"DATABASE_NAME\".\"SCHEMA_NAME\".\"STG_CALL_TRANSCRIPTS_FRENCH\" (\n", + " \"DATE_CREATED\",\n", + " \"LANGUAGE\",\n", + " \"COUNTRY\",\n", + " \"PRODUCT\",\n", + " \"CATEGORY\",\n", + " \"DAMAGE_TYPE\",\n", + " \"TRANSCRIPT\",\n", + " \"SRC\",\n", + " \"LOAD_TIMESTAMP\",\n", + " \"FILENAME\",\n", + " \"FILE_ROW_NUMBER\",\n", + " \"FILE_LAST_MODIFIED\",\n", + " \"SCAN_TIME\"\n", + " )\n", + "SELECT\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"DATE_CREATED\" AS \"DATE_CREATED\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"LANGUAGE\" AS \"LANGUAGE\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"COUNTRY\" AS \"COUNTRY\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"PRODUCT\" AS \"PRODUCT\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"CATEGORY\" AS \"CATEGORY\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"DAMAGE_TYPE\" AS \"DAMAGE_TYPE\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"TRANSCRIPT\" AS \"TRANSCRIPT\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"SRC\" AS \"SRC\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"LOAD_TIMESTAMP\" AS \"LOAD_TIMESTAMP\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"FILENAME\" AS \"FILENAME\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"FILE_ROW_NUMBER\" AS \"FILE_ROW_NUMBER\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"FILE_LAST_MODIFIED\" AS \"FILE_LAST_MODIFIED\",\n", + " \"CALL_TRANSCRIPTS_LOAD\".\"SCAN_TIME\" AS \"SCAN_TIME\"\n", + "FROM\n", + " \"DATABASE_NAME\".\"SCHEMA_NAME\".\"CALL_TRANSCRIPTS_LOAD\" \"CALL_TRANSCRIPTS_LOAD\"\n", + "WHERE -- French rows only; the downstream translation step assumes French source text\n", + " \"LANGUAGE\" = 'French';" + ] + }, + { + "cell_type": "markdown", + "id": "9feb2dbb-8752-41c1-bd88-f2075e89f4ea", + "metadata": { + "collapsed": false, + "name": "cell7" + }, + "source": [ + "Now that we have the call transcripts of each language in their own tables, we can use Coalesce Cortex Function nodes, which leverage Snowflake Cortex functions, to translate the data into English. Coalesce automatically generates the code below to perform this action."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2bf5c75a-b4e8-4212-a645-b8d63102757d", + "metadata": { + "codeCollapsed": false, + "language": "python", + "name": "cell8", + "vscode": { + "languageId": "sql" + } + }, + "outputs": [], + "source": [ + "CREATE OR REPLACE TABLE\n", + " \"DATABASE_NAME\".\"SCHEMA_NAME\".\"LLM__CALL_TRANSCRIPTS_GERMAN\" (\n", + " \"DATE_CREATED\" DATE,\n", + " \"LANGUAGE\" VARCHAR (16777216),\n", + " \"COUNTRY\" VARCHAR (16777216),\n", + " \"PRODUCT\" VARCHAR (16777216),\n", + " \"CATEGORY\" VARCHAR (16777216),\n", + " \"DAMAGE_TYPE\" VARCHAR (16777216),\n", + " \"TRANSCRIPT\" VARCHAR (16777216),\n", + " \"SRC\" VARCHAR,\n", + " \"LOAD_TIMESTAMP\" TIMESTAMP_NTZ,\n", + " \"FILENAME\" STRING,\n", + " \"FILE_ROW_NUMBER\" NUMBER,\n", + " \"FILE_LAST_MODIFIED\" TIMESTAMP_NTZ,\n", + " \"SCAN_TIME\" TIMESTAMP_NTZ\n", + " );\n", + "\n", + "INSERT INTO\n", + " \"DATABASE_NAME\".\"SCHEMA_NAME\".\"LLM__CALL_TRANSCRIPTS_GERMAN\" (\n", + " \"DATE_CREATED\",\n", + " \"LANGUAGE\",\n", + " \"COUNTRY\",\n", + " \"PRODUCT\",\n", + " \"CATEGORY\",\n", + " \"DAMAGE_TYPE\",\n", + " \"TRANSCRIPT\",\n", + " \"SRC\",\n", + " \"LOAD_TIMESTAMP\",\n", + " \"FILENAME\",\n", + " \"FILE_ROW_NUMBER\",\n", + " \"FILE_LAST_MODIFIED\",\n", + " \"SCAN_TIME\"\n", + " )\n", + "SELECT\n", + " \"STG_CALL_TRANSCRIPTS_GERMAN\".\"DATE_CREATED\" AS \"DATE_CREATED\",\n", + " \"STG_CALL_TRANSCRIPTS_GERMAN\".\"LANGUAGE\" AS \"LANGUAGE\",\n", + " \"STG_CALL_TRANSCRIPTS_GERMAN\".\"COUNTRY\" AS \"COUNTRY\",\n", + " \"STG_CALL_TRANSCRIPTS_GERMAN\".\"PRODUCT\" AS \"PRODUCT\",\n", + " \"STG_CALL_TRANSCRIPTS_GERMAN\".\"CATEGORY\" AS \"CATEGORY\",\n", + " \"STG_CALL_TRANSCRIPTS_GERMAN\".\"DAMAGE_TYPE\" AS \"DAMAGE_TYPE\",\n", + " SNOWFLAKE.CORTEX.TRANSLATE (\n", + " \"STG_CALL_TRANSCRIPTS_GERMAN\".\"TRANSCRIPT\",\n", + " 'de',\n", + " 'en'\n", + " ) AS \"TRANSCRIPT\",\n", + " \"STG_CALL_TRANSCRIPTS_GERMAN\".\"SRC\" AS \"SRC\",\n", + " 
\"STG_CALL_TRANSCRIPTS_GERMAN\".\"LOAD_TIMESTAMP\" AS \"LOAD_TIMESTAMP\",\n", + " \"STG_CALL_TRANSCRIPTS_GERMAN\".\"FILENAME\" AS \"FILENAME\",\n", + " \"STG_CALL_TRANSCRIPTS_GERMAN\".\"FILE_ROW_NUMBER\" AS \"FILE_ROW_NUMBER\",\n", + " \"STG_CALL_TRANSCRIPTS_GERMAN\".\"FILE_LAST_MODIFIED\" AS \"FILE_LAST_MODIFIED\",\n", + " \"STG_CALL_TRANSCRIPTS_GERMAN\".\"SCAN_TIME\" AS \"SCAN_TIME\"\n", + "FROM\n", + " \"DATABASE_NAME\".\"SCHEMA_NAME\".\"STG_CALL_TRANSCRIPTS_GERMAN\" \"STG_CALL_TRANSCRIPTS_GERMAN\";\n", + "\n", + "\n", + "CREATE OR REPLACE TABLE\n", + " \"DATABASE_NAME\".\"SCHEMA_NAME\".\"LLM__CALL_TRANSCRIPTS_FRENCH\" (\n", + " \"DATE_CREATED\" DATE,\n", + " \"LANGUAGE\" VARCHAR (16777216),\n", + " \"COUNTRY\" VARCHAR (16777216),\n", + " \"PRODUCT\" VARCHAR (16777216),\n", + " \"CATEGORY\" VARCHAR (16777216),\n", + " \"DAMAGE_TYPE\" VARCHAR (16777216),\n", + " \"TRANSCRIPT\" VARCHAR (16777216),\n", + " \"SRC\" VARCHAR,\n", + " \"LOAD_TIMESTAMP\" TIMESTAMP_NTZ,\n", + " \"FILENAME\" STRING,\n", + " \"FILE_ROW_NUMBER\" NUMBER,\n", + " \"FILE_LAST_MODIFIED\" TIMESTAMP_NTZ,\n", + " \"SCAN_TIME\" TIMESTAMP_NTZ\n", + " );\n", + "\n", + "\n", + "INSERT INTO\n", + " \"DATABASE_NAME\".\"SCHEMA_NAME\".\"LLM__CALL_TRANSCRIPTS_FRENCH\" (\n", + " \"DATE_CREATED\",\n", + " \"LANGUAGE\",\n", + " \"COUNTRY\",\n", + " \"PRODUCT\",\n", + " \"CATEGORY\",\n", + " \"DAMAGE_TYPE\",\n", + " \"TRANSCRIPT\",\n", + " \"SRC\",\n", + " \"LOAD_TIMESTAMP\",\n", + " \"FILENAME\",\n", + " \"FILE_ROW_NUMBER\",\n", + " \"FILE_LAST_MODIFIED\",\n", + " \"SCAN_TIME\"\n", + " )\n", + "SELECT\n", + " \"STG_CALL_TRANSCRIPTS_FRENCH\".\"DATE_CREATED\" AS \"DATE_CREATED\",\n", + " \"STG_CALL_TRANSCRIPTS_FRENCH\".\"LANGUAGE\" AS \"LANGUAGE\",\n", + " \"STG_CALL_TRANSCRIPTS_FRENCH\".\"COUNTRY\" AS \"COUNTRY\",\n", + " \"STG_CALL_TRANSCRIPTS_FRENCH\".\"PRODUCT\" AS \"PRODUCT\",\n", + " \"STG_CALL_TRANSCRIPTS_FRENCH\".\"CATEGORY\" AS \"CATEGORY\",\n", + " \"STG_CALL_TRANSCRIPTS_FRENCH\".\"DAMAGE_TYPE\" AS 
\"DAMAGE_TYPE\",\n", + " SNOWFLAKE.CORTEX.TRANSLATE (\n", + " \"STG_CALL_TRANSCRIPTS_FRENCH\".\"TRANSCRIPT\",\n", + " 'fr',\n", + " 'en'\n", + " ) AS \"TRANSCRIPT\",\n", + " \"STG_CALL_TRANSCRIPTS_FRENCH\".\"SRC\" AS \"SRC\",\n", + " \"STG_CALL_TRANSCRIPTS_FRENCH\".\"LOAD_TIMESTAMP\" AS \"LOAD_TIMESTAMP\",\n", + " \"STG_CALL_TRANSCRIPTS_FRENCH\".\"FILENAME\" AS \"FILENAME\",\n", + " \"STG_CALL_TRANSCRIPTS_FRENCH\".\"FILE_ROW_NUMBER\" AS \"FILE_ROW_NUMBER\",\n", + " \"STG_CALL_TRANSCRIPTS_FRENCH\".\"FILE_LAST_MODIFIED\" AS \"FILE_LAST_MODIFIED\",\n", + " \"STG_CALL_TRANSCRIPTS_FRENCH\".\"SCAN_TIME\" AS \"SCAN_TIME\"\n", + "FROM\n", + " \"DATABASE_NAME\".\"SCHEMA_NAME\".\"STG_CALL_TRANSCRIPTS_FRENCH\" \"STG_CALL_TRANSCRIPTS_FRENCH\";" + ] + }, + { + "cell_type": "markdown", + "id": "94b0bc16-c31c-4cf0-8bf0-f2fdcdbfac0f", + "metadata": { + "collapsed": false, + "name": "cell10" + }, + "source": [ + "With the call transcripts now translated, we can unify the two datasets to bring together all of the translated calls into a singular table. We can use a `UNION ALL` for this within a Stage node in Coalesce. The following is the code that is generated by Coalesce to perform this action. 
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bac152b7-8c98-4e0a-9ecc-42f2c104f49d", + "metadata": { + "codeCollapsed": false, + "language": "python", + "name": "cell11", + "vscode": { + "languageId": "sql" + } + }, + "outputs": [], + "source": [ + "CREATE OR REPLACE TABLE\n", + " \"DATABASE_NAME\".\"SCHEMA_NAME\".\"STG_CALL_TRANSCRIPTS_ALL\" (\n", + " \"DATE_CREATED\" DATE,\n", + " \"LANGUAGE\" VARCHAR (16777216),\n", + " \"COUNTRY\" VARCHAR (16777216),\n", + " \"PRODUCT\" VARCHAR (16777216),\n", + " \"CATEGORY\" VARCHAR (16777216),\n", + " \"DAMAGE_TYPE\" VARCHAR (16777216),\n", + " \"TRANSCRIPT\" VARCHAR (16777216)\n", + " );\n", + "\n", + "INSERT INTO\n", + " \"DATABASE_NAME\".\"SCHEMA_NAME\".\"STG_CALL_TRANSCRIPTS_ALL\" (\n", + " \"DATE_CREATED\",\n", + " \"LANGUAGE\",\n", + " \"COUNTRY\",\n", + " \"PRODUCT\",\n", + " \"CATEGORY\",\n", + " \"DAMAGE_TYPE\",\n", + " \"TRANSCRIPT\"\n", + " )\n", + "SELECT\n", + " \"LLM__CALL_TRANSCRIPTS_GERMAN\".\"DATE_CREATED\" AS \"DATE_CREATED\",\n", + " \"LLM__CALL_TRANSCRIPTS_GERMAN\".\"LANGUAGE\" AS \"LANGUAGE\",\n", + " \"LLM__CALL_TRANSCRIPTS_GERMAN\".\"COUNTRY\" AS \"COUNTRY\",\n", + " \"LLM__CALL_TRANSCRIPTS_GERMAN\".\"PRODUCT\" AS \"PRODUCT\",\n", + " \"LLM__CALL_TRANSCRIPTS_GERMAN\".\"CATEGORY\" AS \"CATEGORY\",\n", + " \"LLM__CALL_TRANSCRIPTS_GERMAN\".\"DAMAGE_TYPE\" AS \"DAMAGE_TYPE\",\n", + " \"LLM__CALL_TRANSCRIPTS_GERMAN\".\"TRANSCRIPT\" AS \"TRANSCRIPT\"\n", + "FROM\n", + " \"DATABASE_NAME\".\"SCHEMA_NAME\".\"LLM__CALL_TRANSCRIPTS_GERMAN\" \"LLM__CALL_TRANSCRIPTS_GERMAN\"\n", + "UNION ALL\n", + "SELECT\n", + " \"LLM__CALL_TRANSCRIPTS_FRENCH\".\"DATE_CREATED\" AS \"DATE_CREATED\",\n", + " \"LLM__CALL_TRANSCRIPTS_FRENCH\".\"LANGUAGE\" AS \"LANGUAGE\",\n", + " \"LLM__CALL_TRANSCRIPTS_FRENCH\".\"COUNTRY\" AS \"COUNTRY\",\n", + " \"LLM__CALL_TRANSCRIPTS_FRENCH\".\"PRODUCT\" AS \"PRODUCT\",\n", + " \"LLM__CALL_TRANSCRIPTS_FRENCH\".\"CATEGORY\" AS \"CATEGORY\",\n", + " 
\"LLM__CALL_TRANSCRIPTS_FRENCH\".\"DAMAGE_TYPE\" AS \"DAMAGE_TYPE\",\n", + " \"LLM__CALL_TRANSCRIPTS_FRENCH\".\"TRANSCRIPT\" AS \"TRANSCRIPT\"\n", + "FROM\n", + " \"DATABASE_NAME\".\"SCHEMA_NAME\".\"LLM__CALL_TRANSCRIPTS_FRENCH\" \"LLM__CALL_TRANSCRIPTS_FRENCH\"" + ] + }, + { + "cell_type": "markdown", + "id": "b5ff2c51-66d9-4ca4-a060-0b40286ae37c", + "metadata": { + "collapsed": false, + "name": "cell12" + }, + "source": [ + "With the dataset containing the translated call transcripts unified, we can now use another Cortex Function node to create a sentiment score and extract the customer name from the transcript column. Coalesce automatically generates the following code. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1f7b5940-47cb-438c-a666-817267b4bf39", + "metadata": { + "codeCollapsed": false, + "collapsed": false, + "language": "python", + "name": "cell13", + "vscode": { + "languageId": "sql" + } + }, + "outputs": [], + "source": [ + "CREATE OR REPLACE TABLE\n", + " \"DATABASE_NAME\".\"SCHEMA_NAME\".\"LLM__CALL_TRANSCRIPTS_ALL\" (\n", + " \"DATE_CREATED\" DATE,\n", + " \"LANGUAGE\" VARCHAR (16777216),\n", + " \"COUNTRY\" VARCHAR (16777216),\n", + " \"PRODUCT\" VARCHAR (16777216),\n", + " \"CATEGORY\" VARCHAR (16777216),\n", + " \"DAMAGE_TYPE\" VARCHAR (16777216),\n", + " \"TRANSCRIPT_SENTIMENT\" VARCHAR (16777216),\n", + " \"TRANSCRIPT_CUSTOMER\" ARRAY\n", + " );\n", + "\n", + "INSERT INTO\n", + " \"DATABASE_NAME\".\"SCHEMA_NAME\".\"LLM__CALL_TRANSCRIPTS_ALL\" (\n", + " \"DATE_CREATED\",\n", + " \"LANGUAGE\",\n", + " \"COUNTRY\",\n", + " \"PRODUCT\",\n", + " \"CATEGORY\",\n", + " \"DAMAGE_TYPE\",\n", + " \"TRANSCRIPT_SENTIMENT\",\n", + " \"TRANSCRIPT_CUSTOMER\"\n", + " )\n", + "SELECT\n", + " \"STG_CALL_TRANSCRIPTS_ALL\".\"DATE_CREATED\" AS \"DATE_CREATED\",\n", + " \"STG_CALL_TRANSCRIPTS_ALL\".\"LANGUAGE\" AS \"LANGUAGE\",\n", + " \"STG_CALL_TRANSCRIPTS_ALL\".\"COUNTRY\" AS \"COUNTRY\",\n", + " 
\"STG_CALL_TRANSCRIPTS_ALL\".\"PRODUCT\" AS \"PRODUCT\",\n", + " \"STG_CALL_TRANSCRIPTS_ALL\".\"CATEGORY\" AS \"CATEGORY\",\n", + " \"STG_CALL_TRANSCRIPTS_ALL\".\"DAMAGE_TYPE\" AS \"DAMAGE_TYPE\",\n", + " SNOWFLAKE.CORTEX.SENTIMENT (\"STG_CALL_TRANSCRIPTS_ALL\".\"TRANSCRIPT\") AS \"TRANSCRIPT_SENTIMENT\",\n", + " SNOWFLAKE.CORTEX.EXTRACT_ANSWER (\n", + " \"STG_CALL_TRANSCRIPTS_ALL\".\"TRANSCRIPT\",\n", + " '''who is the customer?'''\n", + " ) AS \"TRANSCRIPT_CUSTOMER\"\n", + "FROM\n", + " \"DATABASE_NAME\".\"SCHEMA_NAME\".\"STG_CALL_TRANSCRIPTS_ALL\" \"STG_CALL_TRANSCRIPTS_ALL\";" + ] + }, + { + "cell_type": "markdown", + "id": "966f07d5-d246-49da-b133-6ab39fb0578d", + "metadata": { + "collapsed": false, + "name": "cell15" + }, + "source": [ + "The last step is to expose the new analytical dataset we have created to our object storage so that other systems can work with a single copy of the data we have produced in this pipeline. We can do that with a Snowflake Iceberg table node, which will seamlessly create an Iceberg format table in an S3 bucket (assuming you have already configured an external volume). Coalesce automatically generates the code below. 
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "76dd9c74-019d-47ff-a462-10499503bace", + "metadata": { + "codeCollapsed": false, + "collapsed": false, + "language": "python", + "name": "cell16", + "vscode": { + "languageId": "sql" + } + }, + "outputs": [], + "source": [ + "CREATE\n", + "OR REPLACE ICEBERG TABLE \"DATABASE_NAME\".\"SCHEMA_NAME\".\"ICT_CALL_TRANSCRIPTS_ALL\" (\n", + " \"DATE_CREATED\" DATE,\n", + " \"LANGUAGE\" STRING,\n", + " \"COUNTRY\" STRING,\n", + " \"PRODUCT\" STRING,\n", + " \"CATEGORY\" STRING,\n", + " \"DAMAGE_TYPE\" STRING,\n", + " \"TRANSCRIPT_SENTIMENT\" STRING,\n", + " \"CUSTOMER_NAME\" STRING\n", + ") EXTERNAL_VOLUME = 'iceberg_external_volume' BASE_LOCATION = 'transcriptions_customer' CATALOG = 'SNOWFLAKE';\n", + "\n", + "\n", + "INSERT INTO\n", + " \"DATABASE_NAME\".\"SCHEMA_NAME\".\"ICT_CALL_TRANSCRIPTS_ALL\" (\n", + " \"DATE_CREATED\",\n", + " \"LANGUAGE\",\n", + " \"COUNTRY\",\n", + " \"PRODUCT\",\n", + " \"CATEGORY\",\n", + " \"DAMAGE_TYPE\",\n", + " \"TRANSCRIPT_SENTIMENT\",\n", + " \"CUSTOMER_NAME\"\n", + " ) (\n", + " SELECT\n", + " \"LLM__CALL_TRANSCRIPTS_ALL\".\"DATE_CREATED\" AS \"DATE_CREATED\",\n", + " \"LLM__CALL_TRANSCRIPTS_ALL\".\"LANGUAGE\" AS \"LANGUAGE\",\n", + " \"LLM__CALL_TRANSCRIPTS_ALL\".\"COUNTRY\" AS \"COUNTRY\",\n", + " \"LLM__CALL_TRANSCRIPTS_ALL\".\"PRODUCT\" AS \"PRODUCT\",\n", + " \"LLM__CALL_TRANSCRIPTS_ALL\".\"CATEGORY\" AS \"CATEGORY\",\n", + " \"LLM__CALL_TRANSCRIPTS_ALL\".\"DAMAGE_TYPE\" AS \"DAMAGE_TYPE\",\n", + " \"LLM__CALL_TRANSCRIPTS_ALL\".\"TRANSCRIPT_SENTIMENT\" AS \"TRANSCRIPT_SENTIMENT\",\n", + " \"TRANSCRIPT_CUSTOMER\".value:\"answer\"::STRING AS \"CUSTOMER_NAME\"\n", + " FROM\n", + " \"DATABASE_NAME\".\"SCHEMA_NAME\".\"LLM__CALL_TRANSCRIPTS_ALL\" \"LLM__CALL_TRANSCRIPTS_ALL\",\n", + " lateral flatten(input => \"TRANSCRIPT_CUSTOMER\", OUTER => TRUE) \"TRANSCRIPT_CUSTOMER\"\n", + " );" + ] + }, + { + "cell_type": "markdown", + "id": 
"d149c3c7-4a48-446e-a75f-beefc949790b", + "metadata": { + "collapsed": false, + "name": "cell20" + }, + "source": [ + "### Conclusion\n", + "In this walkthrough, you have seen how Coalesce automatically generates code to seamlessly build data pipelines on Snowflake. Using the Coalesce, users can defer the need of writing all of the code found here, and instead, focus on the business impact of the data products they are building. " + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Streamlit Notebook", + "name": "streamlit" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}