Skip to content

Commit

Permalink
feat(ingest/looker): Do not emit usage for non-ingested dashboards and charts (datahub-project#11647)
Browse files Browse the repository at this point in the history
  • Loading branch information
asikowitz authored Dec 24, 2024
1 parent 047644b commit 09a9b6e
Show file tree
Hide file tree
Showing 5 changed files with 482 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1408,6 +1408,15 @@ class LookerDashboardSourceReport(StaleEntityRemovalSourceReport):
# Dashboards for which usage activity was observed — presumably within the
# extraction window; TODO confirm against the usage query.
dashboards_with_activity: LossySet[str] = dataclasses_field(
default_factory=LossySet
)

# Entities that don't seem to exist (usage rows reference ids that were not
# ingested), so we don't emit usage aspects for them despite having usage
# data. Recorded here so the skips are visible in the ingestion report.
dashboards_skipped_for_usage: LossySet[str] = dataclasses_field(
default_factory=LossySet
)
charts_skipped_for_usage: LossySet[str] = dataclasses_field(
default_factory=LossySet
)

# Per-stage latency records accumulated during the run.
stage_latency: List[StageLatency] = dataclasses_field(default_factory=list)
# Internal handle to the explore registry; None until one is attached.
_looker_explore_registry: Optional[LookerExploreRegistry] = None
total_explores: int = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
ViewField,
ViewFieldType,
gen_model_key,
get_urn_looker_element_id,
)
from datahub.ingestion.source.looker.looker_config import LookerDashboardSourceConfig
from datahub.ingestion.source.looker.looker_lib_wrapper import LookerAPI
Expand Down Expand Up @@ -165,6 +166,9 @@ def __init__(self, config: LookerDashboardSourceConfig, ctx: PipelineContext):
# Required, as we do not ingest all folders but only those that have dashboards/looks
self.processed_folders: List[str] = []

# Keep track of ingested chart urns, to omit usage for non-ingested entities
self.chart_urns: Set[str] = set()

