Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Support loading manifests from remote locations #87

Merged
Merged
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ flowchart LR
dbt-loom currently supports obtaining model definitions from:

- Local manifest files
- Remote manifest files via http(s)
- dbt Cloud
- GCS
- S3-compatible object storage services
Expand All @@ -57,6 +58,8 @@ manifests:
- name: project_name # This should match the project's real name
type: file
config:
# A path to your manifest. This can be either a local path, or a remote
# path accessible via http(s).
path: path/to/manifest.json
```

Expand Down
21 changes: 19 additions & 2 deletions dbt_loom/config.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from enum import Enum
from pathlib import Path
import re
from typing import List, Union
from urllib.parse import ParseResult, urlparse

from pydantic import BaseModel
from pydantic import BaseModel, validator

from dbt_loom.clients.az_blob import AzureReferenceConfig
from dbt_loom.clients.dbt_cloud import DbtCloudReferenceConfig
Expand All @@ -23,7 +25,22 @@ class ManifestReferenceType(str, Enum):
class FileReferenceConfig(BaseModel):
"""Configuration for a file reference"""

path: Path
path: ParseResult

@validator("path", pre=True, always=True)
def default_path(cls, v, values) -> ParseResult:
    """
    Normalise the configured manifest location into a ``ParseResult``.

    A value that already is a ``ParseResult`` is returned unchanged. A string
    that starts with a URL scheme (``scheme://``) is parsed as a URL. Any
    other value is treated as a local filesystem path and turned into an
    absolute ``file`` URL.
    """

    if isinstance(v, ParseResult):
        return v

    # Path objects have no regex-matchable text; convert them first so that
    # callers passing ``Path`` instances are still accepted.
    if isinstance(v, Path):
        v = str(v)

    if re.match(r"^[a-zA-Z][a-zA-Z0-9+.-]*://", v):
        return urlparse(v)

    # Build the result directly rather than parsing "file://" + path: parsing
    # would cut the path at the first `?` or `#`, and a path that does not
    # start with `/` (e.g. a Windows drive path) would not end up in `path`.
    return ParseResult(
        scheme="file",
        netloc="",
        path=str(Path(v).absolute()),
        params="",
        query="",
        fragment="",
    )


class ManifestReference(BaseModel):
Expand Down
68 changes: 61 additions & 7 deletions dbt_loom/manifests.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import datetime
from io import BytesIO
import json
import gzip
from pathlib import Path
from typing import Dict, List, Optional
from urllib.parse import urlunparse

from pydantic import BaseModel, Field, validator
import requests

try:
from dbt.artifacts.resources.types import NodeType
Expand Down Expand Up @@ -84,27 +88,77 @@ def dump(self) -> Dict:
return self.dict(exclude=exclude_set)


class UnknownManifestPathType(Exception):
    """
    Signals that a FileReferenceConfig handed to the ManifestLoader carries a
    path whose URL scheme is not one the loader knows how to handle.
    """


class InvalidManifestPath(Exception):
    """
    Signals that a FileReferenceConfig handed to the ManifestLoader carries a
    path that is not valid.
    """


class ManifestLoader:
def __init__(self):
    """Map each manifest reference type to the method that loads it."""
    # Each reference type appears exactly once. `file` references go through
    # `load_from_path`, which chooses between the local-filesystem and
    # HTTP(S) loaders based on the path's URL scheme.
    self.loading_functions = {
        ManifestReferenceType.file: self.load_from_path,
        ManifestReferenceType.dbt_cloud: self.load_from_dbt_cloud,
        ManifestReferenceType.gcs: self.load_from_gcs,
        ManifestReferenceType.s3: self.load_from_s3,
        ManifestReferenceType.azure: self.load_from_azure,
    }

@staticmethod
def load_from_path(config: FileReferenceConfig) -> Dict:
    """
    Load a manifest dictionary based on a FileReferenceConfig. This config's
    path can point to either a local file or a URL to a remote location.

    Raises:
        UnknownManifestPathType: the path's scheme is neither http(s) nor
            a local file.
    """

    scheme = config.path.scheme

    if scheme in ("http", "https"):
        return ManifestLoader.load_from_http(config)

    # Compare against a real tuple: `("file")` is just the string "file",
    # which would turn `in` into a substring test and accept schemes such as
    # "f" or "il". The empty scheme is listed on purpose so that a
    # ParseResult built from a bare path keeps loading from disk.
    if scheme in ("file", ""):
        return ManifestLoader.load_from_local_filesystem(config)

    raise UnknownManifestPathType(
        f"Unsupported manifest path scheme `{scheme}`."
    )

@staticmethod
def load_from_local_filesystem(config: FileReferenceConfig) -> Dict:
    """
    Load a manifest dictionary from a local file.

    The file is read as gzip-compressed JSON when its name ends in ``.gz``
    and as plain JSON otherwise.

    Raises:
        InvalidManifestPath: the parsed location has an empty path component.
        LoomConfigurationError: the file does not exist.
    """

    if not config.path.path:
        raise InvalidManifestPath()

    file_path = Path(config.path.path)

    if not file_path.exists():
        raise LoomConfigurationError(f"The path `{file_path}` does not exist.")

    # Both openers are used as context managers so the handle is always
    # closed, including when JSON decoding fails.
    opener = gzip.open if file_path.suffix == ".gz" else open
    with opener(file_path, "rt") as file:
        return json.load(file)

