Skip to content

Commit

Permalink
encapsulate singer file with metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
Ken Payne committed Sep 21, 2023
1 parent e4eb0ca commit dc9cc91
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 35 deletions.
56 changes: 42 additions & 14 deletions src/singerlake/stream/file_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,44 @@
from pathlib import Path
from uuid import uuid4

from pydantic import BaseModel

import singerlake.singer.utils as su

if t.TYPE_CHECKING:
from .stream import Stream


class SingerFile(BaseModel):
"""Singer file object."""

stream_id: str
parent_dir: Path
partition: tuple[int, ...]
min_time_extracted: datetime
max_time_extracted: datetime
encryption: t.Literal["none", "bz2", "gz"] = "none"

@property
def name(self):
"""Return the filename."""
file_start_time = self.min_time_extracted.strftime("%Y%m%dT%H%M%SZ")
file_stop_time = self.max_time_extracted.strftime("%Y%m%dT%H%M%SZ")
file_name = f"{self.stream_id}-{file_start_time}-{file_stop_time}.singer"
if self.encryption != "none":
file_name += f".{self.encryption}"
return file_name

@property
def path(self):
"""Return the file path."""
return self.parent_dir / self.name

def __repr__(self) -> str:
"""Return a string representation of the object."""
return f"{self.__class__.__name__}({self.path})"


class SingerFileWriter:
"""Base class for writing singer files to disk via temporary directories."""

Expand Down Expand Up @@ -67,16 +99,6 @@ def tmp_dir(self, value: Path) -> None:
"""Set the temporary directory."""
self._tmp_dir = value

@property
def file_name(self) -> str:
"""Return the file name."""
if not self._min_time_extracted or not self._max_time_extracted:
raise ValueError("File has not been written to.")

file_start_time = self._min_time_extracted.strftime("%Y%m%dT%H%M%SZ")
file_stop_time = self._max_time_extracted.strftime("%Y%m%dT%H%M%SZ")
return f"{self.stream.stream_id}-{file_start_time}-{file_stop_time}.singer"

@property
def closed(self) -> bool:
"""Return True if the file is closed."""
Expand All @@ -95,22 +117,28 @@ def open(self) -> SingerFileWriter:
self._open_file(self.tmp_dir)
return self

def close(self, output_dir: Path) -> Path:
def close(self, output_dir: Path, partition: t.Tuple[t.Any, ...]) -> SingerFile:
"""Remove the temporary directory."""
if self._file is None:
raise ValueError("File not open")

if not self.file.closed:
self.file.close()

output_file_path = output_dir / self.file_name
shutil.move(self.file_path, output_file_path)
singer_file = SingerFile(
stream_id=self.stream.stream_id,
parent_dir=output_dir,
partition=partition,
min_time_extracted=self._min_time_extracted,
max_time_extracted=self._max_time_extracted,
)
shutil.move(self.file_path, singer_file.path)
self._file = None

if self.tmp_dir.exists():
shutil.rmtree(self.tmp_dir, ignore_errors=True)

return output_file_path
return singer_file

def write_record(self, record: dict) -> None:
"""Write a record to the file."""
Expand Down
38 changes: 20 additions & 18 deletions src/singerlake/stream/record_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,44 +23,46 @@ def __init__(self, stream: Stream, output_dir: Path) -> None:
self.stream = stream
self.output_dir = output_dir

self.files: list[Path] = []
self.singer_files: list[Path] = []
self.is_finalized = False
self._open_files: FlexDict = FlexDict()

def _finalize_file(self, file: SingerFileWriter) -> None:
finalized_file_path = file.close(output_dir=self.output_dir)
self.files.append(finalized_file_path)
def _finalize_file(
self, file: SingerFileWriter, partition: t.Tuple[t.Any, ...]
) -> None:
singer_file = file.close(output_dir=self.output_dir, partition=partition)
self.singer_files.append(singer_file)

def _new_file(self, partition: t.Tuple[t.Any, ...]) -> SingerFileWriter:
"""Return a new file."""
file = SingerFileWriter(stream=self.stream).open()
self._open_files.set(keys=partition, value=file)
return file
open_file = SingerFileWriter(stream=self.stream).open()
self._open_files.set(keys=partition, value=open_file)
return open_file

def write(self, schema: dict, record: dict) -> None:
"""Write a record to the stream."""

# partition the record
time_extracted = su.get_time_extracted(record)
partition = self.stream.partition_record(time_extracted) or ("default",)
file = self._open_files.get(partition)
open_file = self._open_files.get(partition)

if not file or file.closed:
file = self._new_file(partition)
if not open_file or open_file.closed:
open_file = self._new_file(partition)

if file.records_written == MAX_RECORD_COUNT:
self._finalize_file(file)
if open_file.records_written == MAX_RECORD_COUNT:
self._finalize_file(file=open_file, partition=partition)
# open a new file
file = self._new_file(partition)
open_file = self._new_file(partition)

if file.records_written == 0:
if open_file.records_written == 0:
# write the stream schema
file.write_schema(schema)
open_file.write_schema(schema)

file.write_record(record)
open_file.write_record(record)

def finalize(self) -> None:
"""Finalize the stream."""
for file in self._open_files.values(nested=True):
self._finalize_file(file)
for partition, open_file in self._open_files.flatten():
self._finalize_file(file=open_file, partition=tuple(partition))
self.is_finalized = True
6 changes: 3 additions & 3 deletions src/singerlake/stream/stream.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import typing as t
from contextlib import contextmanager

from .file_writer import SingerFile
from .record_writer import RecordWriter

if t.TYPE_CHECKING:
from datetime import datetime
from pathlib import Path

from singerlake import Singerlake
from singerlake.config import Partition
Expand Down Expand Up @@ -33,7 +33,7 @@ def __init__(
self.singerlake.config.store.path.partition_by or []
)

self.files: list[Path] = []
self.files: list[SingerFile] = []

def partition_record(self, time_extracted: "datetime") -> t.Tuple[str, ...]:
"""Partition a record."""
Expand All @@ -53,7 +53,7 @@ def record_writer(self):
yield writer
finally:
writer.finalize()
self.files.extend(writer.files)
self.files.extend(writer.singer_files)

def commit(self):
"""Commit stream files to storage."""
Expand Down

0 comments on commit dc9cc91

Please sign in to comment.