Skip to content

Commit

Permalink
commit files to local lake
Browse files Browse the repository at this point in the history
  • Loading branch information
Ken Payne committed Sep 28, 2023
1 parent 77dadbf commit 3b1ca22
Show file tree
Hide file tree
Showing 13 changed files with 248 additions and 103 deletions.
12 changes: 6 additions & 6 deletions src/singerlake/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pydantic import BaseModel


class Partition(BaseModel):
class PartitionBy(BaseModel):
"""Partition Model."""

by: t.Literal["year", "month", "day", "hour", "minute", "second"]
Expand All @@ -21,11 +21,11 @@ class PathConfig(BaseModel):

path_type: str = "hive"
lake_root: GenericPathModel
partition_by: t.Optional[t.List[Partition]] = [
Partition(by="year"),
Partition(by="month"),
Partition(by="day"),
Partition(by="hour"),
partition_by: t.Optional[t.List[PartitionBy]] = [
PartitionBy(by="year"),
PartitionBy(by="month"),
PartitionBy(by="day"),
PartitionBy(by="hour"),
]


Expand Down
3 changes: 2 additions & 1 deletion src/singerlake/singerlake.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
from uuid import uuid4

from singerlake.config import SingerlakeConfig
from singerlake.discovery import DiscoveryService
from singerlake.store import BaseStore, StoreService

from singerlake.discovery import DiscoveryService # isort:skip

if t.TYPE_CHECKING:
from singerlake.tap import Tap

Expand Down
2 changes: 1 addition & 1 deletion src/singerlake/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(
@property
def lake_root(self) -> t.Any:
"""Return the Lake root path."""
return self.get_lake_root()
return self.path_manager.lake_root

@property
def lake_manifest(self) -> LakeManifest:
Expand Down
70 changes: 36 additions & 34 deletions src/singerlake/store/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,46 @@
from pathlib import Path

from singerlake.store.manifest import TapManifest
from singerlake.store.path_manager.base import BasePathTransformer
from singerlake.tap import Tap

from .base import BaseStore

if t.TYPE_CHECKING:
from singerlake.store.path_manager.base import GenericPath
from singerlake import Singerlake
from singerlake.store.locker.base import BaseLocker
from singerlake.store.path_manager.base import BasePathManager, GenericPath
from singerlake.stream.file_writer import SingerFile


class LocalPathTransformer(BasePathTransformer):
@staticmethod
def transform(generic_path: GenericPath) -> Path:
"""Transform a GenericPath to a pathlib Path."""
if generic_path.relative:
return Path.cwd() / Path(*generic_path.segments)
return Path(*generic_path.segments)


class LocalStore(BaseStore):
"""Local directory store."""

def __init__(
self,
singerlake: "Singerlake",
locker: "BaseLocker",
path_manager: "BasePathManager",
) -> None:
super().__init__(
singerlake=singerlake, locker=locker, path_manager=path_manager
)
self.path_manager.transformer = LocalPathTransformer()

@property
def lake_manifest_has_changed(self) -> bool:
"""Return True if the Lake Manifest has changed."""
return self.read_lake_manifest_checksum() != self._lake_manifest_checksum

def _md5(self, file_path: Path):
"""Return the md5 checksum of a file."""
hash_md5 = hashlib.md5()
Expand All @@ -34,60 +62,36 @@ def _read_json(self, file_path: Path):
with file_path.open("r", encoding="utf-8") as json_file:
return json.load(json_file)

def _to_path(self, generic_path: "GenericPath") -> Path:
"""Convert a GenericPath to a pathlib Path."""
return Path(*generic_path.segments)

def get_lake_root(self) -> Path:
"""Return the Lake root path."""
generic_path = self.path_manager.lake_root
if generic_path.relative:
return Path.cwd() / Path(*generic_path.segments)
return Path(*generic_path.segments)

def read_lake_manifest_checksum(self) -> str | None:
"""Read the Lake Manifest checksum."""
lake_manifest_path = self._to_path(self.path_manager.lake_manifest_path)
return self._md5(lake_manifest_path)
return self._md5(self.path_manager.lake_manifest_path)

def read_lake_manifest(self) -> dict | None:
"""Read the Lake Manifest."""
lake_manifest_path = self._to_path(self.path_manager.lake_manifest_path)
lake_manifest = self._read_json(lake_manifest_path)
lake_manifest = self._read_json(self.path_manager.lake_manifest_path)
if lake_manifest is not None:
self._lake_manifest_checksum = self.read_lake_manifest_checksum()
return lake_manifest
return None

@property
def lake_manifest_has_changed(self) -> bool:
"""Return True if the Lake Manifest has changed."""
return self.read_lake_manifest_checksum() != self._lake_manifest_checksum

# Tap Manifest
def read_tap_manifest(self, tap_id: str) -> dict | None:
"""Read a Tap Manifest."""
tap_manifest_path = self._to_path(
self.path_manager.get_tap_manifest_path(tap_id=tap_id)
)
return self._read_json(tap_manifest_path)
return self._read_json(self.path_manager.get_tap_manifest_path(tap_id=tap_id))

# Stream Manifest
def read_stream_manifest(self, tap_id: str, stream_id: str) -> dict | None:
"""Read a Stream Manifest."""
stream_manifest_path = self._to_path(
return self._read_json(
self.path_manager.get_stream_manifest_path(
tap_id=tap_id, stream_id=stream_id
)
)
return self._read_json(stream_manifest_path)

# Stream Files
def _commit_stream_file(self, stream_file: "SingerFile") -> None:
"""Commit a singer file to storage."""
file_path = self._to_path(
self.path_manager.get_stream_file_path(stream_file=stream_file)
)
file_path = self.path_manager.get_stream_file_path(stream_file=stream_file)
if not file_path.parent.exists():
file_path.parent.mkdir(parents=True)
shutil.copy(stream_file.path, file_path)
Expand All @@ -99,16 +103,14 @@ def commit_stream_files(self, stream_files: list["SingerFile"]) -> None:

def create_tap(self, tap_id: str) -> Tap:
"""Create a Tap."""
file_path = self._to_path(self.path_manager.get_tap_path(tap_id=tap_id))
file_path = self.path_manager.get_tap_path(tap_id=tap_id)
file_path.mkdir(parents=True)
tap_manifest = self.write_tap_manifest(tap_id=tap_id, manifest=TapManifest())
return Tap(**tap_manifest.dict())

def write_tap_manifest(self, tap_id: str, manifest: TapManifest) -> TapManifest:
"""Write a Tap Manifest."""
file_path = self._to_path(
self.path_manager.get_tap_manifest_path(tap_id=tap_id)
)
file_path = self.path_manager.get_tap_manifest_path(tap_id=tap_id)
with file_path.open("w", encoding="utf-8") as json_file:
json.dump(manifest.dict(), json_file, indent=2)
return manifest
6 changes: 4 additions & 2 deletions src/singerlake/store/path_manager/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .base import BasePathManager
from .base import BasePathManager, GenericPath, Partition
from .constant import (
LAKE_MANIFEST_FILENAME,
STREAM_MANIFEST_FILENAME,
Expand All @@ -8,8 +8,10 @@

__all__ = [
"BasePathManager",
"PathService",
"GenericPath",
"Partition",
"LAKE_MANIFEST_FILENAME",
"TAP_MANIFEST_FILENAME",
"STREAM_MANIFEST_FILENAME",
"PathService",
]
147 changes: 122 additions & 25 deletions src/singerlake/store/path_manager/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import json
import typing as t
from collections import namedtuple
from datetime import datetime
from functools import lru_cache

import base58
import farmhash
Expand All @@ -14,10 +17,13 @@
)

if t.TYPE_CHECKING:
from singerlake.config import PathConfig
from singerlake.config import PartitionBy, PathConfig
from singerlake.stream.file_writer import SingerFile


Partition = namedtuple("Partition", ["name", "value"])


class GenericPath:

"""Generic path class."""
Expand Down Expand Up @@ -53,10 +59,100 @@ def from_model(cls, model: t.Any) -> "GenericPath":
return GenericPath(model.segments, relative=model.relative)


class BasePathTransformer:
@staticmethod
def transform(generic_path: GenericPath) -> t.Any:
"""Transform a path."""
return generic_path


class BasePathManager:
def __init__(self, config: "PathConfig"):
self.config = config
self.lake_root = GenericPath.from_model(self.config.lake_root)
self._lake_root: GenericPath | None = None
self._transformer = BasePathTransformer()

@property
def _generic_lake_root(self) -> GenericPath:
"""Compile the lake root path."""
if self._lake_root is None:
self._lake_root = GenericPath.from_model(self.config.lake_root)
return self._lake_root

@property
def _generic_lake_manifest_path(self) -> GenericPath:
"""Compile the lake manifest path."""
return self._generic_lake_root.extend(*("raw", LAKE_MANIFEST_FILENAME))

def _generic_tap_path(self, tap_id: str) -> GenericPath:
"""Compile the tap path."""
return self._generic_lake_root.extend(*("raw", tap_id))

def _generic_tap_manifest_path(self, tap_id: str) -> GenericPath:
"""Compile the tap manifest path."""
return self._generic_tap_path(tap_id=tap_id).extend(TAP_MANIFEST_FILENAME)

def _generic_stream_path(self, tap_id: str, stream_id: str) -> GenericPath:
"""Compile the stream path."""
return self._generic_tap_path(tap_id=tap_id).extend(stream_id)

def _generic_stream_manifest_path(self, tap_id: str, stream_id: str) -> GenericPath:
"""Compile the stream manifest path."""
return self._generic_stream_path(tap_id=tap_id, stream_id=stream_id).extend(
STREAM_MANIFEST_FILENAME
)

def _generic_stream_file_path(self, stream_file: "SingerFile") -> GenericPath:
"""Compile the stream file path."""
stream_path = self._generic_stream_path(
tap_id=stream_file.tap_id, stream_id=stream_file.stream_id
)
stream_path = stream_path.extend(self.hash_stream_schema(stream_file.schema_))
for partition in stream_file.partitions:
stream_path = stream_path.extend(self.format_partition(partition))
return stream_path.extend(stream_file.name)

@property
def lake_root(self) -> t.Any:
"""Get the lake root path."""
return self.transform(self._generic_lake_root)

@property
def lake_manifest_path(self) -> t.Any:
"""Get the lake manifest path."""
return self.transform(self._generic_lake_manifest_path)

@property
def transformer(self) -> BasePathTransformer:
"""Get the path transformer."""
return self._transformer

@transformer.setter
def transformer(self, transformer: BasePathTransformer) -> None:
"""Set the path transformer."""
self._transformer = transformer

@property
def file_partition_by(self) -> t.List["PartitionBy"]:
"""Get the file partition."""
return self.config.partition_by or []

@lru_cache
def get_record_partitions(
self, time_extracted: "datetime"
) -> t.Tuple[Partition, ...]:
"""Partition a record."""
partitions = [
Partition(
name=partition_by.by, value=getattr(time_extracted, partition_by.by)
)
for partition_by in self.file_partition_by
]
return tuple(partitions)

def format_partition(self, partition: Partition) -> str:
"""Format a partition."""
return str(partition.value)

def hash_stream_schema(self, stream_schema: t.Mapping[str, t.Any]) -> str:
"""Calculate a unique short-hash for given schema."""
Expand All @@ -66,35 +162,36 @@ def hash_stream_schema(self, stream_schema: t.Mapping[str, t.Any]) -> str:
)
return base58.b58encode(int64_hash_bytes).decode("utf-8")

@property
def lake_manifest_path(self) -> GenericPath:
"""Get the lake manifest path."""
return self.lake_root.extend(*("raw", LAKE_MANIFEST_FILENAME))

def get_tap_path(self, tap_id: str) -> GenericPath:
@t.final
def get_tap_path(self, tap_id: str) -> t.Any:
"""Get the tap path."""
return self.lake_root.extend(*("raw", tap_id))
return self.transform(self._generic_tap_path(tap_id=tap_id))

def get_tap_manifest_path(self, tap_id: str) -> GenericPath:
@t.final
def get_tap_manifest_path(self, tap_id: str) -> t.Any:
"""Get the tap manifest path."""
return self.get_tap_path(tap_id=tap_id).extend(TAP_MANIFEST_FILENAME)
return self.transform(self._generic_tap_manifest_path(tap_id=tap_id))

def get_stream_path(self, tap_id: str, stream_id: str) -> GenericPath:
@t.final
def get_stream_path(self, tap_id: str, stream_id: str) -> t.Any:
"""Get the stream path."""
return self.get_tap_path(tap_id=tap_id).extend(stream_id)
return self.transform(
self._generic_stream_path(tap_id=tap_id, stream_id=stream_id)
)

def get_stream_manifest_path(self, tap_id: str, stream_id: str) -> GenericPath:
@t.final
def get_stream_manifest_path(self, tap_id: str, stream_id: str) -> t.Any:
"""Get the stream manifest path."""
return self.get_stream_path(tap_id=tap_id, stream_id=stream_id).extend(
STREAM_MANIFEST_FILENAME
)
return self.transform(self._generic_stream_manifest_path(tap_id, stream_id))

def get_stream_file_path(self, stream_file: "SingerFile") -> GenericPath:
@t.final
def get_stream_file_path(self, stream_file: "SingerFile") -> t.Any:
"""Get the stream file path."""
return (
self.get_stream_path(
tap_id=stream_file.tap_id, stream_id=stream_file.stream_id
)
.extend(self.hash_stream_schema(stream_file.schema_))
.extend(stream_file.name)
)
return self.transform(self._generic_stream_file_path(stream_file))

def transform(self, path: GenericPath) -> t.Any:
"""Run before returning a path from get methods.
Override to transform GenericPath to a different path type.
"""
return self.transformer.transform(path)
9 changes: 9 additions & 0 deletions src/singerlake/store/path_manager/hive.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
import typing as t

from .base import BasePathManager

if t.TYPE_CHECKING:
from singerlake.store.path_manager import Partition


class HivePathManager(BasePathManager):
"""HivePathManager is a path manager for Hive paths."""

def format_partition(self, partition: "Partition") -> str:
"""Format a partition."""
return f"{partition.name}={partition.value}"
Loading

0 comments on commit 3b1ca22

Please sign in to comment.