Basic notebook is working
Shmuma committed Nov 3, 2023
1 parent a140137 commit eae1878
Showing 2 changed files with 262 additions and 24 deletions.
47 changes: 28 additions & 19 deletions doc/tutorials/cloud/00_setup.ipynb
@@ -265,33 +265,42 @@
"outputs": [],
"source": [
"def setup_scripts(conf: SandboxConfig, bucketfs_jar: str):\n",
" sqls = [\n",
" f\"OPEN SCHEMA {conf.SCHEMA}\",\n",
" f\"\"\"\n",
" sqls = [ \n",
" \"OPEN SCHEMA {schema!i}\",\n",
" \"\"\"\n",
"--/\n",
" CREATE OR REPLACE JAVA SET SCRIPT IMPORT_PATH(...) EMITS (...) AS\n",
" %scriptclass com.exasol.cloudetl.scriptclasses.FilesImportQueryGenerator;\n",
" %jar {bucketfs_jar};\n",
" %jar {jar_path!r};\n",
"/\n",
" \"\"\",\n",
" f\"\"\"\n",
" CREATE OR REPLACE JAVA SCALAR SCRIPT IMPORT_METADATA(...) \n",
" EMITS (\n",
" filename VARCHAR(2000), \n",
" partition_index VARCHAR(100), \n",
" start_index DECIMAL(36, 0), \n",
" end_index DECIMAL(36, 0)\n",
" ) AS\n",
" %scriptclass com.exasol.cloudetl.scriptclasses.FilesMetadataReader;\n",
" %jar {bucketfs_jar}; \n",
" \"\"\"\n",
"--/\n",
" CREATE OR REPLACE JAVA SCALAR SCRIPT IMPORT_METADATA(...) \n",
" EMITS (\n",
" filename VARCHAR(2000), \n",
" partition_index VARCHAR(100), \n",
" start_index DECIMAL(36, 0), \n",
" end_index DECIMAL(36, 0)\n",
" ) AS\n",
" %scriptclass com.exasol.cloudetl.scriptclasses.FilesMetadataReader;\n",
" %jar {jar_path!r};\n",
"/\n",
" \"\"\",\n",
" f\"\"\"\n",
" CREATE OR REPLACE JAVA SET SCRIPT IMPORT_FILES(...) EMITS (...) AS\n",
" %scriptclass com.exasol.cloudetl.scriptclasses.FilesDataImporter;\n",
" %jar {bucketfs_jar}; \n",
" \"\"\"\n",
"--/\n",
" CREATE OR REPLACE JAVA SET SCRIPT IMPORT_FILES(...) EMITS (...) AS\n",
" %scriptclass com.exasol.cloudetl.scriptclasses.FilesDataImporter;\n",
" %jar {jar_path!r};\n",
"/\n",
" \"\"\"\n",
" ]\n",
" with pyexasol.connect(**conf.connection_params) as conn:\n",
" for sql in sqls:\n",
" conn.execute(sql)"
" conn.execute(sql, query_params={\n",
" \"schema\": conf.SCHEMA,\n",
" \"jar_path\": bucketfs_jar,\n",
" })"
],
"metadata": {
"collapsed": false,
239 changes: 234 additions & 5 deletions doc/tutorials/cloud/01_import_data.ipynb
@@ -2,21 +2,21 @@
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"execution_count": 19,
"id": "initial_id",
"metadata": {
"collapsed": true,
"ExecuteTime": {
"end_time": "2023-11-02T11:58:38.388396Z",
"start_time": "2023-11-02T11:58:37.370015Z"
"end_time": "2023-11-03T13:36:38.270863Z",
"start_time": "2023-11-03T13:36:37.828187Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Schema created in 70.73ms\n",
"Schema created in 17.11ms\n",
"Jar for version 2.7.6 already exists in exasol-cloud-storage-extension-2.7.6.jar, skip downloading\n",
"Jar file is already present in the bucketfs\n"
]
@@ -31,6 +31,235 @@
"setup_cloud_storage_extension(conf)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"outputs": [],
"source": [
"import pyexasol"
],
"metadata": {
"collapsed": false,
"ExecuteTime": {
"end_time": "2023-11-03T10:51:26.238517Z",
"start_time": "2023-11-03T10:51:26.234997Z"
}
},
"id": "a562bc8fb8a650d5"
},
{
"cell_type": "markdown",
"source": [
"# Importing small amount of data from parquet files\n",
"\n",
"For the beginning, we'll load small volume of data from publicly available [Youtube 8M dataset](https://registry.opendata.aws/yt8m/).\n",
"\n",
"In this example, we'll work with \"dataset vocabulary\" which is information about classes of videos. In total there are 3862 entries, which are stored in one single parquet file: \n",
"s3://aws-roda-ml-datalake/yt8m_ods/vocabulary/run-1644252350398-part-block-0-r-00000-snappy.parquet\n"
],
"metadata": {
"collapsed": false
},
"id": "839a313d4d1c26d3"
},
{
"cell_type": "markdown",
"source": [
"## Schema and target table\n",
"\n",
"As a first step, we need to obtain the schema of the data (set of columns stored in parquet files with their types). You might have this information in advance (if this is your dataset), but if not, you need to analyze parquet files to figure out their schema.\n",
"\n",
"One of the options of doing this are parquet-tools library wrapped into [docker container](https://hub.docker.com/r/nathanhowell/parquet-tools). To use it, you need to download one of parquet files locally, then run this docker container against this file. Using the same container, you can also peek into parquet files and looks its actual data.\n",
"\n",
"For the file above, I got the following schema information:\n",
"\n",
"```\n",
"message glue_schema {\n",
" optional binary Index (STRING);\n",
" optional binary TrainVideoCount (STRING);\n",
" optional binary KnowledgeGraphId (STRING);\n",
" optional binary Name (STRING);\n",
" optional binary WikiUrl (STRING);\n",
" optional binary Vertical1 (STRING);\n",
" optional binary Vertical2 (STRING);\n",
" optional binary Vertical3 (STRING);\n",
" optional binary WikiDescription (STRING);\n",
"}\n",
"``` \n",
"\n",
"From this schema we see that all the columns in parquet file has string type and optional (nullable).\n",
"Let's create the table in our database for this data. The names of columns are not important, just the order and their types have to match with parquet file schema."
],
"metadata": {
"collapsed": false
},
"id": "3dc968652856f86"
},
{
"cell_type": "code",
"execution_count": 38,
"outputs": [],
"source": [
"TABLE_NAME = \"Y8M_CLASSES\"\n",
"\n",
"sql = \"\"\"\n",
"create or replace table {schema_name!i}.{table_name!i} \n",
"(\n",
" ClsIndex VARCHAR2(1024),\n",
" TrainVideoCount VARCHAR2(1024),\n",
" KnowledgeGraphId VARCHAR2(1024),\n",
" Name VARCHAR2(1024),\n",
" WikiUrl VARCHAR2(1024),\n",
" Vertical1 VARCHAR2(1024),\n",
" Vertical2 VARCHAR2(1024),\n",
" Vertical3 VARCHAR2(1024),\n",
" WikiDescription VARCHAR2(2048)\n",
")\n",
"\"\"\"\n",
"\n",
"with pyexasol.connect(**conf.connection_params) as conn:\n",
" conn.execute(sql, query_params={\n",
" \"schema_name\": conf.SCHEMA,\n",
" \"table_name\": TABLE_NAME\n",
" })"
],
"metadata": {
"collapsed": false,
"ExecuteTime": {
"end_time": "2023-11-03T13:46:24.373333Z",
"start_time": "2023-11-03T13:46:24.288010Z"
}
},
"id": "21cfe3edd3246a51"
},
{
"cell_type": "markdown",
"source": [
"## S3 credentials\n",
"\n",
"Even if our data resides on a publicly available S3 bucket (as the data we're dealing with in this examle), we need to provide S3 credentials to access the bucket. \n",
"\n",
"TODO: we have to provide valid credentials, even for public buckets, see [this issue](https://github.com/exasol/cloud-storage-extension/issues/283)"
],
"metadata": {
"collapsed": false
},
"id": "df21776e2435eced"
},
{
"cell_type": "code",
"execution_count": 32,
"outputs": [],
"source": [
"sql = \"\"\"\n",
"CREATE OR REPLACE CONNECTION S3_CONNECTION TO '' USER '' \n",
"IDENTIFIED BY 'S3_ACCESS_KEY={access_key!r};S3_SECRET_KEY={secret_key!r}';\n",
"\"\"\"\n",
"\n",
"S3_ACCESS_KEY = \"<ACCESS_KEY>\"\n",
"S3_SECRET_KEY = \"<SECRET_KEY>\"\n",
"\n",
"with pyexasol.connect(**conf.connection_params) as conn:\n",
" conn.execute(sql, query_params={\n",
" \"schema\": conf.SCHEMA,\n",
" \"access_key\": S3_ACCESS_KEY,\n",
" \"secret_key\": S3_SECRET_KEY,\n",
" })"
],
"metadata": {
"collapsed": false,
"ExecuteTime": {
"end_time": "2023-11-03T13:43:05.595870Z",
"start_time": "2023-11-03T13:43:05.551031Z"
}
},
"id": "b2ec486162082706"
},
{
"cell_type": "markdown",
"source": [
"## Importing data\n",
"\n",
"Now it's time to import our data. We call the `IMPORT_PATH` script, providing location of parquet files, their format, s3 endpoint (which has to match the bucket's configuration) and name of our connection object."
],
"metadata": {
"collapsed": false
},
"id": "50a05075716522ec"
},
{
"cell_type": "code",
"execution_count": 39,
"outputs": [],
"source": [
"params = {\n",
" \"schema\": conf.SCHEMA,\n",
" \"table\": TABLE_NAME, \n",
"}\n",
"\n",
"sql = \"\"\"\n",
"IMPORT INTO {schema!i}.{table!i}\n",
"FROM SCRIPT {schema!i}.IMPORT_PATH WITH\n",
" BUCKET_PATH = 's3a://aws-roda-ml-datalake/yt8m_ods/vocabulary/*'\n",
" DATA_FORMAT = 'PARQUET'\n",
" S3_ENDPOINT = 's3-us-west-2.amazonaws.com'\n",
" CONNECTION_NAME = 'S3_CONNECTION';\n",
"\"\"\"\n",
"\n",
"with pyexasol.connect(**conf.connection_params) as conn:\n",
" conn.execute(sql, query_params=params)"
],
"metadata": {
"collapsed": false,
"ExecuteTime": {
"end_time": "2023-11-03T13:46:41.978506Z",
"start_time": "2023-11-03T13:46:27.899568Z"
}
},
"id": "58c142629eb67bae"
},
{
"cell_type": "markdown",
"source": [
"Let's check that data was imported"
],
"metadata": {
"collapsed": false
},
"id": "2106b40abdf36482"
},
{
"cell_type": "code",
"execution_count": 44,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Loaded 3862 rows\n",
"('2054', '401', '/m/01hrv5', 'Popcorn', 'https://en.wikipedia.org/wiki/Popcorn', 'Food & Drink', None, None, 'Popcorn is a type of corn that expands from the kernel and puffs up when heated. Popcorn is able to pop like amaranth grain, sorghum, quinoa, and millet. When heated, pressure builds within the kernel, and a small explosion is the end result. Some strains of corn are now cultivated specifically as popping corns. There are various techniques for popping corn. Along with prepackaged popcorn, which is generally intended to be prepared in a microwave oven, there are small home appliances for popping corn. These methods require the use of minimally processed popping corn. A larger-scale, commercial popcorn machine, which resembled a modern movie theater popcorn machine on a cart with large bicycle style wheels, was invented by Charles Cretors in the late 19th century. Unpopped popcorn is considered nonperishable and will last indefinitely if stored in ideal conditions. Depending on how it is prepared and cooked, some consider popcorn to be a health food, while others caution against it for a variety of reasons. Popcorn can also have non-food applications, ranging from holiday decorations to packaging materials.')\n"
]
}
],
"source": [
"with pyexasol.connect(**conf.connection_params) as conn:\n",
" data_rows = conn.execute(\"select count(*) from {schema!i}.{table!i}\", query_params=params)\n",
" count = next(data_rows)[0] \n",
" print(f\"Loaded {count} rows\")\n",
" data = conn.execute(\"select * from {schema!i}.{table!i} limit 1\", query_params=params)\n",
" for row in data:\n",
" print(row)\n",
" "
],
"metadata": {
"collapsed": false,
"ExecuteTime": {
"end_time": "2023-11-03T13:49:13.715664Z",
"start_time": "2023-11-03T13:49:13.662729Z"
}
},
"id": "a65f19ecc0ce237a"
},
{
"cell_type": "code",
"execution_count": null,
@@ -39,7 +268,7 @@
"metadata": {
"collapsed": false
},
"id": "b651b0df83cea31c"
"id": "d2316eb463532aa1"
}
],
"metadata": {
