Skip to content
This repository has been archived by the owner on Dec 4, 2024. It is now read-only.

Commit

Permalink
Tag request spans with manifest size (#104)
Browse files Browse the repository at this point in the history
* Add a few todos for where to add the size calculation
Update the CachedManifest object to have a size field and default it to 0 for now

* Add size to the state controller class and return it in the compile query result

* Adjust tests following the new changes to the cache

* Remove manifest size from the compile response

* Tag spans in the state controller when loading the manifest from cache or disk

* Calculate the size of the manifest whenever it's read from disk. Store that size in the cached object.
Remove some test span code

* Calculate the manifest size on startup too

* Add a temporary location where the span will be tagged once development is complete

* Move the os.path.getsize call into the filesystem service

* Fix a test after adding the get size call to server startuo

* Remove outdated dummy code to make a test pass

* more tagging work and cleanup

* on the /parse flow, the manifest is only loaded into memory and not yet written out to disk, so we can't read the size of it in parse_from_source(). Instead, the size should be retrieved in serialize_manifest() after it's been written out to disk

* Add a method to update the cache using what's in StateController. This is needed because the cache update was pulled out of the parse_from_source() method and will be called after the manifest has been written out to disk to ensure we can get the size of it

* Only tag the parse endpoint span with manifest size

* Don't tag the span on calls to the push endpoint because I don't think it adds much

* Remove unused import

* Remove unused import

* Formatting

* Revert change

* fix linter issues

* Remove unused import

* Install ddtrace as part of the run tests step since it's needed in views.py

* try to fix linter

* Add changelog

* Add back in missing line to calculate and set the manifest size when serializing to disk

* Move the tracing logic to tracer from views. This is cleaner and allows us to skip all tracing logic if it isn't enabled

* Try removing ddtrace now that the logic has been moved out of views

* Change naming for consistency

* Wrap the get_size function with @tracer.wrap
  • Loading branch information
jp-dbt authored Oct 14, 2022
1 parent bdb0c04 commit fec2854
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 16 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Under the Hood-20221013-142450.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Under the Hood
body: Cache the size of the manifest and tag request spans with it for more observability
time: 2022-10-13T14:24:50.302275-04:00
custom:
Author: jp-dbt
Issue: "104"
PR: "104"
6 changes: 5 additions & 1 deletion dbt_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ def startup_cache_initialize():
return

source_path = filesystem_service.get_root_path(latest_state_id)
LAST_PARSED.set_last_parsed_manifest(latest_state_id, manifest, source_path)
manifest_size = filesystem_service.get_size(manifest_path)
LAST_PARSED.set_last_parsed_manifest(
latest_state_id, manifest, source_path, manifest_size
)

logger.info(f"[STARTUP] Cached manifest in memory (state_id={latest_state_id})")


Expand Down
5 changes: 5 additions & 0 deletions dbt_server/services/filesystem_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ def get_path(state_id, *path_parts):
return os.path.join(get_root_path(state_id), *path_parts)


@tracer.wrap
def get_size(path):
return os.path.getsize(path)


@tracer.wrap
def ensure_dir_exists(path):
dirname = os.path.dirname(path)
Expand Down
40 changes: 31 additions & 9 deletions dbt_server/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
class CachedManifest:
state_id: Optional[str] = None
manifest: Optional[Any] = None
manifest_size: Optional[int] = None

config: Optional[Any] = None
parser: Optional[Any] = None

def set_last_parsed_manifest(self, state_id, manifest, project_path):
def set_last_parsed_manifest(self, state_id, manifest, project_path, manifest_size):
with MANIFEST_LOCK:
self.state_id = state_id
self.manifest = manifest
self.manifest_size = manifest_size

self.config = dbt_service.create_dbt_config(project_path)
self.parser = dbt_service.get_sql_parser(self.config, self.manifest)
Expand All @@ -41,6 +43,7 @@ def reset(self):
with MANIFEST_LOCK:
self.state_id = None
self.manifest = None
self.manifest_size = None
self.config = None
self.parser = None

Expand All @@ -49,22 +52,33 @@ def reset(self):


class StateController(object):
def __init__(self, state_id, manifest, config, parser):
def __init__(
self, state_id, manifest, config, parser, manifest_size, is_manifest_cached
):
self.state_id = state_id
self.manifest = manifest
self.config = config
self.parser = parser
self.manifest_size = manifest_size
self.is_manifest_cached = is_manifest_cached

self.root_path = filesystem_service.get_root_path(state_id)
self.serialize_path = filesystem_service.get_path(state_id, "manifest.msgpack")

@classmethod
@tracer.wrap
def from_parts(cls, state_id, manifest, source_path):
def from_parts(cls, state_id, manifest, source_path, manifest_size):
config = dbt_service.create_dbt_config(source_path)
parser = dbt_service.get_sql_parser(config, manifest)

return cls(state_id=state_id, manifest=manifest, config=config, parser=parser)
return cls(
state_id=state_id,
manifest=manifest,
config=config,
parser=parser,
manifest_size=manifest_size,
is_manifest_cached=False,
)

@classmethod
@tracer.wrap
Expand All @@ -74,6 +88,8 @@ def from_cached(cls, cached):
manifest=cached.manifest,
config=cached.config,
parser=cached.parser,
manifest_size=cached.manifest_size,
is_manifest_cached=True,
)

