Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: partition files on write #9

Merged
merged 2 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pyfarmhash = "^0.3.2"
numpy = "^1.24.2"
base58 = "^2.1.1"
petname = "^2.6"
flexdict = "^0.0.1a1"

[tool.poetry.group.dev.dependencies]
pytest = "^7.3.0"
Expand Down
12 changes: 12 additions & 0 deletions src/singerlake/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
from pydantic import BaseModel


class Partition(BaseModel):
"""Partition Model."""

by: t.Literal["year", "month", "day", "hour", "minute", "second"]


class GenericPathModel(BaseModel):
"""Generic Path Model."""

Expand All @@ -15,6 +21,12 @@ class PathConfig(BaseModel):

path_type: str = "hive"
lake_root: GenericPathModel
partition_by: t.Optional[t.List[Partition]] = [
Partition(by="year"),
Partition(by="month"),
Partition(by="day"),
Partition(by="hour"),
]


class LockConfig(BaseModel):
Expand Down
12 changes: 12 additions & 0 deletions src/singerlake/singer/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from datetime import datetime


def get_time_extracted(record: dict) -> datetime:
"""Return the time extracted from a record."""
time_extracted = record.get("time_extracted") or record.get("record", {}).get(
"_sdc_extracted_at"
)
if not time_extracted:
raise ValueError("Record does not contain time_extracted")

return datetime.fromisoformat(time_extracted)
33 changes: 16 additions & 17 deletions src/singerlake/stream/file_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from pathlib import Path
from uuid import uuid4

import singerlake.singer.utils as su

if t.TYPE_CHECKING:
from .stream import Stream

Expand All @@ -18,6 +20,7 @@ class SingerFileWriter:

def __init__(self, stream: "Stream") -> None:
self.stream = stream
self.records_written = 0

self._tmp_dir: Path | None = None
self._file: TextIOWrapper | None = None
Expand Down Expand Up @@ -64,22 +67,6 @@ def tmp_dir(self, value: Path) -> None:
"""Set the temporary directory."""
self._tmp_dir = value

def _get_time_extracted(self, record: dict) -> datetime:
"""Return the time extracted from a record."""
time_extracted = record.get("time_extracted") or record.get("record", {}).get(
"_sdc_extracted_at"
)
if not time_extracted:
raise ValueError("Record does not contain time_extracted")

return datetime.fromisoformat(time_extracted)

def _open_file(self, tmp_dir: Path) -> TextIOWrapper:
"""Open a file for writing."""
self.file_path = tmp_dir / f"{uuid4()}.jsonl"
self.file = self.file_path.open("w", encoding="utf-8")
return self.file

@property
def file_name(self) -> str:
"""Return the file name."""
Expand All @@ -90,6 +77,17 @@ def file_name(self) -> str:
file_stop_time = self._max_time_extracted.strftime("%Y%m%dT%H%M%SZ")
return f"{self.stream.stream_id}-{file_start_time}-{file_stop_time}.singer"

@property
def closed(self) -> bool:
"""Return True if the file is closed."""
return self._file is None

def _open_file(self, tmp_dir: Path) -> TextIOWrapper:
"""Open a file for writing."""
self.file_path = tmp_dir / f"{uuid4()}.jsonl"
self.file = self.file_path.open("w", encoding="utf-8")
return self.file

def open(self) -> SingerFileWriter:
"""Create a temporary directory and new file to write records to."""
if self._tmp_dir is None:
Expand Down Expand Up @@ -119,7 +117,7 @@ def write_record(self, record: dict) -> None:
if self._file is None:
raise ValueError("File not open")

time_extracted = self._get_time_extracted(record)
time_extracted = su.get_time_extracted(record)

if self._min_time_extracted is None:
self._min_time_extracted = time_extracted
Expand All @@ -135,6 +133,7 @@ def write_record(self, record: dict) -> None:

payload = json.dumps(record, ensure_ascii=False)
self.file.write(f"{payload}\n")
self.records_written += 1

