Skip to content

Commit

Permalink
feat: commit singer files to store (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
kgpayne authored Sep 28, 2023
1 parent d1a3fc7 commit a6f741e
Show file tree
Hide file tree
Showing 22 changed files with 522 additions and 167 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
.vscode/
.DS_Store
.singerlake/
.ruff_cache/

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
12 changes: 6 additions & 6 deletions src/singerlake/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pydantic import BaseModel


class Partition(BaseModel):
class PartitionBy(BaseModel):
"""Partition Model."""

by: t.Literal["year", "month", "day", "hour", "minute", "second"]
Expand All @@ -21,11 +21,11 @@ class PathConfig(BaseModel):

path_type: str = "hive"
lake_root: GenericPathModel
partition_by: t.Optional[t.List[Partition]] = [
Partition(by="year"),
Partition(by="month"),
Partition(by="day"),
Partition(by="hour"),
partition_by: t.Optional[t.List[PartitionBy]] = [
PartitionBy(by="year"),
PartitionBy(by="month"),
PartitionBy(by="day"),
PartitionBy(by="hour"),
]


Expand Down
10 changes: 6 additions & 4 deletions src/singerlake/discovery/discovery_service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import typing as t

from singerlake.tap import Tap
Expand All @@ -19,14 +21,14 @@ def __init__(self, singerlake: "Singerlake"):
def list_taps(self) -> t.List[str]:
"""List available Taps."""
if self._tap_cache is None:
lake_manifest = self.singerlake.manifest_service.lake_manifest
lake_manifest = self.singerlake.store.lake_manifest
self._tap_cache = lake_manifest.taps

return self._tap_cache

def get_tap(self, tap_id):
def get_tap(self, tap_id) -> Tap | None:
"""Get a Tap by ID."""
tap_manifest = self.singerlake.manifest_service.get_tap_manifest(tap_id=tap_id)
tap_manifest = self.singerlake.store.get_tap_manifest(tap_id=tap_id)
if tap_manifest:
return Tap(singerlake=self.singerlake, tap_manifest=tap_manifest)
raise ValueError(f"Tap {tap_id} not found.")
return None
4 changes: 0 additions & 4 deletions src/singerlake/manifest/__init__.py

This file was deleted.

Empty file removed src/singerlake/manifest/base.py
Empty file.
39 changes: 0 additions & 39 deletions src/singerlake/manifest/manifest_service.py

This file was deleted.

34 changes: 25 additions & 9 deletions src/singerlake/singerlake.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
from uuid import uuid4

from singerlake.config import SingerlakeConfig
from singerlake.discovery import DiscoveryService
from singerlake.manifest import ManifestService
from singerlake.store import StoreService
from singerlake.store import BaseStore, StoreService

from singerlake.discovery import DiscoveryService # isort:skip

if t.TYPE_CHECKING:
from singerlake.tap import Tap
Expand All @@ -23,17 +23,16 @@ def __init__(self, config: dict | None = None):

self.instance_id = str(uuid4())
self.config = SingerlakeConfig(**config_dict)
self.manifest_service = ManifestService(singerlake=self)
self.discovery_service = DiscoveryService(singerlake=self)
self.store = StoreService(singerlake=self, config=self.config.store).get_store()

self._store: BaseStore | None = None
self._lake_id: str | None = None

@property
def lake_id(self) -> str:
"""Return the Lake ID."""
if self._lake_id is None:
self._lake_id = self.manifest_service.lake_manifest.lake_id
self._lake_id = self.store.lake_manifest.lake_id
return self._lake_id

@property
Expand All @@ -51,6 +50,15 @@ def working_dir(self) -> Path:
working_dir.mkdir(parents=True, exist_ok=True)
return working_dir

@property
def store(self) -> BaseStore:
"""Return the store instance."""
if self._store is None:
self._store = StoreService(
singerlake=self, config=self.config.store
).get_store()
return self._store

def clean_working_dir(self) -> None:
"""Clean the local working directory."""
shutil.rmtree(self.working_dir, ignore_errors=True)
Expand All @@ -59,6 +67,14 @@ def list_taps(self) -> list[str]:
"""Return Taps stored in this Singerlake."""
return self.discovery_service.list_taps()

def get_tap(self, tap_id: str) -> "Tap":
"""Return a Tap stored in this Singerlake."""
return self.discovery_service.get_tap(tap_id=tap_id)
def get_tap(self, tap_id: str, create: bool = False) -> "Tap" | None:
"""Return a Tap stored in this Singerlake.
Args:
tap_id: Tap ID.
create: If True, create a new Tap if it does not exist.
"""
tap = self.discovery_service.get_tap(tap_id=tap_id)
if tap is None and create:
tap = self.store.create_tap(tap_id=tap_id)
return tap
59 changes: 58 additions & 1 deletion src/singerlake/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,80 @@
import typing as t
from abc import ABC

from singerlake.store.manifest import LakeManifest, TapManifest

if t.TYPE_CHECKING:
from singerlake import Singerlake
from singerlake.store.locker.base import BaseLocker
from singerlake.store.path_manager.base import BasePathManager
from singerlake.stream.file_writer import SingerFile
from singerlake.tap.tap import Tap


class BaseStore(ABC):
"""Base SingerLake storage interface."""

def __init__(self, locker: "BaseLocker", path_manager: "BasePathManager") -> None:
def __init__(
self,
singerlake: "Singerlake",
locker: "BaseLocker",
path_manager: "BasePathManager",
) -> None:
"""Base SingerLake storage interface."""
self.singerlake = singerlake
self.locker = locker
self.path_manager = path_manager

self._lake_manifest: LakeManifest | None = None
self._lake_manifest_checksum: str | None = None

@property
def lake_root(self) -> t.Any:
"""Return the Lake root path."""
return self.path_manager.lake_root

@property
def lake_manifest(self) -> LakeManifest:
"""Return the Lake Manifest."""
if self._lake_manifest is None:
read_lake_manifest = self.read_lake_manifest()
if read_lake_manifest is not None:
self._lake_manifest = LakeManifest(**read_lake_manifest)
else:
raise ValueError("Lake Manifest not found.")
return self._lake_manifest

@t.final
def get_tap_manifest(self, tap_id: str) -> TapManifest | None:
"""Get a Tap Manifest by ID."""
read_tap_manifest = self.read_tap_manifest(tap_id=tap_id)
return None if read_tap_manifest is None else TapManifest(**read_tap_manifest)

# override these methods to implement a custom store
def get_lake_root(self) -> t.Any:
"""Return the Lake root path."""
raise NotImplementedError()

def read_lake_manifest(self) -> dict | None:
"""Read the Lake Manifest."""
raise NotImplementedError()

def read_tap_manifest(self, tap_id: str) -> dict | None:
"""Read a Tap Manifest."""
raise NotImplementedError()

def read_stream_manifest(self, tap_id: str, stream_id: str) -> dict | None:
"""Read a Stream Manifest."""
raise NotImplementedError()

def create_tap(self, tap_id: str) -> "Tap":
"""Create a Tap."""
raise NotImplementedError()

def commit_stream_files(self, stream_files: list["SingerFile"]) -> None:
"""Commit stream files to storage."""
raise NotImplementedError()

def write_tap_manifest(self, tap_id: str, manifest: TapManifest) -> TapManifest:
"""Write a Tap Manifest."""
raise NotImplementedError()
95 changes: 66 additions & 29 deletions src/singerlake/store/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,50 @@

import hashlib
import json
import shutil
import typing as t
from pathlib import Path

from singerlake.store.manifest import TapManifest
from singerlake.store.path_manager.base import BasePathTransformer
from singerlake.tap import Tap

from .base import BaseStore

if t.TYPE_CHECKING:
from singerlake.store.path_manager.base import GenericPath

from .locker.base import BaseLocker
from .path_manager.base import BasePathManager
from singerlake import Singerlake
from singerlake.store.locker.base import BaseLocker
from singerlake.store.path_manager.base import BasePathManager, GenericPath
from singerlake.stream.file_writer import SingerFile


class LocalPathTransformer(BasePathTransformer):
@staticmethod
def transform(generic_path: GenericPath) -> Path:
"""Transform a GenericPath to a pathlib Path."""
if generic_path.relative:
return Path.cwd() / Path(*generic_path.segments)
return Path(*generic_path.segments)


class LocalStore(BaseStore):
"""Local directory store."""

def __init__(self, locker: "BaseLocker", path_manager: "BasePathManager"):
"""Local directory store."""
super().__init__(locker=locker, path_manager=path_manager)
def __init__(
self,
singerlake: "Singerlake",
locker: "BaseLocker",
path_manager: "BasePathManager",
) -> None:
super().__init__(
singerlake=singerlake, locker=locker, path_manager=path_manager
)
self.path_manager.transformer = LocalPathTransformer()

self._lake_manifest_checksum: str | None = None
@property
def lake_manifest_has_changed(self) -> bool:
"""Return True if the Lake Manifest has changed."""
return self.read_lake_manifest_checksum() != self._lake_manifest_checksum

def _md5(self, file_path: Path):
"""Return the md5 checksum of a file."""
Expand All @@ -33,47 +57,60 @@ def _md5(self, file_path: Path):

def _read_json(self, file_path: Path):
"""Read a JSON file."""
if not file_path.exists():
return None
with file_path.open("r", encoding="utf-8") as json_file:
return json.load(json_file)

def _to_path(self, generic_path: "GenericPath") -> Path:
"""Convert a GenericPath to a pathlib Path."""
return Path(*generic_path.segments)

# Lake Manifest
def read_lake_manifest_checksum(self) -> str | None:
"""Read the Lake Manifest checksum."""
lake_manifest_path = self._to_path(self.path_manager.lake_manifest_path)
return self._md5(lake_manifest_path)
return self._md5(self.path_manager.lake_manifest_path)

def read_lake_manifest(self) -> tuple[dict, str] | None:
def read_lake_manifest(self) -> dict | None:
"""Read the Lake Manifest."""
lake_manifest_path = self._to_path(self.path_manager.lake_manifest_path)
lake_manifest = self._read_json(lake_manifest_path)
lake_manifest = self._read_json(self.path_manager.lake_manifest_path)
if lake_manifest is not None:
self._lake_manifest_checksum = self.read_lake_manifest_checksum()
return lake_manifest
return None

@property
def lake_manifest_has_changed(self) -> bool:
"""Return True if the Lake Manifest has changed."""
return self.read_lake_manifest_checksum() != self._lake_manifest_checksum

# Tap Manifest
def read_tap_manifest(self, tap_id: str) -> dict | None:
"""Read a Tap Manifest."""
tap_manifest_path = self._to_path(
self.path_manager.get_tap_manifest_path(tap_id=tap_id)
)
return self._read_json(tap_manifest_path)
return self._read_json(self.path_manager.get_tap_manifest_path(tap_id=tap_id))

# Stream Manifest
def read_stream_manifest(self, tap_id: str, stream_id: str) -> dict | None:
"""Read a Stream Manifest."""
stream_manifest_path = self._to_path(
return self._read_json(
self.path_manager.get_stream_manifest_path(
tap_id=tap_id, stream_id=stream_id
)
)
return self._read_json(stream_manifest_path)

# Stream Files
def _commit_stream_file(self, stream_file: "SingerFile") -> None:
"""Commit a singer file to storage."""
file_path = self.path_manager.get_stream_file_path(stream_file=stream_file)
if not file_path.parent.exists():
file_path.parent.mkdir(parents=True)
shutil.copy(stream_file.path, file_path)

def commit_stream_files(self, stream_files: list["SingerFile"]) -> None:
"""Commit singer files to storage."""
for stream_file in stream_files:
self._commit_stream_file(stream_file=stream_file)

def create_tap(self, tap_id: str) -> Tap:
"""Create a Tap."""
file_path = self.path_manager.get_tap_path(tap_id=tap_id)
file_path.mkdir(parents=True)
tap_manifest = self.write_tap_manifest(tap_id=tap_id, manifest=TapManifest())
return Tap(**tap_manifest.dict())

def write_tap_manifest(self, tap_id: str, manifest: TapManifest) -> TapManifest:
"""Write a Tap Manifest."""
file_path = self.path_manager.get_tap_manifest_path(tap_id=tap_id)
with file_path.open("w", encoding="utf-8") as json_file:
json.dump(manifest.dict(), json_file, indent=2)
return manifest
Loading

0 comments on commit a6f741e

Please sign in to comment.