Skip to content

Commit

Permalink
add sliced string cast for hash column generator
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 committed Nov 21, 2023
1 parent 37a4865 commit 16edc98
Showing 1 changed file with 35 additions and 1 deletion.
36 changes: 35 additions & 1 deletion deltacat/compute/compactor_v2/utils/primary_key_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,40 @@ def group_by_pk_hash_bucket(
return result


def _sliced_string_cast(array: pa.ChunkedArray) -> pa.ChunkedArray:
"""performs slicing of a pyarrow array prior casting to a string.
This prevents a pyarrow from allocating too large of an array causing a failure.
"""
dtype = array.type
MAX_BYTES = 2147483646
max_str_len = None
if pa.types.is_integer(dtype):
max_str_len = 21 # -INT_MAX
elif pa.types.is_floating(dtype):
max_str_len = 24
elif pa.types.is_decimal128(dtype):
max_str_len = 39
elif pa.types.is_decimal256(dtype):
max_str_len = 77

if max_str_len is not None:
max_elems_per_chunk = MAX_BYTES // (2 * max_str_len) # safety factor of 2
all_chunks = []
for chunk in array.chunks:
if len(chunk) < max_elems_per_chunk:
all_chunks.append(chunk)
else:
curr_pos = 0
total_len = len(chunk)
while curr_pos < total_len:
sliced = chunk.slice(curr_pos, max_elems_per_chunk)
curr_pos += len(sliced)
all_chunks.append(sliced)
array = pa.chunked_array(all_chunks, type=dtype)

return pc.cast(array, pa.string())


def generate_pk_hash_column(
tables: List[pa.Table],
primary_keys: Optional[List[str]] = None,
Expand All @@ -182,7 +216,7 @@ def generate_pk_hash_column(
def _generate_pk_hash(table: pa.Table) -> pa.Array:
pk_columns = []
for pk_name in primary_keys:
pk_columns.append(pc.cast(table[pk_name], pa.string()))
pk_columns.append(_sliced_string_cast(table[pk_name]))

pk_columns.append(PK_DELIMITER)
hash_column = pc.binary_join_element_wise(*pk_columns)
Expand Down

0 comments on commit 16edc98

Please sign in to comment.