@classmethod
Expand All @@ -87,12 +103,9 @@ def parse_from_source(cls, state_id, parse_args):
source_path = filesystem_service.get_root_path(state_id)
logger.info(f"Parsing manifest from filetree (state_id={state_id})")
manifest = dbt_service.parse_to_manifest(source_path, parse_args)
# Every parse updates the in-memory manifest cache
logger.info(f"Updating cache (state_id={state_id})")
LAST_PARSED.set_last_parsed_manifest(state_id, manifest, source_path)

logger.info(f"Done parsing from source (state_id={state_id})")
return cls.from_cached(LAST_PARSED)
return cls.from_parts(state_id, manifest, source_path, 0)

@classmethod
@tracer.wrap
Expand Down Expand Up @@ -123,14 +136,16 @@ def load_state(cls, state_id):
manifest_path = filesystem_service.get_path(state_id, "manifest.msgpack")
logger.info(f"Loading manifest from file system ({manifest_path})")
manifest = dbt_service.deserialize_manifest(manifest_path)
manifest_size = filesystem_service.get_size(manifest_path)

source_path = filesystem_service.get_root_path(state_id)
return cls.from_parts(state_id, manifest, source_path)
return cls.from_parts(state_id, manifest, source_path, manifest_size)

@tracer.wrap
def serialize_manifest(self):
logger.info(f"Serializing manifest to file system ({self.serialize_path})")
dbt_service.serialize_manifest(self.manifest, self.serialize_path)
self.manifest_size = filesystem_service.get_size(self.serialize_path)

@tracer.wrap
def update_state_id(self):
Expand All @@ -149,3 +164,10 @@ def compile_query(self, query):
@tracer.wrap
def execute_query(self, query):
return dbt_service.execute_sql(self.manifest, self.root_path, query)

@tracer.wrap
def update_cache(self):
logger.info(f"Updating cache (state_id={self.state_id})")
LAST_PARSED.set_last_parsed_manifest(
self.state_id, self.manifest, self.root_path, self.manifest_size
)
10 changes: 10 additions & 0 deletions dbt_server/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,14 @@ def dd_trace(*args, **kwargs):
return no_op


def add_tags_to_current_span(tag_dict):
if not ENV_HAS_DDTRACE and not APM_ENABLED:
return

current_span = ddtrace.tracer.current_span()
if current_span:
for tag_name, tag_value in tag_dict.items():
current_span.set_tag(tag_name, tag_value)


setup_tracing()
20 changes: 20 additions & 0 deletions dbt_server/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from dbt_server.state import StateController
from dbt_server import crud, schemas, helpers
from dbt_server import tracer