def write_schema(self, schema: dict) -> None:
"""Write a schema to the file."""
Expand Down
71 changes: 34 additions & 37 deletions src/singerlake/stream/record_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
import typing as t
from pathlib import Path

from flexdict import FlexDict

import singerlake.singer.utils as su

from .file_writer import SingerFileWriter

if t.TYPE_CHECKING:
Expand All @@ -20,50 +24,43 @@ def __init__(self, stream: Stream, output_dir: Path) -> None:
self.output_dir = output_dir

self.files: list[Path] = []
self._current_file: SingerFileWriter | None = None
self._record_count = 0

@property
def current_file(self) -> SingerFileWriter:
"""Return the current file."""
if self._current_file is None:
raise ValueError("File not open.")

return self._current_file

@current_file.setter
def current_file(self, value: SingerFileWriter) -> None:
"""Set the current file."""
self._current_file = value

def open(self) -> RecordWriter:
self.current_file = SingerFileWriter(stream=self.stream).open()
return self
self.is_finalized = False
self._open_files: FlexDict = FlexDict()

def close(self):
"""Finalize the last file."""
if self._current_file is None:
raise ValueError("File not open.")

self._finalize_current_file()

def _finalize_current_file(self):
finalized_file_path = self.current_file.close(output_dir=self.output_dir)
self._current_file = None
def _finalize_file(self, file: SingerFileWriter) -> None:
finalized_file_path = file.close(output_dir=self.output_dir)
self.files.append(finalized_file_path)

def _new_file(self, partition: t.Tuple[t.Any, ...]) -> SingerFileWriter:
"""Return a new file."""
file = SingerFileWriter(stream=self.stream).open()
self._open_files.set(keys=partition, value=file)
return file

def write(self, schema: dict, record: dict) -> None:
"""Write a record to the stream."""

if self._record_count == MAX_RECORD_COUNT:
self._finalize_current_file()
# partition the record
time_extracted = su.get_time_extracted(record)
partition = self.stream.partition_record(time_extracted) or ("default",)
file = self._open_files.get(partition)

if not file or file.closed:
file = self._new_file(partition)

if file.records_written == MAX_RECORD_COUNT:
self._finalize_file(file)
# open a new file
self.current_file = SingerFileWriter(stream=self.stream).open()
self._record_count = 0
file = self._new_file(partition)

if self._record_count == 0:
if file.records_written == 0:
# write the stream schema
self.current_file.write_schema(schema)
file.write_schema(schema)

file.write_record(record)

self.current_file.write_record(record)
self._record_count += 1
def finalize(self) -> None:
"""Finalize the stream."""
for file in self._open_files.values(nested=True):
self._finalize_file(file)
self.is_finalized = True
27 changes: 23 additions & 4 deletions src/singerlake/stream/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
from .record_writer import RecordWriter

if t.TYPE_CHECKING:
from datetime import datetime
from pathlib import Path

from singerlake import Singerlake
from singerlake.config import Partition
from singerlake.tap import Tap


Expand All @@ -18,22 +20,39 @@ class Stream:
- committing files to storage
"""

def __init__(self, singerlake: "Singerlake", tap: "Tap", stream_id: str) -> None:
def __init__(
self,
singerlake: "Singerlake",
tap: "Tap",
stream_id: str,
) -> None:
self.singerlake = singerlake
self.tap = tap
self.stream_id = stream_id
self.partitions: t.List["Partition"] = (
self.singerlake.config.store.path.partition_by or []
)

self.files: list[Path] = []

def partition_record(self, time_extracted: "datetime") -> t.Tuple[str, ...]:
"""Partition a record."""
partitions = [
getattr(time_extracted, partition.by) for partition in self.partitions
]
return tuple(partitions)

@contextmanager
def record_writer(self):
"""Create a record writer for this stream."""
writer = RecordWriter(stream=self, output_dir=self.singerlake.working_dir)
writer = RecordWriter(
stream=self,
output_dir=self.singerlake.working_dir,
)
try:
writer.open()
yield writer
finally:
writer.close()
writer.finalize()
self.files.extend(writer.files)

def commit(self):
Expand Down
Loading
Loading