diff --git a/dbt_common/events/cookie.py b/dbt_common/events/cookie.py new file mode 100644 index 0000000..fb659c4 --- /dev/null +++ b/dbt_common/events/cookie.py @@ -0,0 +1,32 @@ +from pathlib import Path +import uuid +from typing import Any, Dict + +import yaml + +# the C version is faster, but it doesn't always exist +try: + from yaml import CSafeLoader as SafeLoader +except ImportError: + from yaml import SafeLoader + + +class Cookie: + def __init__(self, directory: Path) -> None: + self.id: str = str(uuid.uuid4()) + self.path: Path = directory / ".user.yml" + self.save() + + def as_dict(self) -> Dict[str, Any]: + return {"id": self.id} + + def save(self) -> None: + with open(self.path, "w") as fh: + yaml.dump(self.as_dict(), fh) + + def load(self) -> Dict[str, Any]: + with open(self.path, "r") as fh: + try: + return yaml.load(fh, Loader=SafeLoader) + except yaml.reader.ReaderError: + return {} diff --git a/dbt_common/events/event_manager.py b/dbt_common/events/event_manager.py index 507588f..e647c9e 100644 --- a/dbt_common/events/event_manager.py +++ b/dbt_common/events/event_manager.py @@ -4,11 +4,13 @@ from dbt_common.events.base_types import BaseEvent, EventLevel, msg_from_base_event, TCallback from dbt_common.events.logger import LoggerConfig, _Logger, _TextLogger, _JsonLogger, LineFormat +from dbt_common.events.tracker import TrackerConfig, _Tracker class EventManager: def __init__(self) -> None: self.loggers: List[_Logger] = [] + self.trackers: List[_Tracker] = [] self.callbacks: List[TCallback] = [] def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None: @@ -37,6 +39,9 @@ def add_logger(self, config: LoggerConfig) -> None: ) self.loggers.append(logger) + def add_tracker(self, config: TrackerConfig) -> None: + self.trackers.append(_Tracker(config)) + def add_callback(self, callback: TCallback) -> None: self.callbacks.append(callback) @@ -48,6 +53,7 @@ def flush(self) -> None: class IEventManager(Protocol): callbacks: List[TCallback] loggers: List[_Logger] + trackers: List[_Tracker] def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None: ... @@ -55,6 +61,9 @@ def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None: def add_logger(self, config: LoggerConfig) -> None: ... + def add_tracker(self, config: TrackerConfig) -> None: + ... + def add_callback(self, callback: TCallback) -> None: ... diff --git a/dbt_common/events/event_manager_client.py b/dbt_common/events/event_manager_client.py index 538d319..c617d0f 100644 --- a/dbt_common/events/event_manager_client.py +++ b/dbt_common/events/event_manager_client.py @@ -17,6 +17,11 @@ def add_logger_to_manager(logger) -> None: _EVENT_MANAGER.add_logger(logger) +def add_tracker_to_manager(tracker) -> None: + global _EVENT_MANAGER + _EVENT_MANAGER.add_tracker(tracker) + + def add_callback_to_manager(callback: TCallback) -> None: global _EVENT_MANAGER _EVENT_MANAGER.add_callback(callback) @@ -32,4 +37,5 @@ def cleanup_event_logger() -> None: # especially important for tests, since pytest replaces the stdout stream # during test runs, and closes the stream after the test is over. _EVENT_MANAGER.loggers.clear() + _EVENT_MANAGER.trackers.clear() _EVENT_MANAGER.callbacks.clear() diff --git a/dbt_common/events/functions.py b/dbt_common/events/functions.py index 4e055aa..f7984dc 100644 --- a/dbt_common/events/functions.py +++ b/dbt_common/events/functions.py @@ -1,20 +1,26 @@ -from pathlib import Path - -from dbt_common.events.event_manager_client import get_event_manager -from dbt_common.exceptions import EventCompilationError -from dbt_common.invocation import get_invocation_id -from dbt_common.helper_types import WarnErrorOptions -from dbt_common.utils.encoding import ForgivingJSONEncoder -from dbt_common.events.base_types import BaseEvent, EventLevel, EventMsg -from dbt_common.events.logger import LoggerConfig, LineFormat -from dbt_common.exceptions import scrub_secrets, env_secrets -from dbt_common.events.types import Note from functools import partial import json import os +from pathlib import Path import sys -from typing import Callable, Dict, Optional, TextIO, Union +from typing import Any, Callable, Dict, Optional, TextIO, Union + from google.protobuf.json_format import MessageToDict +from snowplow_tracker import Subject +from snowplow_tracker.typing import FailureCallback + +from dbt_common.helper_types import WarnErrorOptions +from dbt_common.invocation import get_invocation_id +from dbt_common.events.base_types import BaseEvent, EventLevel, EventMsg +from dbt_common.events.cookie import Cookie +from dbt_common.events.event_manager_client import get_event_manager +from dbt_common.events.logger import LoggerConfig, LineFormat +from dbt_common.events.tracker import TrackerConfig +from dbt_common.events.types import DisableTracking, Note +from dbt_common.events.user import User +from dbt_common.exceptions import EventCompilationError, scrub_secrets, env_secrets +from dbt_common.utils.encoding import ForgivingJSONEncoder + LOG_VERSION = 3 metadata_vars: Optional[Dict[str, str]] = None @@ -22,6 +28,7 @@ WARN_ERROR_OPTIONS = WarnErrorOptions(include=[], exclude=[]) WARN_ERROR = False + # This global, and the following two functions for capturing stdout logs are # an unpleasant hack we intend to remove as part of API-ification. The GitHub # issue #6350 was opened for that work. @@ -58,6 +65,24 @@ def get_stdout_config( ) +def get_logfile_config( + name: str, + log_path: str, + line_format: Optional[LineFormat] = LineFormat.PlainText, + use_colors: Optional[bool] = False, + log_file_max_bytes: Optional[int] = 10 * 1024 * 1024, +) -> LoggerConfig: + return LoggerConfig( + name=name, + line_format=line_format, + level=EventLevel.DEBUG, # File log is *always* debug level + use_colors=use_colors, + invocation_id=get_invocation_id(), + output_file_name=log_path, + output_file_max_bytes=log_file_max_bytes, + ) + + def make_log_dir_if_missing(log_path: Union[Path, str]) -> None: if isinstance(log_path, str): log_path = Path(log_path) @@ -153,3 +178,65 @@ def get_metadata_vars() -> Dict[str, str]: def reset_metadata_vars() -> None: global metadata_vars metadata_vars = None + + +def _default_on_failure(num_ok, unsent): + """ + num_ok will always be 0, unsent will always be 1 entry long + because the buffer is length 1, so not much to talk about + + TODO: add `disable_tracking` as a callback on `DisableTracking` + """ + fire_event(DisableTracking()) + + +def snowplow_config( + user: User, + endpoint: str, + protocol: Optional[str] = "https", + on_failure: Optional[FailureCallback] = _default_on_failure, +) -> TrackerConfig: + return TrackerConfig( + invocation_id=user.invocation_id, + endpoint=endpoint, + protocol=protocol, + on_failure=on_failure, + ) + + +def enable_tracking(tracker, user: User): + cookie = _get_cookie(user) + user.enable_tracking(cookie) + + subject = Subject() + subject.set_user_id(cookie.get("id")) + tracker.set_subject(subject) + + +def disable_tracking(tracker, user: User): + user.disable_tracking() + tracker.set_subject(None) + + +def _get_cookie(user: User) -> Dict[str, Any]: + if cookie := user.cookie: + return cookie + return _set_cookie(user) + + +def _set_cookie(user: User) -> Dict[str, Any]: + """ + If the user points dbt to a profile directory which exists AND + contains a profiles.yml file, then we can set a cookie. If the + specified folder does not exist, or if there is not a profiles.yml + file in this folder, then an inconsistent cookie can be used. This + will change in every dbt invocation until the user points to a + profile dir file which contains a valid profiles.yml file. + + See: https://github.com/dbt-labs/dbt-core/issues/1645 + """ + if user.profile.exists(): + cookie = Cookie(user.directory) + user.cookie = cookie.as_dict() + return user.cookie + return {} diff --git a/dbt_common/events/tracker.py b/dbt_common/events/tracker.py new file mode 100644 index 0000000..43b700a --- /dev/null +++ b/dbt_common/events/tracker.py @@ -0,0 +1,71 @@ +from dataclasses import dataclass +import logging +from logging.handlers import RotatingFileHandler +from typing import Optional + +from snowplow_tracker import Emitter, Tracker +from snowplow_tracker.typing import FailureCallback + +from dbt_common.events.base_types import EventMsg + + +@dataclass +class TrackerConfig: + invocation_id: Optional[str] = None + endpoint: Optional[str] = None + protocol: Optional[str] = None + on_failure: Optional[FailureCallback] = None + name: Optional[str] = None + output_file_name: Optional[str] = None + output_file_max_bytes: Optional[int] = 10 * 1024 * 1024 # 10 mb + + +class _Tracker: + def __init__(self, config: TrackerConfig) -> None: + self.invocation_id: Optional[str] = config.invocation_id + + if all([config.name, config.output_file_name]): + file_handler = RotatingFileHandler( + filename=str(config.output_file_name), + encoding="utf8", + maxBytes=config.output_file_max_bytes, # type: ignore + backupCount=5, + ) + self._tracker = self._python_file_logger(config.name, file_handler) + + elif all([config.endpoint, config.protocol]): + self._tracker = self._snowplow_tracker(config.endpoint, config.protocol) + + def track(self, msg: EventMsg) -> str: + raise NotImplementedError() + + def _python_file_logger(self, name: str, handler: logging.Handler) -> logging.Logger: + log = logging.getLogger(name) + log.setLevel(logging.DEBUG) + handler.setFormatter(logging.Formatter(fmt="%(message)s")) + log.handlers.clear() + log.propagate = False + log.addHandler(handler) + return log + + def _snowplow_tracker( + self, + endpoint: str, + protocol: Optional[str] = "https", + on_failure: Optional[FailureCallback] = None, + ) -> Tracker: + emitter = Emitter( + endpoint, + protocol, + method="post", + batch_size=30, + on_failure=on_failure, + byte_limit=None, + request_timeout=5.0, + ) + tracker = Tracker( + emitters=emitter, + namespace="cf", + app_id="dbt", + ) + return tracker diff --git a/dbt_common/events/types.proto b/dbt_common/events/types.proto index d72d6b2..8352ed0 100644 --- a/dbt_common/events/types.proto +++ b/dbt_common/events/types.proto @@ -126,6 +126,15 @@ message FormattingMsg { Formatting data = 2; } +// Z039 +message DisableTracking { +} + +message DisableTrackingMsg { + CoreEventInfo info = 1; + DisableTracking data = 2; +} + // Z050 message Note { string msg = 1; diff --git a/dbt_common/events/types.py b/dbt_common/events/types.py index 02fc3ee..16a55b5 100644 --- a/dbt_common/events/types.py +++ b/dbt_common/events/types.py @@ -151,6 +151,18 @@ def message(self) -> str: return self.msg +class DisableTracking(DebugLevel): + def code(self) -> str: + return "Z039" + + def message(self) -> str: + return ( + "Error sending anonymous usage statistics. Disabling tracking for this execution. " + "If you wish to permanently disable tracking, see: " + "https://docs.getdbt.com/reference/global-configs#send-anonymous-usage-stats." + ) + + class Note(InfoLevel): """Unstructured events. diff --git a/dbt_common/events/user.py b/dbt_common/events/user.py new file mode 100644 index 0000000..c7ee29a --- /dev/null +++ b/dbt_common/events/user.py @@ -0,0 +1,37 @@ +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, Optional, Union + +import pytz + +from dbt_common.events.functions import get_invocation_id + + +class User: + def __init__(self, directory: Union[str, Path]) -> None: + self.cookie: Dict[str, Any] = {} + self.directory: Path = Path(directory) + self.invocation_id: str = get_invocation_id() + self.run_started_at: datetime = datetime.now(tz=pytz.utc) + + @property + def id(self) -> Optional[str]: + if self.cookie: + return self.cookie.get("id") + + @property + def do_not_track(self) -> bool: + return self.cookie != {} + + def state(self): + return "do not track" if self.do_not_track else "tracking" + + @property + def profile(self) -> Path: + return Path(self.directory) / "profiles.yml" + + def enable_tracking(self, cookie: Dict[str, Any]): + self.cookie = cookie + + def disable_tracking(self): + self.cookie = {}