Commit
MM-60769: load from s3 (#1629)
* Add dependencies for S3 access & testing

* Add S3 utils

* Add loading from S3

* Remove left-over comment
ifoukarakis authored Oct 15, 2024
1 parent 32706fd commit 7673294
Showing 8 changed files with 575 additions and 30 deletions.
313 changes: 285 additions & 28 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pyproject.toml
@@ -29,6 +29,7 @@ click = "^8.1.3"
tabulate = "^0.9.0"
pydantic = "^2.8.2"
pyyaml = "^6.0.2"
boto3 = "^1.35.35"

[tool.poetry.group.dev.dependencies]
pylint = "^2.15.4"
@@ -46,6 +47,7 @@ responses = "0.22.0"
mock = "^4.0.3"
pytest = "^7.1.3"
pytest-freezegun = "^0.4.2"
moto = {extras = ["s3"], version = "^5.0.16"}

[build-system]
requires = ["poetry-core"]
14 changes: 14 additions & 0 deletions tests/utils/packets/conftest.py
@@ -0,0 +1,14 @@
import os

import boto3
import pytest

# Safeguard for moto tests: fake credentials are set before any boto3 client is
# created, so a test that accidentally runs without a moto mock fails
# authentication instead of touching real AWS.
os.environ['AWS_ACCESS_KEY_ID'] = 'test-key'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'test-secret'


@pytest.fixture
def s3_boto():
    """Create an S3 boto3 client and return the client object."""
    return boto3.client('s3', region_name='us-east-1')
63 changes: 63 additions & 0 deletions tests/utils/packets/test_aws.py
@@ -0,0 +1,63 @@
from moto import mock_aws

from utils.packets.aws import bucket_exists, object_iter


@mock_aws
def test_bucket_exists(s3_boto):
    # GIVEN: 2 buckets exist
    s3_boto.create_bucket(Bucket='test-bucket-1')
    s3_boto.create_bucket(Bucket='test-bucket-2')

    # THEN: expect existing buckets to be found
    assert bucket_exists('test-bucket-1') is True
    assert bucket_exists('test-bucket-2') is True

    # THEN: expect non-existing buckets not to be found
    assert bucket_exists('test-bucket-1-1') is False
    assert bucket_exists('not-exists') is False


@mock_aws
def test_object_iter_with_prefix(s3_boto):
    # GIVEN: a bucket with 3 objects exists
    bucket_name = 'bucket-with-files'
    s3_boto.create_bucket(Bucket=bucket_name)
    s3_boto.put_object(Bucket=bucket_name, Key='prefix/2024/09/05/Mattermost/1.txt', Body=b'1')
    s3_boto.put_object(Bucket=bucket_name, Key='prefix/2024/10/04/Mattermost/1.txt', Body=b'2')
    s3_boto.put_object(Bucket=bucket_name, Key='prefix/2024/10/04/Mattermost/2.txt', Body=b'3')

    # WHEN: iterating over objects with a prefix
    objects = list(object_iter(bucket_name, 'prefix/2024/10'))

    # THEN: expect the two matching objects to have been loaded
    assert len(objects) == 2
    assert objects[0][0] == 'prefix/2024/10/04/Mattermost/1.txt'
    assert objects[0][1].read() == b'2'

    assert objects[1][0] == 'prefix/2024/10/04/Mattermost/2.txt'
    assert objects[1][1].read() == b'3'


@mock_aws
def test_object_iter_without_prefix(s3_boto):
    # GIVEN: a bucket with 3 objects exists
    bucket_name = 'bucket-with-files'
    s3_boto.create_bucket(Bucket=bucket_name)
    s3_boto.put_object(Bucket=bucket_name, Key='prefix/2024/09/05/Mattermost/1.txt', Body=b'1')
    s3_boto.put_object(Bucket=bucket_name, Key='prefix/2024/10/04/Mattermost/1.txt', Body=b'2')
    s3_boto.put_object(Bucket=bucket_name, Key='prefix/2024/10/04/Mattermost/2.txt', Body=b'3')

    # WHEN: iterating over objects without a prefix
    objects = list(object_iter(bucket_name, ''))

    # THEN: expect all three objects to have been loaded
    assert len(objects) == 3
    assert objects[0][0] == 'prefix/2024/09/05/Mattermost/1.txt'
    assert objects[0][1].read() == b'1'

    assert objects[1][0] == 'prefix/2024/10/04/Mattermost/1.txt'
    assert objects[1][1].read() == b'2'

    assert objects[2][0] == 'prefix/2024/10/04/Mattermost/2.txt'
    assert objects[2][1].read() == b'3'
83 changes: 82 additions & 1 deletion tests/utils/packets/test_service.py
@@ -1,8 +1,10 @@
import pandas as pd
import pytest
from moto import mock_aws
from pandas._testing import assert_frame_equal

from tests.utils.packets import SUPPORT_DIR, SURVEY_DIR
from utils.packets.service import ingest_support_packet, ingest_survey_packet
from utils.packets.service import ingest_support_packet, ingest_survey_packet, ingest_surveys_from_s3


def test_ingest_survey_packet(sqlalchemy_memory_engine):
@@ -202,3 +204,82 @@ def test_ingest_support_packet_twice(sqlalchemy_memory_engine):
        )
        assert result['metadata_server_id'] == 'rmg9ib5rspy93jxswyc454bwzo'
        assert result['source'] == 's3://bucket/valid_with_metadata.zip'


@mock_aws
def test_ingest_survey_from_s3_fresh(s3_boto, sqlalchemy_memory_engine):
    with sqlalchemy_memory_engine.connect() as conn:
        # GIVEN: database setup has been completed
        conn.execute("ATTACH DATABASE ':memory:' AS 'test_schema'")

        # GIVEN: an S3 bucket
        bucket_name = 'bucket-with-files'
        s3_boto.create_bucket(Bucket=bucket_name)
        # GIVEN: survey packets for 3 days exist in the bucket
        s3_boto.upload_file(SURVEY_DIR / 'valid.zip', bucket_name, 'prefix/2024/09/03/mattermost/user-survey.zip')
        s3_boto.upload_file(SURVEY_DIR / 'valid.zip', bucket_name, 'prefix/2024/10/02/mattermost/user-survey.zip')
        s3_boto.upload_file(SURVEY_DIR / 'valid.zip', bucket_name, 'prefix/2024/10/02/example/user-survey.zip')

        # WHEN: request to ingest all
        ingest_surveys_from_s3(conn, 'test_schema', bucket_name, 'prefix')

        # THEN: expect the data to be in the database
        df = pd.read_sql("SELECT * FROM 'test_schema'.user_survey ORDER BY source", conn)

        assert len(df) == 9
        assert (
            df['source'].tolist()
            == ['s3://bucket-with-files/prefix/2024/09/03/mattermost/user-survey.zip'] * 3
            + ['s3://bucket-with-files/prefix/2024/10/02/example/user-survey.zip'] * 3
            + ['s3://bucket-with-files/prefix/2024/10/02/mattermost/user-survey.zip'] * 3
        )


@mock_aws
@pytest.mark.freeze_time
def test_ingest_survey_from_s3_resume(s3_boto, sqlalchemy_memory_engine, freezer):
    with sqlalchemy_memory_engine.connect() as conn:
        # GIVEN: database setup has been completed
        conn.execute("ATTACH DATABASE ':memory:' AS 'test_schema'")

        # GIVEN: an S3 bucket
        bucket_name = 'bucket-with-files'
        s3_boto.create_bucket(Bucket=bucket_name)
        # GIVEN: survey packets for 3 days exist in the bucket
        s3_boto.upload_file(SURVEY_DIR / 'valid.zip', bucket_name, 'prefix/2024/09/03/mattermost/user-survey.zip')
        s3_boto.upload_file(SURVEY_DIR / 'valid.zip', bucket_name, 'prefix/2024/10/02/mattermost/user-survey.zip')
        s3_boto.upload_file(SURVEY_DIR / 'valid.zip', bucket_name, 'prefix/2024/10/02/example/user-survey.zip')

        # GIVEN: the first file was ingested in the past
        freezer.move_to('2024-09-10')
        ingest_surveys_from_s3(conn, 'test_schema', bucket_name, 'prefix')
        freezer.move_to('2024-10-02')

        # WHEN: request to ingest again
        ingest_surveys_from_s3(conn, 'test_schema', bucket_name, 'prefix')

        # THEN: expect the data to be in the database
        df = pd.read_sql("SELECT * FROM 'test_schema'.user_survey ORDER BY source", conn)

        assert len(df) == 9
        assert (
            df['source'].tolist()
            == ['s3://bucket-with-files/prefix/2024/09/03/mattermost/user-survey.zip'] * 3
            + ['s3://bucket-with-files/prefix/2024/10/02/example/user-survey.zip'] * 3
            + ['s3://bucket-with-files/prefix/2024/10/02/mattermost/user-survey.zip'] * 3
        )
        assert df['ingestion_date'].tolist() == ['2024-09-10'] * 3 + ['2024-10-02'] * 6


def test_ingest_survey_from_s3_unknown_bucket(s3_boto, sqlalchemy_memory_engine):
    with sqlalchemy_memory_engine.connect() as conn:
        # GIVEN: database setup has been completed
        conn.execute("ATTACH DATABASE ':memory:' AS 'test_schema'")

        # WHEN: request to ingest from an unknown bucket
        with pytest.raises(ValueError) as e:
            ingest_surveys_from_s3(conn, 'test_schema', 'not-exists', 'prefix')

        # THEN: expect an error
        assert str(e.value) == 'Bucket not-exists does not exist'
35 changes: 34 additions & 1 deletion utils/packets/__main__.py
@@ -5,7 +5,12 @@

from utils.db.helpers import snowflake_engine
from utils.helpers import initialize_cli_logging
from utils.packets.service import ingest_support_packet, ingest_survey_packet
from utils.packets.service import (
    ingest_support_packet,
    ingest_support_packets_from_s3,
    ingest_survey_packet,
    ingest_surveys_from_s3,
)

initialize_cli_logging(logging.INFO, 'stderr')

@@ -77,6 +82,20 @@ def user_survey(
        ingest_survey_packet(conn, ctx.parent.params['schema'], input, click.format_filename(input))


@packets.command()
@click.argument('bucket', type=str)
@click.argument('prefix', type=str)
@click.pass_context
def user_survey_s3(ctx: click.Context, bucket: str, prefix: str) -> None:
    """
    Ingest user survey packets from S3.
    :param bucket: The S3 bucket to ingest from.
    :param prefix: The prefix to search for survey packets.
    """
    with ctx.obj['engine'].begin() as conn:
        ingest_surveys_from_s3(conn, ctx.parent.params['schema'], bucket, prefix)


@packets.command()
@click.argument('input', type=click.Path(exists=True, dir_okay=False, readable=True, resolve_path=True))
@click.pass_context
@@ -91,3 +110,17 @@ def support_v1(
"""
with ctx.obj['engine'].begin() as conn:
ingest_support_packet(conn, ctx.parent.params['schema'], input, click.format_filename(input))


@packets.command()
@click.argument('bucket', type=str)
@click.argument('prefix', type=str)
@click.pass_context
def support_v1_s3(ctx: click.Context, bucket: str, prefix: str) -> None:
    """
    Ingest support packets from S3.
    :param bucket: The S3 bucket to ingest from.
    :param prefix: The prefix to search for support packets.
    """
    with ctx.obj['engine'].begin() as conn:
        ingest_support_packets_from_s3(conn, ctx.parent.params['schema'], bucket, prefix)
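
For context, a hypothetical invocation of the two new subcommands. The module entry point and the parent group's --schema option are not shown in this diff, so treat the lines below as an assumption rather than documented usage (Click 8 derives command names from function names, replacing underscores with dashes):

# Hypothetical usage; entry point and --schema option are assumed, not shown in this diff.
python -m utils.packets --schema analytics user-survey-s3 my-bucket telemetry/prefix
python -m utils.packets --schema analytics support-v1-s3 my-bucket support/prefix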
34 changes: 34 additions & 0 deletions utils/packets/aws.py
@@ -0,0 +1,34 @@
from io import BytesIO
from typing import Generator, Tuple

import boto3
from botocore.exceptions import ClientError

s3 = boto3.client('s3')


def bucket_exists(bucket: str) -> bool:
    """
    Check if a bucket exists.
    """
    try:
        s3.head_bucket(Bucket=bucket)
        return True
    except ClientError:
        return False


def object_iter(bucket: str, prefix: str) -> Generator[Tuple[str, BytesIO], None, None]:
    """
    Iterate over all objects in a bucket under a given prefix, yielding (key, content) pairs.
    :bucket: The bucket to search in.
    :prefix: The prefix to search for.
    """
    paginator = s3.get_paginator('list_objects_v2')

    # Iterate over pages of results
    for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # Download each object individually, buffering the full body in memory
        for key in result.get('Contents', []):
            yield key['Key'], BytesIO(s3.get_object(Bucket=bucket, Key=key['Key'])["Body"].read())
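
As a usage sketch: object_iter yields each key together with the full object body already buffered in memory, so callers can read the content like a file. The bucket and prefix below are invented placeholders, not names from this repository:

from utils.packets.aws import object_iter

# Minimal consumption sketch; 'example-bucket' and the prefix are placeholders.
# Each object's body is fully downloaded into memory before being yielded.
for key, content in object_iter('example-bucket', 'prefix/2024/10'):
    print(key, len(content.read()))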
61 changes: 61 additions & 0 deletions utils/packets/service.py
@@ -1,14 +1,21 @@
import os
from datetime import datetime, timedelta
from enum import Enum
from logging import getLogger
from typing import Callable

import pandas as pd
from click import ClickException
from sqlalchemy.engine import Connection
from sqlalchemy.exc import OperationalError

from utils.db.helpers import upsert_dataframe_to_table
from utils.helpers import daterange
from utils.packets.aws import bucket_exists, object_iter
from utils.packets.loaders import UserSurveyFixedColumns, load_support_packet_file, load_user_survey_package

DATE_FORMAT = '%Y-%m-%d'

logger = getLogger(__name__)


@@ -48,6 +55,7 @@ def ingest_survey_packet(conn: Connection, target_schema: str, survey_packet: st
    enriched_df['metadata_extras_plugin_id'] = metadata.extras.plugin_id
    enriched_df['metadata_extras_plugin_version'] = metadata.extras.plugin_version
    enriched_df['source'] = source_uri
    enriched_df['ingestion_date'] = datetime.today().strftime(DATE_FORMAT)

    # Upsert data
    upsert_dataframe_to_table(
@@ -105,11 +113,13 @@ def ingest_support_packet(conn: Connection, target_schema: str, support_packet:
        sp_df['metadata_license_id'] = metadata.license_id if metadata else None
        sp_df['metadata_customer_id'] = metadata.customer_id if metadata else None
        sp_df['source'] = source_uri
        sp_df['ingestion_date'] = datetime.today().strftime(DATE_FORMAT)

        job_df = pd.DataFrame([job.model_dump() for job in sp.all_jobs()])
        job_df['data'] = job_df['data'].astype(str)
        job_df['metadata_server_id'] = server_id
        job_df['source'] = source_uri
        job_df['ingestion_date'] = datetime.today().strftime(DATE_FORMAT)

        # Upsert data
        upsert_dataframe_to_table(conn, target_schema, TableNames.SUPPORT_PACKET_V1.value, sp_df, server_id, source_uri)
@@ -119,3 +129,54 @@ def ingest_support_packet(

    except ValueError as e:
        raise ClickException(f'Error loading support package: {e}')


def _ingest_packet_from_s3(
    conn: Connection, target_schema: str, bucket: str, prefix: str, ingest_func: Callable, table: TableNames
):
    if not bucket_exists(bucket):
        raise ValueError(f'Bucket {bucket} does not exist')

    try:
        checkpoint = conn.execute(
            f'SELECT MAX(ingestion_date) as checkpoint FROM \'{target_schema}\'.{table.value}'
        ).scalar()
    except OperationalError:
        # Table does not exist yet
        checkpoint = None

    if checkpoint:
        logger.info(f'Resuming ingestion from: {checkpoint}')
        start_date = datetime.strptime(checkpoint, DATE_FORMAT)
        for date in daterange(start_date, datetime.today() + timedelta(days=1)):
            for key, content in object_iter(bucket, f'{prefix}/{date.strftime("%Y/%m/%d")}'):
                logger.info(f'Ingesting {key}')
                ingest_func(conn, target_schema, content, f's3://{bucket}/{key}')
    else:
        logger.info('Starting ingestion from the beginning')
        for key, content in object_iter(bucket, f'{prefix}/'):
            logger.info(f'Ingesting {key}')
            ingest_func(conn, target_schema, content, f's3://{bucket}/{key}')


def ingest_surveys_from_s3(conn: Connection, target_schema: str, bucket: str, prefix: str):
    """
    Load all survey packets from S3.
    :conn: The SQLAlchemy connection to use for ingesting the data.
    :target_schema: The schema to ingest the data into.
    :bucket: The S3 bucket name.
    :prefix: The S3 prefix where the survey packets are located.
    """
    _ingest_packet_from_s3(conn, target_schema, bucket, prefix, ingest_survey_packet, TableNames.USER_SURVEY)


def ingest_support_packets_from_s3(conn: Connection, target_schema: str, bucket: str, prefix: str):
    """
    Load all support packets from S3.
    :conn: The SQLAlchemy connection to use for ingesting the data.
    :target_schema: The schema to ingest the data into.
    :bucket: The S3 bucket name.
    :prefix: The S3 prefix where the support packets are located.
    """
    _ingest_packet_from_s3(conn, target_schema, bucket, prefix, ingest_support_packet, TableNames.SUPPORT_PACKET_V1)
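
The resume branch in _ingest_packet_from_s3 relies on utils.helpers.daterange, which is not part of this diff. A minimal sketch of the behavior the call site assumes (end-exclusive iteration, one day per step; this is why datetime.today() + timedelta(days=1) is passed, so that today's objects are included):

from datetime import datetime, timedelta
from typing import Generator


def daterange(start: datetime, end: datetime) -> Generator[datetime, None, None]:
    """Hypothetical sketch: yield one datetime per day in [start, end)."""
    for offset in range((end - start).days):
        yield start + timedelta(days=offset)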
