diff --git a/src/singerlake/stream/file_writer.py b/src/singerlake/stream/file_writer.py index 76c8153..4137878 100644 --- a/src/singerlake/stream/file_writer.py +++ b/src/singerlake/stream/file_writer.py @@ -9,12 +9,44 @@ from pathlib import Path from uuid import uuid4 +from pydantic import BaseModel + import singerlake.singer.utils as su if t.TYPE_CHECKING: from .stream import Stream +class SingerFile(BaseModel): + """Singer file object.""" + + stream_id: str + parent_dir: Path + partition: tuple[int, ...] + min_time_extracted: datetime + max_time_extracted: datetime + encryption: t.Literal["none", "bz2", "gz"] = "none" + + @property + def name(self): + """Return the filename.""" + file_start_time = self.min_time_extracted.strftime("%Y%m%dT%H%M%SZ") + file_stop_time = self.max_time_extracted.strftime("%Y%m%dT%H%M%SZ") + file_name = f"{self.stream_id}-{file_start_time}-{file_stop_time}.singer" + if self.encryption != "none": + file_name += f".{self.encryption}" + return file_name + + @property + def path(self): + """Return the file path.""" + return self.parent_dir / self.name + + def __repr__(self) -> str: + """Return a string representation of the object.""" + return f"{self.__class__.__name__}({self.path})" + + class SingerFileWriter: """Base class for writing singer files to disk via temporary directories.""" @@ -67,16 +99,6 @@ def tmp_dir(self, value: Path) -> None: """Set the temporary directory.""" self._tmp_dir = value - @property - def file_name(self) -> str: - """Return the file name.""" - if not self._min_time_extracted or not self._max_time_extracted: - raise ValueError("File has not been written to.") - - file_start_time = self._min_time_extracted.strftime("%Y%m%dT%H%M%SZ") - file_stop_time = self._max_time_extracted.strftime("%Y%m%dT%H%M%SZ") - return f"{self.stream.stream_id}-{file_start_time}-{file_stop_time}.singer" - @property def closed(self) -> bool: """Return True if the file is closed.""" @@ -95,7 +117,7 @@ def open(self) -> SingerFileWriter: self._open_file(self.tmp_dir) return self - def close(self, output_dir: Path) -> Path: + def close(self, output_dir: Path, partition: t.Tuple[t.Any, ...]) -> SingerFile: """Remove the temporary directory.""" if self._file is None: raise ValueError("File not open") @@ -103,14 +125,20 @@ def close(self, output_dir: Path) -> Path: if not self.file.closed: self.file.close() - output_file_path = output_dir / self.file_name - shutil.move(self.file_path, output_file_path) + singer_file = SingerFile( + stream_id=self.stream.stream_id, + parent_dir=output_dir, + partition=partition, + min_time_extracted=self._min_time_extracted, + max_time_extracted=self._max_time_extracted, + ) + shutil.move(self.file_path, singer_file.path) self._file = None if self.tmp_dir.exists(): shutil.rmtree(self.tmp_dir, ignore_errors=True) - return output_file_path + return singer_file def write_record(self, record: dict) -> None: """Write a record to the file.""" diff --git a/src/singerlake/stream/record_writer.py b/src/singerlake/stream/record_writer.py index c72c921..b0346f6 100644 --- a/src/singerlake/stream/record_writer.py +++ b/src/singerlake/stream/record_writer.py @@ -23,19 +23,21 @@ def __init__(self, stream: Stream, output_dir: Path) -> None: self.stream = stream self.output_dir = output_dir - self.files: list[Path] = [] + self.singer_files: list[Path] = [] self.is_finalized = False self._open_files: FlexDict = FlexDict() - def _finalize_file(self, file: SingerFileWriter) -> None: - finalized_file_path = file.close(output_dir=self.output_dir) - self.files.append(finalized_file_path) + def _finalize_file( + self, file: SingerFileWriter, partition: t.Tuple[t.Any, ...] + ) -> None: + singer_file = file.close(output_dir=self.output_dir, partition=partition) + self.singer_files.append(singer_file) def _new_file(self, partition: t.Tuple[t.Any, ...]) -> SingerFileWriter: """Return a new file.""" - file = SingerFileWriter(stream=self.stream).open() - self._open_files.set(keys=partition, value=file) - return file + open_file = SingerFileWriter(stream=self.stream).open() + self._open_files.set(keys=partition, value=open_file) + return open_file def write(self, schema: dict, record: dict) -> None: """Write a record to the stream.""" @@ -43,24 +45,24 @@ def write(self, schema: dict, record: dict) -> None: # partition the record time_extracted = su.get_time_extracted(record) partition = self.stream.partition_record(time_extracted) or ("default",) - file = self._open_files.get(partition) + open_file = self._open_files.get(partition) - if not file or file.closed: - file = self._new_file(partition) + if not open_file or open_file.closed: + open_file = self._new_file(partition) - if file.records_written == MAX_RECORD_COUNT: - self._finalize_file(file) + if open_file.records_written == MAX_RECORD_COUNT: + self._finalize_file(file=open_file, partition=partition) # open a new file - file = self._new_file(partition) + open_file = self._new_file(partition) - if file.records_written == 0: + if open_file.records_written == 0: # write the stream schema - file.write_schema(schema) + open_file.write_schema(schema) - file.write_record(record) + open_file.write_record(record) def finalize(self) -> None: """Finalize the stream.""" - for file in self._open_files.values(nested=True): - self._finalize_file(file) + for partition, open_file in self._open_files.flatten(): + self._finalize_file(file=open_file, partition=tuple(partition)) self.is_finalized = True diff --git a/src/singerlake/stream/stream.py b/src/singerlake/stream/stream.py index aabb016..d2a0f0b 100644 --- a/src/singerlake/stream/stream.py +++ b/src/singerlake/stream/stream.py @@ -1,11 +1,11 @@ import typing as t from contextlib import contextmanager +from .file_writer import SingerFile from .record_writer import RecordWriter if t.TYPE_CHECKING: from datetime import datetime - from pathlib import Path from singerlake import Singerlake from singerlake.config import Partition @@ -33,7 +33,7 @@ def __init__( self.singerlake.config.store.path.partition_by or [] ) - self.files: list[Path] = [] + self.files: list[SingerFile] = [] def partition_record(self, time_extracted: "datetime") -> t.Tuple[str, ...]: """Partition a record.""" @@ -53,7 +53,7 @@ def record_writer(self): yield writer finally: writer.finalize() - self.files.extend(writer.files) + self.files.extend(writer.singer_files) def commit(self): """Commit stream files to storage."""