from dbt_server.services import (
filesystem_service,
Expand Down Expand Up @@ -292,6 +293,9 @@ def parse_project(args: ParseArgs):
state = StateController.parse_from_source(args.state_id, args)
state.serialize_manifest()
state.update_state_id()
state.update_cache()

tracer.add_tags_to_current_span({"manifest_size": state.manifest_size})

return JSONResponse(
status_code=200,
Expand Down Expand Up @@ -407,6 +411,8 @@ async def preview_sql(sql: SQLConfig):
result = state.execute_query(sql.sql)
compiled_code = helpers.extract_compiled_code_from_node(result)

tag_request_span(state)

return JSONResponse(
status_code=200,
content={
Expand All @@ -424,6 +430,8 @@ def compile_sql(sql: SQLConfig):
result = state.compile_query(sql.sql)
compiled_code = helpers.extract_compiled_code_from_node(result)

tag_request_span(state)

return JSONResponse(
status_code=200,
content={
Expand All @@ -435,6 +443,18 @@ def compile_sql(sql: SQLConfig):
)


def tag_request_span(state):
manifest_metadata = get_manifest_metadata(state)
tracer.add_tags_to_current_span(manifest_metadata)


def get_manifest_metadata(state):
return {
"manifest_size": state.manifest_size,
"is_manifest_cached": state.is_manifest_cached,
}


class Task(BaseModel):
task_id: str

Expand Down
15 changes: 12 additions & 3 deletions tests/integration/test_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,34 @@ def tearDown(self):
"dbt_server.services.filesystem_service.get_latest_state_id",
return_value="abc123",
)
@patch("dbt_server.services.filesystem_service.get_size", return_value=1024)
@patch(
"dbt_server.services.dbt_service.deserialize_manifest",
return_value=fake_manifest,
)
@patch("dbt_server.services.dbt_service.create_dbt_config", return_value=None)
@patch("dbt_server.services.dbt_service.get_sql_parser", return_value=None)
def test_startup_cache_succeeds(
self, get_sql_parser, create_dbt_config, mock_dbt, mock_fs
self,
get_sql_parser,
create_dbt_config,
mock_dbt,
mock_fs_get_size,
mock_fs_get_latest_state_id,
):
# Make sure it's not errantly cached
assert LAST_PARSED.manifest is None

startup_cache_initialize()

# Make sure manifest is now cached
mock_fs.assert_called_once_with(None)
mock_dbt.assert_called_once_with("./working-dir/state-abc123/manifest.msgpack")
expected_path = "./working-dir/state-abc123/manifest.msgpack"
mock_fs_get_latest_state_id.assert_called_once_with(None)
mock_fs_get_size.assert_called_once_with(expected_path)
mock_dbt.assert_called_once_with(expected_path)
assert LAST_PARSED.manifest is fake_manifest
assert LAST_PARSED.state_id == "abc123"
assert LAST_PARSED.manifest_size == 1024

@patch(
"dbt_server.services.filesystem_service.get_latest_state_id", return_value=None
Expand Down
14 changes: 11 additions & 3 deletions tests/integration/test_compile.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ def test_compilation_interface_valid_state_id(self):

state_mock = Mock(
return_value=StateController(
state_id=state_id, manifest=None, config=None, parser=None
state_id=state_id,
manifest=None,
config=None,
parser=None,
manifest_size=0,
is_manifest_cached=False,
)
)

Expand Down Expand Up @@ -121,6 +126,7 @@ def test_compilation_interface_cache_mutation(self):
cached = CachedManifest()
assert cached.state_id is None
assert cached.manifest is None
assert cached.manifest_size is None

path = "working-dir/state-abc123/"

Expand All @@ -138,10 +144,11 @@ def test_compilation_interface_cache_mutation(self):
create_dbt_config=Mock(),
get_sql_parser=Mock(),
):
cached.set_last_parsed_manifest("abc123", manifest_mock, path)
cached.set_last_parsed_manifest("abc123", manifest_mock, path, 512)

assert cached.state_id == "abc123"
assert cached.manifest is not None
assert cached.manifest_size == 512
assert cached.config is not None
assert cached.parser is not None

Expand All @@ -161,9 +168,10 @@ def test_compilation_interface_cache_mutation(self):
create_dbt_config=Mock(),
get_sql_parser=Mock(),
):
cached.set_last_parsed_manifest("def456", new_manifest_mock, path)
cached.set_last_parsed_manifest("def456", new_manifest_mock, path, 1024)
assert cached.state_id == "def456"
assert cached.manifest is not None
assert cached.manifest_size == 1024
assert cached.config is not None
assert cached.parser is not None

Expand Down

0 comments on commit fec2854

Please sign in to comment.