Upload audit to s3 after setting all args #246

Merged
merged 1 commit on Nov 10, 2023

deltacat/compute/compactor/model/compaction_session_audit_info.py
@@ -1,5 +1,6 @@
# Allow classes to use self-referencing Type hints in Python 3.7.
from __future__ import annotations
from typing import Optional
import pyarrow as pa
import logging
from deltacat import logs
@@ -21,10 +22,18 @@ class CompactionSessionAuditInfo(dict):
HASH_BUCKET_STEP_NAME = "hashBucket"
MERGE_STEP_NAME = "merge"

def __init__(self, deltacat_version: str, ray_version: str, audit_url: str):
def __init__(
self,
deltacat_version: Optional[str] = None,
ray_version: Optional[str] = None,
audit_url: Optional[str] = None,
**kwargs,
):
self.set_deltacat_version(deltacat_version)
self.set_ray_version(ray_version)
self.set_audit_url(audit_url)
if kwargs:
self.update(kwargs)

@property
def audit_url(self) -> str:
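
With every constructor argument now optional and unknown keys absorbed through **kwargs, an audit can be rehydrated from previously serialized JSON. A minimal sketch of that round trip (the payload and its keys are illustrative, not taken from this diff):

```python
import json

from deltacat.compute.compactor.model.compaction_session_audit_info import (
    CompactionSessionAuditInfo,
)

# Illustrative JSON, standing in for a document previously written with
# s3_utils.upload(audit.audit_url, str(json.dumps(audit)), ...).
audit_json = '{"inputRecords": 4}'

# Positional arguments default to None and unrecognized keys fall through to
# **kwargs, so the parsed dict can be splatted straight into the constructor.
audit = CompactionSessionAuditInfo(**json.loads(audit_json))
```

This mirrors how the updated tests below rebuild the audit after downloading it from S3.
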
7 changes: 2 additions & 5 deletions deltacat/compute/compactor/model/round_completion_info.py
@@ -4,9 +4,6 @@
from typing import Tuple
from deltacat.storage import DeltaLocator, PartitionLocator
from deltacat.compute.compactor.model.pyarrow_write_result import PyArrowWriteResult
from deltacat.compute.compactor.model.compaction_session_audit_info import (
CompactionSessionAuditInfo,
)
from typing import Any, Dict, Optional


@@ -98,8 +95,8 @@ def sort_keys_bit_width(self) -> int:
return self["sortKeysBitWidth"]

@property
def compaction_audit(self) -> Optional[CompactionSessionAuditInfo]:
return self.get("compactionAudit")
def compaction_audit_url(self) -> Optional[str]:
return self.get("compactionAuditUrl")

@property
def rebase_source_partition_locator(self) -> Optional[PartitionLocator]:
12 changes: 6 additions & 6 deletions deltacat/compute/compactor_v2/compaction_session.py
@@ -489,12 +489,6 @@ def merge_input_provider(index, item):
)
compaction_audit.set_cluster_cpu_max(cluster_util.max_cpu)

s3_utils.upload(
compaction_audit.audit_url,
str(json.dumps(compaction_audit)),
**params.s3_client_kwargs,
)

input_inflation = None
input_average_record_size_bytes = None
# Note: we only consider inflation for incremental delta
@@ -537,6 +531,12 @@
+ round_completion_info.compacted_pyarrow_write_result.records
)

s3_utils.upload(
compaction_audit.audit_url,
str(json.dumps(compaction_audit)),
**params.s3_client_kwargs,
)

new_round_completion_info = RoundCompletionInfo.of(
high_watermark=params.last_stream_position_to_compact,
compacted_delta_locator=new_compacted_delta_locator,
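
The net effect of moving this block is that the audit is serialized and uploaded exactly once, after every field has been populated; previously the upload ran before the inflation and average-record-size fields were computed, so those values never reached S3. A condensed sketch of the resulting ordering (the helper name finalize_audit and the s3_utils import path are assumptions, not part of the diff):

```python
import json

from deltacat.aws import s3u as s3_utils  # assumed import path for the S3 helpers


def finalize_audit(compaction_audit, params, cluster_util):
    # Illustrative only: write the remaining audit fields first ...
    compaction_audit.set_cluster_cpu_max(cluster_util.max_cpu)
    # ... (inflation, record-size and write-result metrics are set here) ...

    # ... then upload the completed audit a single time.
    s3_utils.upload(
        compaction_audit.audit_url,
        str(json.dumps(compaction_audit)),
        **params.s3_client_kwargs,
    )
```
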
26 changes: 23 additions & 3 deletions deltacat/tests/compute/test_compact_partition_incremental.py
@@ -7,8 +7,9 @@
from boto3.resources.base import ServiceResource
import pyarrow as pa
from deltacat.tests.compute.test_util_common import (
get_compacted_delta_locator_from_rcf,
get_rcf,
)
from deltacat.tests.test_utils.utils import read_s3_contents
from deltacat.tests.compute.test_util_create_table_deltas_repo import (
create_src_w_deltas_destination_plus_destination,
)
@@ -167,6 +168,9 @@ def test_compact_partition_incremental(
DeltaLocator,
PartitionLocator,
)
from deltacat.compute.compactor.model.compaction_session_audit_info import (
CompactionSessionAuditInfo,
)
from deltacat.compute.compactor.model.compact_partition_params import (
CompactPartitionParams,
)
@@ -234,9 +238,20 @@ def test_compact_partition_incremental(
# execute
rcf_file_s3_uri = compact_partition_func(compact_partition_params)
# validate
compacted_delta_locator: DeltaLocator = get_compacted_delta_locator_from_rcf(
setup_s3_resource, rcf_file_s3_uri
round_completion_info = get_rcf(setup_s3_resource, rcf_file_s3_uri)
compacted_delta_locator: DeltaLocator = (
round_completion_info.compacted_delta_locator
)
audit_bucket, audit_key = round_completion_info.compaction_audit_url.replace(
"s3://", ""
).split("/", 1)
compaction_audit_obj: dict = read_s3_contents(
setup_s3_resource, audit_bucket, audit_key
)
compaction_audit: CompactionSessionAuditInfo = CompactionSessionAuditInfo(
**compaction_audit_obj
)

tables = ds.download_delta(compacted_delta_locator, **ds_mock_kwargs)
actual_compacted_table = pa.concat_tables(tables)
sorting_cols: List[Any] = [(val, "ascending") for val in primary_keys]
@@ -250,6 +265,11 @@ def test_compact_partition_incremental(
actual_compacted_table = actual_compacted_table.combine_chunks().sort_by(
sorting_cols
)

assert compaction_audit.input_records == len(
input_deltas
), "The input_records must be equal to total records in the input"

assert actual_compacted_table.equals(
expected_terminal_compact_partition_result
), f"{actual_compacted_table} does not match {expected_terminal_compact_partition_result}"
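
Both updated tests locate the uploaded audit by splitting compaction_audit_url from the round completion file into a bucket and a key. A small helper capturing that pattern (the function name and the sample URI are hypothetical, shown only to illustrate the parsing used above):

```python
def split_s3_uri(s3_uri: str) -> tuple:
    # Same parsing as in the test: drop the scheme, then split on the first "/".
    bucket, key = s3_uri.replace("s3://", "").split("/", 1)
    return bucket, key


# split_s3_uri("s3://audit-bucket/compaction-audit/audit.json")
# -> ("audit-bucket", "compaction-audit/audit.json")
```
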
deltacat/tests/compute/test_compact_partition_rebase_then_incremental.py
@@ -13,6 +13,10 @@
DEFAULT_NUM_WORKERS,
DEFAULT_WORKER_INSTANCE_CPUS,
)
from deltacat.tests.compute.test_util_common import (
get_rcf,
)
from deltacat.tests.test_utils.utils import read_s3_contents
from deltacat.tests.compute.test_util_common import (
get_compacted_delta_locator_from_rcf,
)
@@ -192,6 +196,9 @@ def test_compact_partition_rebase_then_incremental(
from deltacat.utils.placement import (
PlacementGroupManager,
)
from deltacat.compute.compactor.model.compaction_session_audit_info import (
CompactionSessionAuditInfo,
)

ds_mock_kwargs = offer_local_deltacat_storage_kwargs
ray.shutdown()
@@ -317,9 +324,20 @@ def test_compact_partition_rebase_then_incremental(
}
)
rcf_file_s3_uri = compact_partition_func(compact_partition_params)
round_completion_info = get_rcf(setup_s3_resource, rcf_file_s3_uri)
compacted_delta_locator_incremental: DeltaLocator = (
get_compacted_delta_locator_from_rcf(setup_s3_resource, rcf_file_s3_uri)
round_completion_info.compacted_delta_locator
)
audit_bucket, audit_key = round_completion_info.compaction_audit_url.replace(
"s3://", ""
).split("/", 1)
compaction_audit_obj: dict = read_s3_contents(
setup_s3_resource, audit_bucket, audit_key
)
compaction_audit: CompactionSessionAuditInfo = CompactionSessionAuditInfo(
**compaction_audit_obj
)

tables = ds.download_delta(compacted_delta_locator_incremental, **ds_mock_kwargs)
actual_compacted_table = pa.concat_tables(tables)
expected_terminal_compact_partition_result = (
@@ -330,6 +348,14 @@ def test_compact_partition_rebase_then_incremental(
actual_compacted_table = actual_compacted_table.combine_chunks().sort_by(
sorting_cols
)

assert compaction_audit.input_records == (
len(incremental_deltas) if incremental_deltas else 0
) + len(actual_rebase_compacted_table), (
"Total input records must be equal to incremental deltas"
"+ previous compacted table size"
)

assert actual_compacted_table.equals(
expected_terminal_compact_partition_result
), f"{actual_compacted_table} does not match {expected_terminal_compact_partition_result}"
19 changes: 14 additions & 5 deletions deltacat/tests/compute/test_util_common.py
@@ -134,20 +134,29 @@ def create_rebase_table(
)


def get_compacted_delta_locator_from_rcf(
s3_resource: ServiceResource, rcf_file_s3_uri: str
):
def get_rcf(s3_resource, rcf_file_s3_uri: str):
from deltacat.tests.test_utils.utils import read_s3_contents
from deltacat.compute.compactor import (
RoundCompletionInfo,
)
from deltacat.storage import DeltaLocator

_, rcf_object_key = rcf_file_s3_uri.rsplit("/", 1)
rcf_file_output: Dict[str, Any] = read_s3_contents(
s3_resource, TEST_S3_RCF_BUCKET_NAME, rcf_object_key
)
round_completion_info: RoundCompletionInfo = RoundCompletionInfo(**rcf_file_output)
return RoundCompletionInfo(**rcf_file_output)


def get_compacted_delta_locator_from_rcf(
s3_resource: ServiceResource, rcf_file_s3_uri: str
):
from deltacat.storage import DeltaLocator
from deltacat.compute.compactor import (
RoundCompletionInfo,
)

round_completion_info: RoundCompletionInfo = get_rcf(s3_resource, rcf_file_s3_uri)

compacted_delta_locator: DeltaLocator = (
round_completion_info.compacted_delta_locator
)
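
A hedged usage sketch of the refactored helpers: get_rcf returns the full RoundCompletionInfo, while get_compacted_delta_locator_from_rcf remains a thin wrapper around it. The boto3 resource and the RCF URI below are placeholders, not values from this change:

```python
import boto3

from deltacat.tests.compute.test_util_common import (
    get_compacted_delta_locator_from_rcf,
    get_rcf,
)

# Placeholders; in the tests these come from the setup_s3_resource fixture and
# from the compact_partition_func(...) call.
s3_resource = boto3.resource("s3", region_name="us-east-1")
rcf_file_s3_uri = "s3://example-rcf-bucket/rcf.json"

# Full round completion info, including compaction_audit_url.
round_completion_info = get_rcf(s3_resource, rcf_file_s3_uri)

# Just the compacted delta locator, via the original helper.
compacted_delta_locator = get_compacted_delta_locator_from_rcf(
    s3_resource, rcf_file_s3_uri
)
```
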
deltacat/tests/compute/test_util_create_table_deltas_repo.py
@@ -68,7 +68,6 @@ def create_src_w_deltas_destination_plus_destination(
ds_mock_kwargs: Optional[Dict[str, Any]],
) -> Tuple[Stream, Stream, Optional[Stream]]:
import deltacat.tests.local_deltacat_storage as ds
from deltacat.storage import Partition, Stream

source_namespace, source_table_name, source_table_version = create_src_table(
primary_keys, sort_keys, partition_keys, ds_mock_kwargs
Expand Down