@staticmethod
def test_connection(config_dict: dict) -> TestConnectionReport:
test_report = TestConnectionReport()
Expand Down Expand Up @@ -642,6 +646,7 @@ def _make_chart_metadata_events(
chart_urn = self._make_chart_urn(
element_id=dashboard_element.get_urn_element_id()
)
self.chart_urns.add(chart_urn)
chart_snapshot = ChartSnapshot(
urn=chart_urn,
aspects=[Status(removed=False)],
Expand Down Expand Up @@ -1380,7 +1385,9 @@ def _get_folder_and_ancestors_workunits(
yield from self._emit_folder_as_container(folder)

def extract_usage_stat(
self, looker_dashboards: List[looker_usage.LookerDashboardForUsage]
self,
looker_dashboards: List[looker_usage.LookerDashboardForUsage],
ingested_chart_urns: Set[str],
) -> List[MetadataChangeProposalWrapper]:
looks: List[looker_usage.LookerChartForUsage] = []
# filter out look from all dashboard
Expand All @@ -1391,6 +1398,15 @@ def extract_usage_stat(

# dedup looks
looks = list({str(look.id): look for look in looks}.values())
filtered_looks = []
for look in looks:
if not look.id:
continue
chart_urn = self._make_chart_urn(get_urn_looker_element_id(look.id))
if chart_urn in ingested_chart_urns:
filtered_looks.append(look)
else:
self.reporter.charts_skipped_for_usage.add(look.id)

# Keep stat generators to generate entity stat aspect later
stat_generator_config: looker_usage.StatGeneratorConfig = (
Expand All @@ -1414,7 +1430,7 @@ def extract_usage_stat(
stat_generator_config,
self.reporter,
self._make_chart_urn,
looks,
filtered_looks,
)

mcps: List[MetadataChangeProposalWrapper] = []
Expand Down Expand Up @@ -1669,7 +1685,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
if self.source_config.extract_usage_history:
self.reporter.report_stage_start("usage_extraction")
usage_mcps: List[MetadataChangeProposalWrapper] = self.extract_usage_stat(
looker_dashboards_for_usage
looker_dashboards_for_usage, self.chart_urns
)
for usage_mcp in usage_mcps:
yield usage_mcp.as_workunit()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
TimeWindowSizeClass,
_Aspect as AspectAbstract,
)
from datahub.utilities.lossy_collections import LossySet

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -170,7 +171,7 @@ def __init__(
self.config = config
self.looker_models = looker_models
# Later it will help to find out for what are the looker entities from query result
self.id_vs_model: Dict[str, ModelForUsage] = {
self.id_to_model: Dict[str, ModelForUsage] = {
self.get_id(looker_object): looker_object for looker_object in looker_models
}
self.post_filter = len(self.looker_models) > 100
Expand Down Expand Up @@ -225,6 +226,10 @@ def get_id(self, looker_object: ModelForUsage) -> str:
def get_id_from_row(self, row: dict) -> str:
    """Extract this generator's entity id from one raw usage-query result row."""
    pass

@abstractmethod
def report_skip_set(self) -> LossySet[str]:
    """Return the report's LossySet that records entity ids whose usage was skipped.

    Used when a usage row references an id that is not in ``id_to_model``,
    so no usage aspect is emitted for it.
    """
    pass

def create_mcp(
self, model: ModelForUsage, aspect: Aspect
) -> MetadataChangeProposalWrapper:
Expand Down Expand Up @@ -258,20 +263,11 @@ def _process_entity_timeseries_rows(

return entity_stat_aspect

def _process_absolute_aspect(self) -> List[Tuple[ModelForUsage, AspectAbstract]]:
    """Pair every tracked looker model with its absolute (non-timeseries) stat aspect."""
    return [
        (model, self.to_entity_absolute_stat_aspect(model))
        for model in self.looker_models
    ]

def _fill_user_stat_aspect(
self,
entity_usage_stat: Dict[Tuple[str, str], Aspect],
user_wise_rows: List[Dict],
) -> Iterable[Tuple[ModelForUsage, Aspect]]:
) -> Iterable[Tuple[str, Aspect]]:
logger.debug("Entering fill user stat aspect")

# We first resolve all the users using a threadpool to warm up the cache
Expand Down Expand Up @@ -300,7 +296,7 @@ def _fill_user_stat_aspect(

for row in user_wise_rows:
# Confirm looker object was given for stat generation
looker_object = self.id_vs_model.get(self.get_id_from_row(row))
looker_object = self.id_to_model.get(self.get_id_from_row(row))
if looker_object is None:
logger.warning(
"Looker object with id({}) was not register with stat generator".format(
Expand Down Expand Up @@ -338,7 +334,7 @@ def _fill_user_stat_aspect(
logger.debug("Starting to yield answers for user-wise counts")

for (id, _), aspect in entity_usage_stat.items():
yield self.id_vs_model[id], aspect
yield id, aspect

def _execute_query(self, query: LookerQuery, query_name: str) -> List[Dict]:
rows = []
Expand All @@ -357,7 +353,7 @@ def _execute_query(self, query: LookerQuery, query_name: str) -> List[Dict]:
)
if self.post_filter:
logger.debug("post filtering")
rows = [r for r in rows if self.get_id_from_row(r) in self.id_vs_model]
rows = [r for r in rows if self.get_id_from_row(r) in self.id_to_model]
logger.debug("Filtered down to %d rows", len(rows))
except Exception as e:
logger.warning(f"Failed to execute {query_name} query: {e}")
Expand All @@ -378,7 +374,8 @@ def generate_usage_stat_mcps(self) -> Iterable[MetadataChangeProposalWrapper]:
return

# yield absolute stat for looker entities
for looker_object, aspect in self._process_absolute_aspect(): # type: ignore
for looker_object in self.looker_models:
aspect = self.to_entity_absolute_stat_aspect(looker_object)
yield self.create_mcp(looker_object, aspect)

# Execute query and process the raw json which contains stat information
Expand All @@ -399,10 +396,13 @@ def generate_usage_stat_mcps(self) -> Iterable[MetadataChangeProposalWrapper]:
)
user_wise_rows = self._execute_query(user_wise_query_with_filters, "user_query")
# yield absolute stat for entity
for looker_object, aspect in self._fill_user_stat_aspect(
for object_id, aspect in self._fill_user_stat_aspect(
entity_usage_stat, user_wise_rows
):
yield self.create_mcp(looker_object, aspect)
if object_id in self.id_to_model:
yield self.create_mcp(self.id_to_model[object_id], aspect)
else:
self.report_skip_set().add(object_id)


class DashboardStatGenerator(BaseStatGenerator):
Expand All @@ -425,6 +425,9 @@ def __init__(
def get_stats_generator_name(self) -> str:
    """Identifier for this stat generator."""
    return "DashboardStats"

def report_skip_set(self) -> LossySet[str]:
    """Dashboard ids skipped for usage emission are recorded in this report set."""
    return self.report.dashboards_skipped_for_usage

def get_filter(self) -> Dict[ViewField, str]:
return {
HistoryViewField.HISTORY_DASHBOARD_ID: ",".join(
Expand Down Expand Up @@ -541,6 +544,9 @@ def __init__(
def get_stats_generator_name(self) -> str:
    """Identifier for this stat generator."""
    return "ChartStats"

def report_skip_set(self) -> LossySet[str]:
    """Chart/look ids skipped for usage emission are recorded in this report set."""
    return self.report.charts_skipped_for_usage

def get_filter(self) -> Dict[ViewField, str]:
return {
LookViewField.LOOK_ID: ",".join(
Expand Down
Loading

0 comments on commit 09a9b6e

Please sign in to comment.