Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Ken Payne committed Sep 21, 2023
1 parent d1a3fc7 commit e4eb0ca
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 4 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
.vscode/
.DS_Store
.singerlake/
.ruff_cache/

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
7 changes: 7 additions & 0 deletions src/singerlake/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
from abc import ABC

if t.TYPE_CHECKING:
from pathlib import Path

from singerlake.store.locker.base import BaseLocker
from singerlake.store.path_manager.base import BasePathManager
from singerlake.stream.stream import Stream


class BaseStore(ABC):
Expand All @@ -23,3 +26,7 @@ def read_tap_manifest(self, tap_id: str) -> dict | None:
def read_stream_manifest(self, tap_id: str, stream_id: str) -> dict | None:
"""Read a Stream Manifest."""
raise NotImplementedError()

def commit_stream_files(self, stream: "Stream", stream_files: list["Path"]) -> None:
"""Commit stream files to storage."""
raise NotImplementedError()
11 changes: 11 additions & 0 deletions src/singerlake/store/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

if t.TYPE_CHECKING:
from singerlake.store.path_manager.base import GenericPath
from singerlake.stream.stream import Stream

from .locker.base import BaseLocker
from .path_manager.base import BasePathManager
Expand Down Expand Up @@ -77,3 +78,13 @@ def read_stream_manifest(self, tap_id: str, stream_id: str) -> dict | None:
)
)
return self._read_json(stream_manifest_path)

# Stream Files
def _commit_stream_file(self, stream: "Stream", stream_file: "Path") -> None:
"""Commit a stream file to storage."""
pass

def commit_stream_files(self, stream: "Stream", stream_files: list["Path"]) -> None:
"""Commit stream files to storage."""
for stream_file in stream_files:
self._commit_stream_file(stream=stream, stream_file=stream_file)
5 changes: 1 addition & 4 deletions src/singerlake/stream/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,4 @@ def record_writer(self):

def commit(self):
"""Commit stream files to storage."""
raise NotImplementedError
# self.singerlake.store.commit_stream_files(
# stream=self, stream_files=self.files
# )
self.singerlake.store.commit_stream_files(stream=self, stream_files=self.files)

0 comments on commit e4eb0ca

Please sign in to comment.