Commit
Add loading from S3
ifoukarakis committed Oct 9, 2024
1 parent 919afae commit 5ec3a17
Showing 4 changed files with 178 additions and 3 deletions.
83 changes: 82 additions & 1 deletion tests/utils/packets/test_service.py
@@ -1,8 +1,10 @@
import pandas as pd
import pytest
from moto import mock_aws
from pandas._testing import assert_frame_equal

from tests.utils.packets import SUPPORT_DIR, SURVEY_DIR
from utils.packets.service import ingest_support_packet, ingest_survey_packet
from utils.packets.service import ingest_support_packet, ingest_survey_packet, ingest_surveys_from_s3


def test_ingest_survey_packet(sqlalchemy_memory_engine):
@@ -202,3 +204,82 @@ def test_ingest_support_packet_twice(sqlalchemy_memory_engine):
)
assert result['metadata_server_id'] == 'rmg9ib5rspy93jxswyc454bwzo'
assert result['source'] == 's3://bucket/valid_with_metadata.zip'


@mock_aws
def test_ingest_survey_from_s3_fresh(s3_boto, sqlalchemy_memory_engine):
with sqlalchemy_memory_engine.connect() as conn:
# GIVEN: database setup has been completed
conn.execute("ATTACH DATABASE ':memory:' AS 'test_schema'")

# GIVEN: an S3 bucket
bucket_name = 'bucket-with-files'
s3_boto.create_bucket(Bucket=bucket_name)
# GIVEN: three survey packets across two dates exist in the bucket
s3_boto.upload_file(SURVEY_DIR / 'valid.zip', bucket_name, 'prefix/2024/09/03/mattermost/user-survey.zip')
s3_boto.upload_file(SURVEY_DIR / 'valid.zip', bucket_name, 'prefix/2024/10/02/mattermost/user-survey.zip')
s3_boto.upload_file(SURVEY_DIR / 'valid.zip', bucket_name, 'prefix/2024/10/02/example/user-survey.zip')

# WHEN: request to ingest all
ingest_surveys_from_s3(conn, 'test_schema', bucket_name, 'prefix')

# THEN: expect the data to be in the database
df = pd.read_sql("SELECT * FROM 'test_schema'.user_survey ORDER BY source", conn)

assert len(df) == 9
assert (
df['source'].tolist()
== ['s3://bucket-with-files/prefix/2024/09/03/mattermost/user-survey.zip'] * 3
+ ['s3://bucket-with-files/prefix/2024/10/02/example/user-survey.zip'] * 3
+ ['s3://bucket-with-files/prefix/2024/10/02/mattermost/user-survey.zip'] * 3
)


@mock_aws
@pytest.mark.freeze_time
def test_ingest_survey_from_s3_resume(s3_boto, sqlalchemy_memory_engine, freezer):
with sqlalchemy_memory_engine.connect() as conn:
# GIVEN: database setup has been completed
conn.execute("ATTACH DATABASE ':memory:' AS 'test_schema'")

# GIVEN: an S3 bucket
bucket_name = 'bucket-with-files'
s3_boto.create_bucket(Bucket=bucket_name)
# GIVEN: three survey packets across two dates exist in the bucket
s3_boto.upload_file(SURVEY_DIR / 'valid.zip', bucket_name, 'prefix/2024/09/03/mattermost/user-survey.zip')
s3_boto.upload_file(SURVEY_DIR / 'valid.zip', bucket_name, 'prefix/2024/10/02/mattermost/user-survey.zip')
s3_boto.upload_file(SURVEY_DIR / 'valid.zip', bucket_name, 'prefix/2024/10/02/example/user-survey.zip')

# GIVEN: first file was ingested in the past
freezer.move_to('2024-09-10')
ingest_surveys_from_s3(conn, 'test_schema', bucket_name, 'prefix')
freezer.move_to('2024-10-02')

# WHEN: request to ingest
ingest_surveys_from_s3(conn, 'test_schema', bucket_name, 'prefix')

# THEN: expect the data to be in the database
df = pd.read_sql("SELECT * FROM 'test_schema'.user_survey ORDER BY source", conn)

assert len(df) == 9
assert (
df['source'].tolist()
== ['s3://bucket-with-files/prefix/2024/09/03/mattermost/user-survey.zip'] * 3
+ ['s3://bucket-with-files/prefix/2024/10/02/example/user-survey.zip'] * 3
+ ['s3://bucket-with-files/prefix/2024/10/02/mattermost/user-survey.zip'] * 3
)
assert df['ingestion_date'].tolist() == ['2024-09-10'] * 3 + ['2024-10-02'] * 6


@mock_aws
def test_ingest_survey_from_s3_unknown_bucket(s3_boto, sqlalchemy_memory_engine):
with sqlalchemy_memory_engine.connect() as conn:
# GIVEN: database setup has been completed
conn.execute("ATTACH DATABASE ':memory:' AS 'test_schema'")

# WHEN: request to ingest from an unknown bucket
with pytest.raises(ValueError) as e:
ingest_surveys_from_s3(conn, 'test_schema', "not-exists", 'prefix')

# THEN: expect an error
assert str(e.value) == 'Bucket not-exists does not exist'
35 changes: 34 additions & 1 deletion utils/packets/__main__.py
@@ -5,7 +5,12 @@

from utils.db.helpers import snowflake_engine
from utils.helpers import initialize_cli_logging
from utils.packets.service import ingest_support_packet, ingest_survey_packet
from utils.packets.service import (
ingest_support_packet,
ingest_support_packets_from_s3,
ingest_survey_packet,
ingest_surveys_from_s3,
)

initialize_cli_logging(logging.INFO, 'stderr')

@@ -77,6 +82,20 @@ def user_survey(
ingest_survey_packet(conn, ctx.parent.params['schema'], input, click.format_filename(input))


@packets.command()
@click.argument('bucket', type=str)
@click.argument('prefix', type=str)
@click.pass_context
def user_survey_s3(ctx: click.Context, bucket: str, prefix: str) -> None:
"""
Ingest user survey packets from S3.
:param bucket: The S3 bucket to ingest from.
:param prefix: The prefix to search for user survey packets.
"""
with ctx.obj['engine'].begin() as conn:
ingest_surveys_from_s3(conn, ctx.parent.params['schema'], bucket, prefix)


@packets.command()
@click.argument('input', type=click.Path(exists=True, dir_okay=False, readable=True, resolve_path=True))
@click.pass_context
@@ -91,3 +110,17 @@ def support_v1(
"""
with ctx.obj['engine'].begin() as conn:
ingest_support_packet(conn, ctx.parent.params['schema'], input, click.format_filename(input))


@packets.command()
@click.argument('bucket', type=str)
@click.argument('prefix', type=str)
@click.pass_context
def support_v1_s3(ctx: click.Context, bucket: str, prefix: str) -> None:
"""
Ingest support packets from S3.
:param bucket: The S3 bucket to ingest from.
:param prefix: The prefix to search for support packets.
"""
with ctx.obj['engine'].begin() as conn:
ingest_support_packets_from_s3(conn, ctx.parent.params['schema'], bucket, prefix)
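
The two new subcommands mirror the existing file-based ones, taking an S3 bucket and prefix instead of a local path. Below is a minimal sketch of exercising one of them with click's test runner; the 'user-survey-s3' command name and the parent group's '--schema' option are assumptions inferred from the function names above, not confirmed by this diff:

from click.testing import CliRunner

from utils.packets.__main__ import packets  # top-level click group (assumed entry point)

# Hypothetical invocation: command and option names are assumptions, not part of this commit.
runner = CliRunner()
result = runner.invoke(packets, ['--schema', 'test_schema', 'user-survey-s3', 'my-bucket', 'prefix'])
print(result.exit_code, result.output)
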
2 changes: 1 addition & 1 deletion utils/packets/aws.py
@@ -33,5 +33,5 @@ def object_iter(bucket: str, prefix: str) -> Generator[Tuple[str, BytesIO], None
# Iterate
for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
# Download each file individually
for key in result['Contents']:
for key in result.get('Contents', []):
yield key['Key'], BytesIO(s3.get_object(Bucket=bucket, Key=key['Key'])["Body"].read())
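
object_iter now skips result pages that have no 'Contents' key, so an empty prefix no longer raises a KeyError. The bucket_exists helper imported by the service layer is not part of this diff; here is a minimal sketch of such a check, assuming it is built on boto3's head_bucket:

import boto3
from botocore.exceptions import ClientError

def bucket_exists(bucket: str) -> bool:
    # Hypothetical sketch: HeadBucket succeeds only if the bucket exists and is accessible.
    try:
        boto3.client('s3').head_bucket(Bucket=bucket)
        return True
    except ClientError:
        return False
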
61 changes: 61 additions & 0 deletions utils/packets/service.py
@@ -1,14 +1,21 @@
import os
from datetime import datetime, timedelta
from enum import Enum
from logging import getLogger
from typing import Callable

import pandas as pd
from click import ClickException
from sqlalchemy.engine import Connection
from sqlalchemy.exc import OperationalError

from utils.db.helpers import upsert_dataframe_to_table
from utils.helpers import daterange
from utils.packets.aws import bucket_exists, object_iter
from utils.packets.loaders import UserSurveyFixedColumns, load_support_packet_file, load_user_survey_package

DATE_FORMAT = '%Y-%m-%d'

logger = getLogger(__name__)


@@ -48,6 +55,7 @@ def ingest_survey_packet(conn: Connection, target_schema: str, survey_packet: st
enriched_df['metadata_extras_plugin_id'] = metadata.extras.plugin_id
enriched_df['metadata_extras_plugin_version'] = metadata.extras.plugin_version
enriched_df['source'] = source_uri
enriched_df['ingestion_date'] = datetime.today().strftime(DATE_FORMAT)

# Upsert data
upsert_dataframe_to_table(
@@ -105,11 +113,13 @@ def ingest_support_packet(conn: Connection, target_schema: str, support_packet:
sp_df['metadata_license_id'] = metadata.license_id if metadata else None
sp_df['metadata_customer_id'] = metadata.customer_id if metadata else None
sp_df['source'] = source_uri
sp_df['ingestion_date'] = datetime.today().strftime(DATE_FORMAT)

job_df = pd.DataFrame([job.model_dump() for job in sp.all_jobs()])
job_df['data'] = job_df['data'].astype(str)
job_df['metadata_server_id'] = server_id
job_df['source'] = source_uri
job_df['ingestion_date'] = datetime.today().strftime(DATE_FORMAT)

# Upsert data
upsert_dataframe_to_table(conn, target_schema, TableNames.SUPPORT_PACKET_V1.value, sp_df, server_id, source_uri)
@@ -119,3 +129,54 @@ def ingest_support_packet(conn: Connection, target_schema: str, support_packet:

except ValueError as e:
raise ClickException(f'Error loading support package: {e}')


def _ingest_packet_from_s3(
conn: Connection, target_schema: str, bucket: str, prefix: str, ingest_func: Callable, table: TableNames
):
if not bucket_exists(bucket):
raise ValueError(f'Bucket {bucket} does not exist')

try:
checkpoint = conn.execute(
f'SELECT MAX(ingestion_date) as checkpoint FROM \'{target_schema}\'.{table.value}'
).scalar()
except OperationalError:
# Table does not exist
checkpoint = None
if checkpoint:
logger.info(f'Resuming ingestion from: {checkpoint}')
start_date = datetime.strptime(checkpoint, DATE_FORMAT)
for date in daterange(start_date, datetime.today() + timedelta(days=1)):
for key, content in object_iter(bucket, f'{prefix}/{date.strftime("%Y/%m/%d")}'):
logger.info(f'Ingesting {key}')
ingest_func(conn, target_schema, content, f's3://{bucket}/{key}')
else:
logger.info('Starting ingestion from the beginning')
for key, content in object_iter(bucket, f'{prefix}/'):
logger.info(f'Ingesting {key}')
ingest_func(conn, target_schema, content, f's3://{bucket}/{key}')


def ingest_surveys_from_s3(conn: Connection, target_schema: str, bucket: str, prefix: str):
"""
Load all survey packets from S3.
:param conn: The SQLAlchemy connection to use for ingesting the data.
:param target_schema: The schema to ingest the data into.
:param bucket: The S3 bucket name.
:param prefix: The S3 prefix where the survey packets are located.
"""
_ingest_packet_from_s3(conn, target_schema, bucket, prefix, ingest_survey_packet, TableNames.USER_SURVEY)


def ingest_support_packets_from_s3(conn: Connection, target_schema: str, bucket: str, prefix: str):
"""
Load all support packets from S3.
:param conn: The SQLAlchemy connection to use for ingesting the data.
:param target_schema: The schema to ingest the data into.
:param bucket: The S3 bucket name.
:param prefix: The S3 prefix where the support packets are located.
"""
_ingest_packet_from_s3(conn, target_schema, bucket, prefix, ingest_support_packet, TableNames.SUPPORT_PACKET_V1)
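
The resume path walks day-sized prefixes from the stored checkpoint up to and including today, relying on a daterange helper from utils.helpers that is not shown in this diff. Here is a minimal sketch of such a helper, assuming it yields one datetime per day over a half-open [start, end) interval:

from datetime import datetime, timedelta
from typing import Generator

def daterange(start: datetime, end: datetime) -> Generator[datetime, None, None]:
    # Hypothetical sketch: yield one datetime per day from start (inclusive) to end (exclusive).
    current = start
    while current < end:
        yield current
        current += timedelta(days=1)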