@staticmethod
def load_from_http(config: FileReferenceConfig, timeout: float = 60) -> Dict:
    """
    Load a manifest dictionary from a remote location over HTTP(S).

    Args:
        config: File reference whose path is an http(s) URL.
        timeout: Seconds to wait for the server before giving up.

    Raises:
        InvalidManifestPath: the URL has an empty path component.
        requests.HTTPError: the server answered with an error status.
    """

    if not config.path.path:
        raise InvalidManifestPath()

    # Without a timeout a stalled server would block the caller forever.
    response = requests.get(urlunparse(config.path), timeout=timeout)
    response.raise_for_status()  # Check for request errors

    payload = response.content

    # `response.content` already has any gzip Content-Encoding removed, so
    # decompressing because of that header would fail on plain JSON. Inspect
    # the body instead: a gzip stream (e.g. a `.json.gz` object served as-is)
    # always starts with the magic bytes 1f 8b, which JSON never does.
    if payload[:2] == b"\x1f\x8b":
        payload = gzip.decompress(payload)

    return json.loads(payload)

@staticmethod
def load_from_dbt_cloud(config: DbtCloudReferenceConfig) -> Dict:
Expand Down
6 changes: 6 additions & 0 deletions tests/test_dbt_core_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def test_dbt_core_runs_loom_plugin():
"revenue.orders.v2",
}

os.chdir(starting_path)

assert set(output.result).issuperset(
subset
), "The child project is missing expected nodes. Check that injection still works."
Expand Down Expand Up @@ -88,6 +90,8 @@ def test_dbt_loom_injects_dependencies():

path.unlink()

os.chdir(starting_path)

# Make sure nothing failed
assert isinstance(output.exception, dbt.exceptions.DbtReferenceError)

Expand Down Expand Up @@ -129,5 +133,7 @@ def test_dbt_loom_injects_groups():

path.unlink()

os.chdir(starting_path)

# Make sure nothing failed
assert isinstance(output.exception, dbt.exceptions.DbtReferenceError)
99 changes: 99 additions & 0 deletions tests/test_manifest_loaders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import json
from pathlib import Path

from typing import Dict, Generator, Tuple
from urllib.parse import urlparse

import pytest
from dbt_loom.config import (
FileReferenceConfig,
ManifestReference,
ManifestReferenceType,
)
from dbt_loom.manifests import ManifestLoader, UnknownManifestPathType


@pytest.fixture
def example_file() -> Generator[Tuple[Path, Dict], None, None]:
    """
    Write a small JSON document to ``example.json`` in the current working
    directory, yield ``(path, content)``, and delete the file afterwards.
    """
    payload = {"foo": "bar"}
    target = Path("example.json")

    with open(target, "w") as handle:
        json.dump(payload, handle)

    yield target, payload

    target.unlink()


def test_load_from_local_filesystem_pass(example_file):
    """A ``file://`` URL pointing at an existing JSON file loads its content."""

    json_path, expected = example_file

    file_url = urlparse("file://" + str(Path(json_path).absolute()))
    reference = FileReferenceConfig(path=file_url)

    loaded = ManifestLoader.load_from_local_filesystem(reference)

    assert loaded == expected


def test_load_from_local_filesystem_local_path(example_file):
    """A plain path string (no URL scheme) is accepted and loads the file."""

    json_path, expected = example_file

    reference = FileReferenceConfig(path=str(json_path))  # type: ignore

    loaded = ManifestLoader.load_from_local_filesystem(reference)

    assert loaded == expected


def test_load_from_path_fails_invalid_scheme(example_file):
    """
    Test that ManifestLoader raises UnknownManifestPathType when the path
    uses a URL scheme it has no loader for.
    """

    ftp_reference = FileReferenceConfig(
        path=urlparse("ftp://example.com/example.json"),
    )  # type: ignore

    with pytest.raises(UnknownManifestPathType):
        ManifestLoader.load_from_path(ftp_reference)


def test_load_from_remote_pass(example_file):
    """Test that ManifestLoader can fetch a JSON file over HTTP(S)."""

    # Only the expected content is needed; the file itself is fetched remotely.
    expected = example_file[1]

    remote_url = urlparse(
        "https://s3.us-east-2.amazonaws.com/com.nicholasyager.dbt-loom/example.json"
    )
    reference = FileReferenceConfig(path=remote_url)

    loaded = ManifestLoader.load_from_http(reference)

    assert loaded == expected


def test_manifest_loader_selection(example_file):
    """A `file` reference with an https URL is loaded via the loader's `load`."""
    expected = example_file[1]

    remote_url = urlparse(
        "https://s3.us-east-2.amazonaws.com/com.nicholasyager.dbt-loom/example.json"
    )
    reference = ManifestReference(
        name="example",
        type=ManifestReferenceType.file,
        config=FileReferenceConfig(path=remote_url),
    )

    loader = ManifestLoader()
    loaded = loader.load(reference)

    assert loaded == expected
Loading