diff --git a/timesketch/lib/analyzers/interface.py b/timesketch/lib/analyzers/interface.py index 2480673972..5629a17d9c 100644 --- a/timesketch/lib/analyzers/interface.py +++ b/timesketch/lib/analyzers/interface.py @@ -22,10 +22,13 @@ import random import time import traceback + + import yaml import opensearchpy from flask import current_app +from jsonschema import validate, ValidationError, SchemaError import pandas @@ -884,6 +887,18 @@ def __init__(self, index_name, sketch_id, timeline_id=None): port=current_app.config["OPENSEARCH_PORT"], ) + # Add AnalyzerOutput instance and set all attributes that can be set + # automatically + self.output = AnalyzerOutput( + analyzer_identifier=self.NAME, + analyzer_name=self.DISPLAY_NAME, + timesketch_instance=current_app.config.get( + "EXTERNAL_HOST_URL", "https://localhost" + ), + sketch_id=sketch_id, + timeline_id=timeline_id, + ) + if not hasattr(self, "sketch"): self.sketch = None @@ -1150,3 +1165,207 @@ def get_kwargs(cls): def run(self): """Entry point for the analyzer.""" raise NotImplementedError + + +class AnalyzerOutputException(Exception): + """Analyzer output exception.""" + + +class AnalyzerOutput: + """A class to record timesketch analyzer output. + + Attributes: + platform (str): [Required] Analyzer platfrom. + analyzer_identifier (str): [Required] Unique analyzer identifier. + analyzer_name (str): [Required] Analyzer display name. + result_status (str): [Required] Analyzer result status. + Valid values are success or error. + result_priority (str): [Required] Priority of the result based on the + analysis findings. Valid values are NOTE (default), LOW, MEDIUM, HIGH. + result_summary (str): [Required] A summary statement of the analyzer + finding. A result summary must exist even if there is no finding. + result_markdown (str): [Optional] A detailed information about the + analyzer finding in a markdown format. + references (List[str]): [Optional] A list of references about the + analyzer or the issue the analyzer attempts to address. + result_attributes (dict): [Optional] A dict of key : value pairs that + holds additional finding details. + platform_meta_data: (dict): [Required] A dict of key : value pairs that + holds the following information: + timesketch_instance (str): [Required] The Timesketch instance URL. + sketch_id (int): [Required] Timesketch sketch ID for this analyzer. + timeline_id (int): [Required] Timesketch timeline ID for this analyzer. + saved_views (List[int]): [Optional] Views generatred by the analyzer. + saved_stories (List[int]): [Optional] Stories generated by the analyzer. + saved_graphs (List[int]): [Optional] Graphs generated by the analyzer. + saved_aggregations (List[int]): [Optional] Aggregations generated + by the analyzer. + created_tags (List[str]): [Optional] Tags created by the analyzer. + """ + + def __init__( + self, + analyzer_identifier, + analyzer_name, + timesketch_instance, + sketch_id, + timeline_id, + ): + """Initialize analyzer output.""" + self.platform = "timesketch" + self.analyzer_identifier = analyzer_identifier + self.analyzer_name = analyzer_name + self.result_status = "" # TODO: link to analyzer status/error? + self.result_priority = "NOTE" + self.result_summary = "" + self.result_markdown = "" + self.references = [] + self.result_attributes = {} + self.platform_meta_data = { + "timesketch_instance": timesketch_instance, + "sketch_id": sketch_id, + "timeline_id": timeline_id, + "saved_views": [], + "saved_stories": [], + "saved_graphs": [], + "saved_aggregations": [], + "created_tags": [], + } + + def validate(self): + """Validates the analyzer output and raises exception.""" + schema = { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "platform": {"type": "string", "enum": ["timesketch"]}, + "analyzer_identifier": {"type": "string", "minLength": 1}, + "analyzer_name": {"type": "string", "minLength": 1}, + "result_status": { + "type": "string", + "enum": ["SUCCESS", "NO-FINDINGS", "ERROR"], + }, + "result_priority": { + "type": "string", + "default": "NOTE", + "enum": ["HIGH", "MEDIUM", "LOW", "NOTE"], + }, + "result_summary": {"type": "string", "minLength": 1}, + "result_markdown": {"type": "string", "minLength": 1}, + "references": { + "type": "array", + "items": [{"type": "string", "minLength": 1}], + }, + "result_attributes": {"type": "object"}, + "platform_meta_data": { + "type": "object", + "properties": { + "timesketch_instance": {"type": "string", "minLength": 1}, + "sketch_id": {"type": "integer"}, + "timeline_id": {"type": "integer"}, + "saved_views": { + "type": "array", + "items": [ + {"type": "integer"}, + ], + }, + "saved_stories": { + "type": "array", + "items": [{"type": "integer"}], + }, + "saved_aggregations": { + "type": "array", + "items": [ + {"type": "integer"}, + ], + }, + "created_tags": { + "type": "array", + "items": [ + {"type": "string"}, + ], + }, + }, + "required": [ + "timesketch_instance", + "sketch_id", + "timeline_id", + ], + }, + }, + "required": [ + "platform", + "analyzer_identifier", + "analyzer_name", + "result_status", + "result_priority", + "result_summary", + "platform_meta_data", + ], + } + + try: + validate(instance=self.to_json(), schema=schema) + return True + except (ValidationError, SchemaError) as e: + raise AnalyzerOutputException(f"json schema error: {e}") from e + + def to_json(self) -> dict: + """Returns JSON output of AnalyzerOutput. Filters out empty values.""" + # add required fields + output = { + "platform": self.platform, + "analyzer_identifier": self.analyzer_identifier, + "analyzer_name": self.analyzer_name, + "result_status": self.result_status.upper(), + "result_priority": self.result_priority.upper(), + "result_summary": self.result_summary, + "platform_meta_data": { + "timesketch_instance": self.platform_meta_data["timesketch_instance"], + "sketch_id": self.platform_meta_data["sketch_id"], + "timeline_id": self.platform_meta_data["timeline_id"], + }, + } + + # add optional fields if they are not empty + if self.result_markdown and self.result_markdown != "": + output["result_markdown"] = self.result_markdown + + if self.references: + output["references"] = self.references + + if self.result_attributes: + output["result_attributes"] = self.result_attributes + + if self.platform_meta_data["saved_views"]: + output["platform_meta_data"]["saved_views"] = self.platform_meta_data[ + "saved_views" + ] + + if self.platform_meta_data["saved_stories"]: + output["platform_meta_data"]["saved_stories"] = self.platform_meta_data[ + "saved_stories" + ] + + if self.platform_meta_data["saved_graphs"]: + output["platform_meta_data"]["saved_graphs"] = self.platform_meta_data[ + "saved_graphs" + ] + + if self.platform_meta_data["saved_aggregations"]: + output["platform_meta_data"][ + "saved_aggregations" + ] = self.platform_meta_data["saved_aggregations"] + + if self.platform_meta_data["created_tags"]: + output["platform_meta_data"]["created_tags"] = self.platform_meta_data[ + "created_tags" + ] + + return output + + def __str__(self) -> str: + """Returns string output of AnalyzerOutput.""" + if self.validate(): + return json.dumps(self.to_json()) + return ""