From 4407652a994b1e4468788b385246622d7d31ca37 Mon Sep 17 00:00:00 2001
From: Ioannis Foukarakis
Date: Wed, 9 Oct 2024 17:21:44 +0300
Subject: [PATCH] Add loading from S3

---
 tests/utils/packets/test_service.py | 83 ++++++++++++++++++++++++++++-
 utils/packets/__main__.py           | 35 +++++++++++-
 utils/packets/aws.py                |  2 +-
 utils/packets/service.py            | 61 +++++++++++++++++++++
 4 files changed, 178 insertions(+), 3 deletions(-)

diff --git a/tests/utils/packets/test_service.py b/tests/utils/packets/test_service.py
index 971f74407..9362babdf 100644
--- a/tests/utils/packets/test_service.py
+++ b/tests/utils/packets/test_service.py
@@ -1,8 +1,10 @@
 import pandas as pd
+import pytest
+from moto import mock_aws
 from pandas._testing import assert_frame_equal
 
 from tests.utils.packets import SUPPORT_DIR, SURVEY_DIR
-from utils.packets.service import ingest_support_packet, ingest_survey_packet
+from utils.packets.service import ingest_support_packet, ingest_survey_packet, ingest_surveys_from_s3
 
 
 def test_ingest_survey_packet(sqlalchemy_memory_engine):
@@ -202,3 +204,82 @@ def test_ingest_support_packet_twice(sqlalchemy_memory_engine):
         )
         assert result['metadata_server_id'] == 'rmg9ib5rspy93jxswyc454bwzo'
         assert result['source'] == 's3://bucket/valid_with_metadata.zip'
+
+
+@mock_aws
+def test_ingest_survey_from_s3_fresh(s3_boto, sqlalchemy_memory_engine):
+    with sqlalchemy_memory_engine.connect() as conn:
+        # GIVEN: database setup has been completed
+        conn.execute("ATTACH DATABASE ':memory:' AS 'test_schema'")
+
+        # GIVEN: an S3 bucket
+        bucket_name = 'bucket-with-files'
+        s3_boto.create_bucket(Bucket=bucket_name)
+        # GIVEN: 3 survey packets spread across 2 dates exist in the bucket
+        s3_boto.upload_file(SURVEY_DIR / 'valid.zip', bucket_name, 'prefix/2024/09/03/mattermost/user-survey.zip')
+        s3_boto.upload_file(SURVEY_DIR / 'valid.zip', bucket_name, 'prefix/2024/10/02/mattermost/user-survey.zip')
+        s3_boto.upload_file(SURVEY_DIR / 'valid.zip', bucket_name, 'prefix/2024/10/02/example/user-survey.zip')
+
+        # WHEN: request to ingest all
+
+        ingest_surveys_from_s3(conn, 'test_schema', bucket_name, 'prefix')
+
+        # THEN: expect the data to be in the database
+        df = pd.read_sql("SELECT * FROM 'test_schema'.user_survey ORDER BY source", conn)
+
+        assert len(df) == 9
+        assert (
+            df['source'].tolist()
+            == ['s3://bucket-with-files/prefix/2024/09/03/mattermost/user-survey.zip'] * 3
+            + ['s3://bucket-with-files/prefix/2024/10/02/example/user-survey.zip'] * 3
+            + ['s3://bucket-with-files/prefix/2024/10/02/mattermost/user-survey.zip'] * 3
+        )
+
+
+@mock_aws
+@pytest.mark.freeze_time
+def test_ingest_survey_from_s3_resume(s3_boto, sqlalchemy_memory_engine, freezer):
+    with sqlalchemy_memory_engine.connect() as conn:
+        # GIVEN: database setup has been completed
+        conn.execute("ATTACH DATABASE ':memory:' AS 'test_schema'")
+
+        # GIVEN: an S3 bucket
+        bucket_name = 'bucket-with-files'
+        s3_boto.create_bucket(Bucket=bucket_name)
+        # GIVEN: 3 survey packets spread across 2 dates exist in the bucket
+        s3_boto.upload_file(SURVEY_DIR / 'valid.zip', bucket_name, 'prefix/2024/09/03/mattermost/user-survey.zip')
+        s3_boto.upload_file(SURVEY_DIR / 'valid.zip', bucket_name, 'prefix/2024/10/02/mattermost/user-survey.zip')
+        s3_boto.upload_file(SURVEY_DIR / 'valid.zip', bucket_name, 'prefix/2024/10/02/example/user-survey.zip')
+
+        # GIVEN: all packets were ingested in the past
+        freezer.move_to('2024-09-10')
+        ingest_surveys_from_s3(conn, 'test_schema', bucket_name, 'prefix')
+        freezer.move_to('2024-10-02')
+
+        # WHEN: request to ingest again
+        ingest_surveys_from_s3(conn, 'test_schema', bucket_name, 'prefix')
+
+        # THEN: expect the data to be in the database
+        df = pd.read_sql("SELECT * FROM 'test_schema'.user_survey ORDER BY source", conn)
+
+        assert len(df) == 9
+        assert (
+            df['source'].tolist()
+            == ['s3://bucket-with-files/prefix/2024/09/03/mattermost/user-survey.zip'] * 3
+            + ['s3://bucket-with-files/prefix/2024/10/02/example/user-survey.zip'] * 3
+            + ['s3://bucket-with-files/prefix/2024/10/02/mattermost/user-survey.zip'] * 3
+        )
+        assert df['ingestion_date'].tolist() == ['2024-09-10'] * 3 + ['2024-10-02'] * 6
+
+
+def test_ingest_survey_from_s3_unknown_bucket(s3_boto, sqlalchemy_memory_engine):
+    with sqlalchemy_memory_engine.connect() as conn:
+        # GIVEN: database setup has been completed
+        conn.execute("ATTACH DATABASE ':memory:' AS 'test_schema'")
+
+        # WHEN: request to ingest from an unknown bucket
+        with pytest.raises(ValueError) as e:
+            ingest_surveys_from_s3(conn, 'test_schema', "not-exists", 'prefix')
+
+        # THEN: expect an error
+        assert str(e.value) == 'Bucket not-exists does not exist'
diff --git a/utils/packets/__main__.py b/utils/packets/__main__.py
index eb3941281..a723e2d7d 100644
--- a/utils/packets/__main__.py
+++ b/utils/packets/__main__.py
@@ -5,7 +5,12 @@
 from utils.db.helpers import snowflake_engine
 from utils.helpers import initialize_cli_logging
-from utils.packets.service import ingest_support_packet, ingest_survey_packet
+from utils.packets.service import (
+    ingest_support_packet,
+    ingest_support_packets_from_s3,
+    ingest_survey_packet,
+    ingest_surveys_from_s3,
+)
 
 initialize_cli_logging(logging.INFO, 'stderr')
 
 
@@ -77,6 +82,20 @@ def user_survey(
         ingest_survey_packet(conn, ctx.parent.params['schema'], input, click.format_filename(input))
 
 
+@packets.command()
+@click.argument('bucket', type=str)
+@click.argument('prefix', type=str)
+@click.pass_context
+def user_survey_s3(ctx: click.Context, bucket: str, prefix: str) -> None:
+    """
+    Ingest user survey packets from S3.
+    :param bucket: The S3 bucket to ingest from.
+    :param prefix: The prefix to search for user survey packets.
+    """
+    with ctx.obj['engine'].begin() as conn:
+        ingest_surveys_from_s3(conn, ctx.parent.params['schema'], bucket, prefix)
+
+
 @packets.command()
 @click.argument('input', type=click.Path(exists=True, dir_okay=False, readable=True, resolve_path=True))
 @click.pass_context
@@ -91,3 +110,17 @@ def support_v1(
     """
     with ctx.obj['engine'].begin() as conn:
         ingest_support_packet(conn, ctx.parent.params['schema'], input, click.format_filename(input))
+
+
+@packets.command()
+@click.argument('bucket', type=str)
+@click.argument('prefix', type=str)
+@click.pass_context
+def support_v1_s3(ctx: click.Context, bucket: str, prefix: str) -> None:
+    """
+    Ingest support packets from S3.
+    :param bucket: The S3 bucket to ingest from.
+    :param prefix: The prefix to search for support packets.
+ """ + with ctx.obj['engine'].begin() as conn: + ingest_support_packets_from_s3(conn, ctx.parent.params['schema'], bucket, prefix) diff --git a/utils/packets/aws.py b/utils/packets/aws.py index 4336a3a75..88e882d02 100644 --- a/utils/packets/aws.py +++ b/utils/packets/aws.py @@ -33,5 +33,5 @@ def object_iter(bucket: str, prefix: str) -> Generator[Tuple[str, BytesIO], None # Iterate for result in paginator.paginate(Bucket=bucket, Prefix=prefix): # Download each file individually - for key in result['Contents']: + for key in result.get('Contents', []): yield key['Key'], BytesIO(s3.get_object(Bucket=bucket, Key=key['Key'])["Body"].read()) diff --git a/utils/packets/service.py b/utils/packets/service.py index 86be38e75..25ea732c3 100644 --- a/utils/packets/service.py +++ b/utils/packets/service.py @@ -1,14 +1,21 @@ import os +from datetime import datetime, timedelta from enum import Enum from logging import getLogger +from typing import Callable import pandas as pd from click import ClickException from sqlalchemy.engine import Connection +from sqlalchemy.exc import OperationalError from utils.db.helpers import upsert_dataframe_to_table +from utils.helpers import daterange +from utils.packets.aws import bucket_exists, object_iter from utils.packets.loaders import UserSurveyFixedColumns, load_support_packet_file, load_user_survey_package +DATE_FORMAT = '%Y-%m-%d' + logger = getLogger(__name__) @@ -48,6 +55,7 @@ def ingest_survey_packet(conn: Connection, target_schema: str, survey_packet: st enriched_df['metadata_extras_plugin_id'] = metadata.extras.plugin_id enriched_df['metadata_extras_plugin_version'] = metadata.extras.plugin_version enriched_df['source'] = source_uri + enriched_df['ingestion_date'] = datetime.today().strftime(DATE_FORMAT) # Upsert data upsert_dataframe_to_table( @@ -105,11 +113,13 @@ def ingest_support_packet(conn: Connection, target_schema: str, support_packet: sp_df['metadata_license_id'] = metadata.license_id if metadata else None sp_df['metadata_customer_id'] = metadata.customer_id if metadata else None sp_df['source'] = source_uri + sp_df['ingestion_date'] = datetime.today().strftime(DATE_FORMAT) job_df = pd.DataFrame([job.model_dump() for job in sp.all_jobs()]) job_df['data'] = job_df['data'].astype(str) job_df['metadata_server_id'] = server_id job_df['source'] = source_uri + sp_df['ingestion_date'] = datetime.today().strftime(DATE_FORMAT) # Upsert data upsert_dataframe_to_table(conn, target_schema, TableNames.SUPPORT_PACKET_V1.value, sp_df, server_id, source_uri) @@ -119,3 +129,54 @@ def ingest_support_packet(conn: Connection, target_schema: str, support_packet: except ValueError as e: raise ClickException(f'Error loading support package: {e}') + + +def _ingest_packet_from_s3( + conn: Connection, target_schema: str, bucket: str, prefix: str, ingest_func: Callable, table: TableNames +): + if not bucket_exists(bucket): + raise ValueError(f'Bucket {bucket} does not exist') + + try: + checkpoint = conn.execute( + f'SELECT MAX(ingestion_date) as checkpoint FROM \'{target_schema}\'.{table.value}' + ).scalar() + except OperationalError: + # Table does not exist + checkpoint = None + if checkpoint: + logger.info(f'Resuming ingestion from: {checkpoint}') + start_date = datetime.strptime(checkpoint, DATE_FORMAT) + for date in daterange(start_date, datetime.today() + timedelta(days=1)): + for key, content in object_iter(bucket, f'{prefix}/{date.strftime("%Y/%m/%d")}'): + logger.info(f'Ingesting {key}') + ingest_func(conn, target_schema, content, f's3://{bucket}/{key}') + else: + 
+        logger.info('Starting ingestion from the beginning')
+        for key, content in object_iter(bucket, f'{prefix}/'):
+            logger.info(f'Ingesting {key}')
+            ingest_func(conn, target_schema, content, f's3://{bucket}/{key}')
+
+
+def ingest_surveys_from_s3(conn: Connection, target_schema: str, bucket: str, prefix: str):
+    """
+    Load all survey packets from S3.
+
+    :param conn: The SQLAlchemy connection to use for ingesting the data.
+    :param target_schema: The schema to ingest the data into.
+    :param bucket: The S3 bucket name.
+    :param prefix: The S3 bucket prefix where the survey packets are located.
+    """
+    _ingest_packet_from_s3(conn, target_schema, bucket, prefix, ingest_survey_packet, TableNames.USER_SURVEY)
+
+
+def ingest_support_packets_from_s3(conn: Connection, target_schema: str, bucket: str, prefix: str):
+    """
+    Load all support packets from S3.
+
+    :param conn: The SQLAlchemy connection to use for ingesting the data.
+    :param target_schema: The schema to ingest the data into.
+    :param bucket: The S3 bucket name.
+    :param prefix: The S3 bucket prefix where the support packets are located.
+    """
+    _ingest_packet_from_s3(conn, target_schema, bucket, prefix, ingest_support_packet, TableNames.SUPPORT_PACKET_V1)
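
Note: the patch depends on two helpers that sit outside this diff: daterange (imported from utils.helpers) and bucket_exists (imported from utils.packets.aws). Their real implementations are not shown, so the following is only a minimal sketch of the behaviour _ingest_packet_from_s3 assumes: daterange yields one day per step with an exclusive end (which is why the resume loop passes datetime.today() + timedelta(days=1), so that today is still scanned), and bucket_exists returns a boolean instead of raising.

    # Hypothetical sketches of the helpers assumed by _ingest_packet_from_s3.
    # The real implementations live in utils.helpers and utils.packets.aws
    # and may differ in detail.
    from datetime import datetime, timedelta
    from typing import Generator

    import boto3
    from botocore.exceptions import ClientError


    def daterange(start: datetime, end: datetime) -> Generator[datetime, None, None]:
        """Yield one datetime per day, from start (inclusive) to end (exclusive)."""
        current = start
        while current < end:
            yield current
            current += timedelta(days=1)


    def bucket_exists(bucket: str) -> bool:
        """Return True if the S3 bucket exists and is reachable with the current credentials."""
        try:
            boto3.client('s3').head_bucket(Bucket=bucket)
            return True
        except ClientError:
            return False

Design note: the resume path starts rescanning at the checkpoint day itself, so objects whose key date equals the checkpoint are ingested again and deduplicated by the upsert, while objects dated after the checkpoint are picked up with a fresh ingestion_date — which is what the resume test asserts on the ingestion_date column.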