Skip to content

Commit

Permalink
feat: test local singerlake (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ken Payne authored Apr 14, 2023
1 parent 36a9719 commit 1f16873
Show file tree
Hide file tree
Showing 17 changed files with 305 additions and 32 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.vscode/
.DS_Store

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
70 changes: 69 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ packages = [{include = "singerlake"}]
python = ">=3.8,<3.12"
pydantic = "^1.10.7"
filelock = "^3.11.0"
pyfarmhash = "^0.3.2"
numpy = "^1.24.2"
base58 = "^2.1.1"

[tool.poetry.group.dev.dependencies]
tox = "^4.4.12"
Expand Down
8 changes: 5 additions & 3 deletions singerlake/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
LAKE_MANIFEST_FILENAME = "manifest.json"
TAP_MANIFEST_FILENAME = "manifest.json"
STREAM_MANIFEST_FILENAME = "manifest.json"
from singerlake.singerlake import SingerLake

__all__ = [
"SingerLake",
]
4 changes: 2 additions & 2 deletions singerlake/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ class TapManifest(BaseModel):
class StreamManifest(BaseModel):
"""Stream Manifest."""

files: List[str]
versions: Mapping[str, str]
files: List[str] = []
versions: Mapping[str, str] = {}

def add_files(self, file_names: List[str], schema_hash: str):
"""Add files to this Stream Manifest."""
Expand Down
36 changes: 36 additions & 0 deletions singerlake/singerlake.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,42 @@
from __future__ import annotations

import json
from pathlib import Path
from typing import Any, List, Mapping

import base58
import farmhash
import numpy as np

from singerlake.store import BaseStore


class SingerLake:
    """Facade for writing Singer stream files into a lake through a store."""

    def __init__(self, store: BaseStore):
        """Remember the store that all lake operations are delegated to."""
        self.store = store

    def hash_stream_schema(self, stream_schema: Mapping[str, Any]) -> str:
        """Return a short base58 hash string for the given stream schema.

        The schema is serialised to JSON with sorted keys, so the order of
        keys in the mapping does not influence the resulting hash.
        """
        canonical_json = json.dumps(stream_schema, sort_keys=True)
        fingerprint = farmhash.fingerprint64(canonical_json)
        # Reinterpret the unsigned 64-bit fingerprint as int64 and take its
        # raw bytes before base58-encoding them.
        fingerprint_bytes = np.uint64(fingerprint).astype("int64").tobytes()
        encoded = base58.b58encode(fingerprint_bytes)
        return encoded.decode("utf-8")

    def write_files(
        self,
        tap_id: str,
        stream_id: str,
        stream_schema: Mapping[str, Any],
        files: List[Path],
    ) -> None:
        """Hash the stream schema and hand the files to the store.

        The store receives the tap and stream identifiers, the schema hash
        computed by ``hash_stream_schema`` and the list of file paths.
        """
        schema_hash = self.hash_stream_schema(stream_schema=stream_schema)
        self.store.add_files_to_stream(
            tap_id=tap_id,
            stream_id=stream_id,
            stream_schema_hash=schema_hash,
            files=files,
        )
5 changes: 4 additions & 1 deletion singerlake/store/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from .base import BaseStore
from .local import LocalStore

__all__ = ["BaseStore", "LocalStore"]
__all__ = [
"BaseStore",
"LocalStore",
]
19 changes: 9 additions & 10 deletions singerlake/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
from pathlib import Path
from typing import List, final

from singerlake import (
from singerlake.models import BaseModel, LakeManifest, StreamManifest, TapManifest
from singerlake.store.const import (
LAKE_MANIFEST_FILENAME,
STREAM_MANIFEST_FILENAME,
TAP_MANIFEST_FILENAME,
)
from singerlake.models import BaseModel, LakeManifest, StreamManifest, TapManifest


class BaseStore(ABC):
Expand Down Expand Up @@ -63,21 +63,21 @@ def get_manifest(self, manifest_path: str, model: BaseModel) -> BaseModel:
"""Get Manifest from path."""

@final
def get_lake_manifest(self) -> LakeManifest | None:
def get_lake_manifest(self) -> LakeManifest:
"""Read Lake manifest."""
return self.get_manifest(
manifest_path=self.get_lake_manifest_path(), model=LakeManifest
)

@final
def get_tap_manifest(self, tap_id: str) -> TapManifest | None:
def get_tap_manifest(self, tap_id: str) -> TapManifest:
"""Read Tap manifest by tap_id."""
return self.get_manifest(
manifest_path=self.get_tap_manifest_path(tap_id=tap_id), model=TapManifest
)

@final
def get_stream_manifest(self, tap_id: str, stream_id: str) -> StreamManifest | None:
def get_stream_manifest(self, tap_id: str, stream_id: str) -> StreamManifest:
"""Read Stream manifest by tap_id and stream_id."""
return self.get_manifest(
manifest_path=self.get_stream_manifest_path(
Expand Down Expand Up @@ -118,23 +118,23 @@ def write_stream_manifest(

@contextmanager
@abstractmethod
def lock_lake(self, timeout: int | None):
def lock_lake(self):
"""Context manager to acquire and release lock on this SingerLake.
Locking is required to update the Lake Manifest.
"""

@contextmanager
@abstractmethod
def lock_tap(self, tap_id: str, timeout: int | None):
def lock_tap(self, tap_id: str):
"""Context manager to acquire and release lock on a Tap by tap_id.
Locking is required to update the Tap Manifest.
"""

@contextmanager
@abstractmethod
def lock_stream(self, tap_id: str, stream_id: str, timeout: int | None):
def lock_stream(self, tap_id: str, stream_id: str):
"""Context manager to acquire and release lock on a Stream by tap_id and stream_id.
Locking is required to update the Stream Manifest.
Expand All @@ -151,14 +151,13 @@ def add_files_to_stream(
stream_id: str,
stream_schema_hash: str,
files: List[Path],
lock_timeout: int | None,
) -> None:
"""Write files to Lake."""
stream_path = self.get_stream_path(
tap_id=tap_id, stream_id=stream_id, stream_schema_hash=stream_schema_hash
)
self.write_files_to_stream(stream_path=stream_path, files=files)
with self.lock_stream(tap_id=tap_id, stream_id=stream_id, timeout=lock_timeout):
with self.lock_stream(tap_id=tap_id, stream_id=stream_id):
manifest = self.get_stream_manifest(tap_id=tap_id, stream_id=stream_id)
manifest.add_files(
file_names=[file.name for file in files], schema_hash=stream_schema_hash
Expand Down
3 changes: 3 additions & 0 deletions singerlake/store/const.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
LAKE_MANIFEST_FILENAME = "manifest.json"
TAP_MANIFEST_FILENAME = "manifest.json"
STREAM_MANIFEST_FILENAME = "manifest.json"
34 changes: 19 additions & 15 deletions singerlake/store/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@
from filelock import FileLock
from pydantic import BaseModel

from singerlake import (
from singerlake.store import BaseStore
from singerlake.store.const import (
LAKE_MANIFEST_FILENAME,
STREAM_MANIFEST_FILENAME,
TAP_MANIFEST_FILENAME,
)
from singerlake.store import BaseStore


class LocalStore(BaseStore):
"""Local Disk SingerLake Store."""

def __init__(self, lake_root: Path):
def __init__(self, lake_root: Path, lock_timeout: int = 30):
self._lake_root = lake_root
self.lock_timeout = lock_timeout

@property
def lake_root(self) -> Path:
Expand Down Expand Up @@ -65,42 +66,45 @@ def get_stream_manifest_path(self, tap_id: str, stream_id: str) -> Path:
/ STREAM_MANIFEST_FILENAME
)

def get_manifest(self, manifest_path: Path, model: BaseModel) -> BaseModel | None:
def get_manifest(self, manifest_path: Path, model: BaseModel) -> BaseModel:
"""Read manifest to at path to given model."""
if manifest_path.exists():
with manifest_path.open() as manifest:
data = json.load(manifest)
return model(**data)
return model()

def write_manifest(self, manifest_path: Path, manifest: BaseModel) -> None:
"""Write manifest to given path."""
manifest_path.mkdir(parents=True, exist_ok=True)
manifest_path.parent.mkdir(parents=True, exist_ok=True)
with manifest_path.open("w") as manifest_file:
manifest_file.write(manifest.json())

@contextmanager
def lock(self, lockfile_path: Path, timeout):
lock = FileLock(lockfile_path, timeout=timeout)
def lock(self, lockfile_path: Path):
lock = FileLock(lockfile_path, timeout=self.lock_timeout)
with lock:
yield

@contextmanager
def lock_lake(self):
    """Hold the Lake lock for the duration of the ``with`` block.

    The lockfile path is the Lake manifest path with ``.lock`` appended.
    """
    lockfile_path = Path(f"{self.get_lake_manifest_path()}.lock")
    # Enter the inner context manager: merely yielding the object returned by
    # ``self.lock`` would hand back an un-entered manager and acquire nothing.
    with self.lock(lockfile_path=lockfile_path):
        yield

@contextmanager
def lock_tap(self, tap_id: str):
    """Hold the lock for the Tap ``tap_id`` for the duration of the ``with`` block.

    The lockfile path is that Tap's manifest path with ``.lock`` appended.
    """
    lockfile_path = Path(f"{self.get_tap_manifest_path(tap_id=tap_id)}.lock")
    # Enter the inner context manager: merely yielding the object returned by
    # ``self.lock`` would hand back an un-entered manager and acquire nothing.
    with self.lock(lockfile_path=lockfile_path):
        yield

@contextmanager
def lock_stream(self, tap_id: str, stream_id: str):
    """Hold the lock for a Stream for the duration of the ``with`` block.

    The Stream is identified by ``tap_id`` and ``stream_id``; the lockfile
    path is that Stream's manifest path with ``.lock`` appended.
    """
    manifest_path = self.get_stream_manifest_path(tap_id=tap_id, stream_id=stream_id)
    lockfile_path = Path(f"{manifest_path}.lock")
    # Enter the inner context manager: merely yielding the object returned by
    # ``self.lock`` would hand back an un-entered manager and acquire nothing.
    with self.lock(lockfile_path=lockfile_path):
        yield

def write_files_to_stream(self, stream_path: Path, files: List[Path]):
for file in files:
shutil.copy(file, stream_path / file.name)
for source in files:
destination = stream_path / source.name
destination.parent.mkdir(parents=True, exist_ok=True)
shutil.copy(source, destination)
Binary file not shown.
25 changes: 25 additions & 0 deletions tests/example_files/entry.schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"type": "SCHEMA",
"stream": "entry",
"schema": {
"type": "object",
"properties": {
"id": { "type": "string" },
"from": { "type": "string", "format": "date-time" },
"to": { "type": "string", "format": "date-time" },
"forecast": { "type": "integer" },
"index": { "type": "string" },
"region_id": { "type": "integer" },
"_sdc_extracted_at": {
"type": ["null", "string"],
"format": "date-time"
},
"_sdc_received_at": { "type": ["null", "string"], "format": "date-time" },
"_sdc_deleted_at": { "type": ["null", "string"], "format": "date-time" },
"_sdc_batched_at": { "type": ["null", "string"], "format": "date-time" },
"_sdc_table_version": { "type": ["null", "integer"] },
"_sdc_sequence": { "type": ["null", "integer"] }
}
},
"key_properties": ["id"]
}
Binary file not shown.
Loading

0 comments on commit 1f16873

Please sign in to comment.