diff --git a/src/singerlake/store/path_manager/base.py b/src/singerlake/store/path_manager/base.py index e6833dd..e50ab87 100644 --- a/src/singerlake/store/path_manager/base.py +++ b/src/singerlake/store/path_manager/base.py @@ -95,6 +95,6 @@ def get_stream_file_path(self, stream_file: "SingerFile") -> GenericPath: self.get_stream_path( tap_id=stream_file.tap_id, stream_id=stream_file.stream_id ) - .extend(self.hash_stream_schema(stream_file.schema)) - .extend(stream_file.filename) + .extend(self.hash_stream_schema(stream_file.schema_)) + .extend(stream_file.name) ) diff --git a/src/singerlake/stream/file_writer.py b/src/singerlake/stream/file_writer.py index dc008f6..b11661f 100644 --- a/src/singerlake/stream/file_writer.py +++ b/src/singerlake/stream/file_writer.py @@ -9,7 +9,7 @@ from pathlib import Path from uuid import uuid4 -from pydantic import BaseModel +from pydantic import BaseModel, Field import singerlake.singer.utils as su @@ -21,7 +21,7 @@ class SingerFile(BaseModel): """Singer file object.""" tap_id: str - schema: dict + schema_: dict = Field(alias="schema") parent_dir: Path partition: tuple[int, ...] min_time_extracted: datetime @@ -41,7 +41,7 @@ def name(self): @property def stream_id(self): """Return the stream ID.""" - return self.schema["stream"] + return self.schema_["stream"] @property def path(self): diff --git a/src/singerlake/stream/stream.py b/src/singerlake/stream/stream.py index dc41ed4..aad703f 100644 --- a/src/singerlake/stream/stream.py +++ b/src/singerlake/stream/stream.py @@ -57,4 +57,4 @@ def record_writer(self): def commit(self): """Commit stream files to storage.""" - self.singerlake.store.commit_stream_files(tap=self.tap, stream_files=self.files) + self.singerlake.store.commit_stream_files(stream_files=self.files) diff --git a/tests/conftest.py b/tests/conftest.py index 715542c..674767d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -60,6 +60,19 @@ def _clean_lake_dir(lake_root: Path, lake_manifest: dict): lake_manifest, open(lake_root / "manifest.json", "w", encoding="utf-8"), ) + (lake_root / "raw").mkdir(parents=True, exist_ok=True) + (lake_root / "raw" / "tap-carbon-intensity").mkdir(parents=True, exist_ok=True) + json.dump( + { + "tap_id": "tap-carbon-intensity", + "streams": ["entry", "generationmix", "region"], + }, + open( + lake_root / "raw" / "tap-carbon-intensity" / "manifest.json", + "w", + encoding="utf-8", + ), + ) @pytest.fixture(scope="session") @@ -69,6 +82,7 @@ def write_singerlake(write_singerlake_config: dict): lake_root = singerlake.store.get_lake_root() if lake_root.exists(): _clean_lake_dir(lake_root, {"lake_id": "merry-lizard"}) + yield singerlake singerlake.clean_working_dir() _clean_lake_dir(lake_root, {"lake_id": "merry-lizard"}) diff --git a/tests/data/write_lake/raw/tap-carbon-intensity/manifest.json b/tests/data/write_lake/raw/tap-carbon-intensity/manifest.json new file mode 100644 index 0000000..065946c --- /dev/null +++ b/tests/data/write_lake/raw/tap-carbon-intensity/manifest.json @@ -0,0 +1 @@ +{"tap_id": "tap-carbon-intensity", "streams": ["entry", "generationmix", "region"]} diff --git a/tests/test_singerlake.py b/tests/test_singerlake.py index 71e7434..539c975 100644 --- a/tests/test_singerlake.py +++ b/tests/test_singerlake.py @@ -53,5 +53,6 @@ def test_stream_commit(write_singerlake): / "raw" / "tap-carbon-intensity" / "entry" + / "Y8Mjkb4i9yM" ) assert stream_files_path.exists()