Skip to content

Commit

Permalink
[resotocore][feat] Maintain history of nodes and make it available vi…
Browse files Browse the repository at this point in the history
…a CLI/API (#1289)

* implement history search and make it available via endpoint
  • Loading branch information
aquamatthias authored Nov 18, 2022
1 parent abde862 commit c9fb971
Show file tree
Hide file tree
Showing 19 changed files with 569 additions and 82 deletions.
3 changes: 2 additions & 1 deletion resotocore/.pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ disable=
duplicate-code, # pylint also checks duplicated import statements - does not make any sense
too-many-boolean-expressions,
no-member,
unsupported-binary-operation
unsupported-binary-operation,
too-many-branches


[REPORTS]
Expand Down
1 change: 1 addition & 0 deletions resotocore/resotocore/analytics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class CoreEvent:
GraphDBWiped = "graphdb.wiped"
CLICommand = "cli.command"
Query = "graphdb.query"
HistoryQuery = "graphdb.query.history"
ModelInfo = "model.info"
SubscriberInfo = "subscriber.info"
WorkerQueueInfo = "worker-queue.info"
Expand Down
12 changes: 11 additions & 1 deletion resotocore/resotocore/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
SortPart,
LimitPart,
WorkerCustomCommand,
HistoryPart,
)
from resotocore.cli.model import (
ParsedCommand,
Expand Down Expand Up @@ -204,6 +205,8 @@ def help_command() -> Stream:
CLIArg = Tuple[CLICommand, Optional[str]]
# If no sort is defined in the part, we use this default sort order
DefaultSort = [Sort("/reported.kind"), Sort("/reported.name"), Sort("/reported.id")]
# Default sort order for history searches
HistorySort = [Sort("/changed_at"), Sort("/reported.kind"), Sort("/reported.name"), Sort("/reported.id")]


class CLI:
Expand Down Expand Up @@ -309,6 +312,8 @@ async def parse_query(query_arg: str) -> Query:
nonlocal parsed_options
parsed, query_part = ExecuteSearchCommand.parse_known(query_arg)
parsed_options = {**parsed_options, **parsed}
# empty string is interpreted as no filter
query_part = "all" if query_part.strip() == "" else query_part
# section expansion is disabled here: it will happen on the final query after all parts have been combined
return await self.dependencies.template_expander.parse_query(
"".join(query_part), None, omit_section_expansion=True, **ctx.env
Expand All @@ -324,6 +329,11 @@ async def parse_query(query_arg: str) -> Query:
arg = command.arg if command.arg else ""
if isinstance(part, SearchPart):
query = query.combine(await parse_query(arg))
elif isinstance(part, HistoryPart):
parsed_options["history"] = True
query = query.combine(await parse_query(arg))
if not query.current_part.sort and not query.aggregate:
query = query.set_sort(*HistorySort)
elif isinstance(part, SortPart):
if query.current_part.sort == DefaultSort:
query = query.set_sort(*sort_args_p.parse(arg))
Expand Down Expand Up @@ -406,7 +416,7 @@ async def parse_query(query_arg: str) -> Query:
final_query = with_sort.on_section(ctx.env.get("section", PathRoot))
options = ExecuteSearchCommand.argument_string(parsed_options)
query_string = str(final_query)
execute_search = self.command("execute_search", options + query_string, ctx)
execute_search = self.command("execute_search", f"{options}'{query_string}'", ctx)
return final_query, parsed_options, [execute_search, *additional_commands]

async def evaluate_cli_command(
Expand Down
118 changes: 104 additions & 14 deletions resotocore/resotocore/cli/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
from resotocore.cli.tip_of_the_day import SuggestionPolicy, SuggestionStrategy, get_suggestion_strategy
from resotocore.config import ConfigEntity
from resotocore.db.async_arangodb import AsyncCursor
from resotocore.db.graphdb import HistoryChange
from resotocore.db.model import QueryModel
from resotocore.db.runningtaskdb import RunningTaskData
from resotocore.dependencies import system_info
Expand Down Expand Up @@ -121,6 +122,7 @@
rnd_str,
set_value_in_path,
restart_service,
parse_utc,
)
from resotocore.web.content_renderer import (
respond_ndjson,
Expand Down Expand Up @@ -367,6 +369,67 @@ def args_info(self) -> ArgsInfo:
]


class HistoryPart(SearchCLIPart):
    """
    ```shell
    history [--before <timestamp>] [--after <timestamp>] [--change <change>] [search-statement]
    ```

    Return all changes of the graph based on the given criteria.

    Whenever changes are given to Resoto, a dedicated change event is written as a separate entity.
    The following changes are supported:

    - node_created: a node is added to the graph that has not been seen before.
    - node_updated: a node is delivered and is different to the one in the graph.
    - node_deleted: a node is no longer reported and gets deleted from the graph.

    ## Options

    - `--before` <timestamp>: only show changes before this timestamp
    - `--after` <timestamp>: only show changes after this timestamp
    - `--change` <change>: one of `node_created`, `node_deleted`, `node_updated`

    ## Parameters

    - `search-statement`: a search statement to filter the history

    ## Examples

    ```shell
    # Show all nodes changed on 1.1.2022 between 03:00 and 06:00 (UTC)
    > history --after 2022-01-01T03:00:00Z --before 2022-01-01T06:00:00Z
    change=node_updated, changed_at=2022-01-01T03:00:59Z, kind=kubernetes_config_map, id=73616434, name=leader, cloud=k8s
    change=node_deleted, changed_at=2022-01-01T04:40:59Z, kind=aws_vpc, id=vpc-1, name=resoto-eks, cloud=aws

    # Show all nodes created on 1.1.2022 between 03:00 and 06:00 (UTC)
    > history --change node_created --after 2022-01-01T03:00:00Z --before 2022-01-01T06:00:00Z
    change=node_created, changed_at=2022-01-01T05:40:59Z, kind=aws_iam_role, id=AROA, name=some-role, cloud=aws

    # Show all changes to kubernetes resources in the kube-system namespace
    > history is(kubernetes_resource) and namespace=kube-system
    change=node_created, changed_at=2022-11-18T12:00:49Z, kind=kubernetes_role, name=eks, namespace=kube-system
    change=node_updated, changed_at=2022-11-18T12:00:50Z, kind=kubernetes_config_map, name=cert, namespace=kube-system
    ```
    """

    @property
    def name(self) -> str:
        """Return the command name, `history`."""
        return "history"

    def info(self) -> str:
        """Return the one-line summary of this command."""
        return "Search the history of nodes."

    def args_info(self) -> ArgsInfo:
        """Describe the options and the trailing search statement accepted by `history`."""
        return [
            ArgInfo("--after", help_text="changes after this timestamp", expects_value=True, value_hint="timestamp"),
            ArgInfo("--before", help_text="changes before this timestamp", expects_value=True, value_hint="timestamp"),
            ArgInfo(
                "--change",
                help_text="type of change",
                expects_value=True,
                # iterate the enum class directly: no intermediate list is required
                possible_values=[e.value for e in HistoryChange],
            ),
            # unnamed argument: the search statement that filters the history
            ArgInfo(expects_value=True, value_hint="search"),
        ]


class SortPart(SearchCLIPart):
"""
```shell
Expand Down Expand Up @@ -1163,15 +1226,22 @@ def parse_known(arg: str) -> Tuple[Dict[str, Any], str]:
parser = NoExitArgumentParser()
parser.add_argument("--with-edges", dest="with-edges", default=None, action="store_true")
parser.add_argument("--explain", dest="explain", default=None, action="store_true")
parsed, rest = parser.parse_known_args(arg.split(maxsplit=2))
parser.add_argument("--history", dest="history", default=None, action="store_true")
parser.add_argument("--after", dest="after", default=None)
parser.add_argument("--before", dest="before", default=None)
parser.add_argument("--change", dest="change", default=None)
parsed, rest = parser.parse_known_args(list(args_parts_parser.parse(arg)))
return {k: v for k, v in vars(parsed).items() if v is not None}, " ".join(rest)

@staticmethod
def argument_string(args: Dict[str, Any]) -> str:
result = []
for key, value in args.items():
if value is True:
result.append(f"--{key}")
if value is None or value is False:
continue
result.append(f"--{key}")
if value is not True:
result.append(f"'{value}'") # put the value into single quotes to maintain the spaces
return " ".join(result) + " " if result else ""

def parse(self, arg: Optional[str] = None, ctx: CLIContext = EmptyContext, **kwargs: Any) -> CLISource:
Expand All @@ -1184,6 +1254,7 @@ def parse(self, arg: Optional[str] = None, ctx: CLIContext = EmptyContext, **kwa
parsed, rest = self.parse_known(arg)
with_edges: bool = parsed.get("with-edges", False)
explain: bool = parsed.get("explain", False)
history: bool = parsed.get("history", False)

# all templates are expanded at this point, so we can call the parser directly.
query = parse_query(rest, **ctx.env)
Expand All @@ -1204,15 +1275,17 @@ async def prepare() -> Tuple[Optional[int], AsyncIterator[Json]]:
query_model = await load_query_model()
count = ctx.env.get("count", "true").lower() != "false"
timeout = if_set(ctx.env.get("search_timeout"), duration)
context = (
await db.search_aggregation(query_model)
if query.aggregate
else (
await db.search_graph_gen(query_model, with_count=count, timeout=timeout)
if with_edges
else await db.search_list(query_model, with_count=count, timeout=timeout)
)
)
if history:
before = if_set(parsed.get("before"), lambda x: parse_utc(strip_quotes(x))) # type: ignore
after = if_set(parsed.get("after"), lambda x: parse_utc(strip_quotes(x))) # type: ignore
change = if_set(parsed.get("change"), lambda x: HistoryChange[strip_quotes(x)]) # type: ignore
context = await db.search_history(query_model, change, before, after, timeout=timeout)
elif query.aggregate:
context = await db.search_aggregation(query_model)
elif with_edges:
context = await db.search_graph_gen(query_model, with_count=count, timeout=timeout)
else:
context = await db.search_list(query_model, with_count=count, timeout=timeout)
cursor = context.cursor

# since we can not use context boundaries here,
Expand Down Expand Up @@ -2221,15 +2294,27 @@ class ListCommand(CLICommand, OutputTransformer):
(["reported", "id"], "id"),
(["reported", "name"], "name"),
]
default_context_properties_to_show = [
default_live_properties_to_show = [
(["reported", "age"], "age"),
(["reported", "last_update"], "last_update"),
]
default_context_properties_to_show = [
(["ancestors", "cloud", "reported", "name"], "cloud"),
(["ancestors", "account", "reported", "name"], "account"),
(["ancestors", "region", "reported", "name"], "region"),
(["ancestors", "zone", "reported", "name"], "zone"),
]
all_default_props = {".".join(path) for path, _ in default_properties_to_show + default_context_properties_to_show}
default_history_properties_to_show = [
(["change"], "change"),
(["changed_at"], "changed_at"),
]
all_default_props = {
".".join(path)
for path, _ in default_properties_to_show
+ default_context_properties_to_show
+ default_history_properties_to_show
+ default_live_properties_to_show
}
dot_re = re.compile("[.]")

@property
Expand Down Expand Up @@ -2264,6 +2349,8 @@ def default_props_to_show() -> List[Tuple[List[str], str]]:
# with the object id, if edges are requested
if ctx.query_options.get("with-edges") is True:
result.append((["id"], "node_id"))
if ctx.query_options.get("history") is True:
result.extend(self.default_history_properties_to_show)
# add all default props
result.extend(self.default_properties_to_show)
# add all predicates the user has queried
Expand All @@ -2276,6 +2363,8 @@ def default_props_to_show() -> List[Tuple[List[str], str]]:
if name not in self.all_default_props and name not in local_paths:
local_paths.add(name)
result.append((self.dot_re.split(name), name.rsplit(".", 1)[-1]))
if ctx.query_options.get("history") is not True:
result.extend(self.default_live_properties_to_show)
# add all context properties
result.extend(self.default_context_properties_to_show)
return result
Expand Down Expand Up @@ -4431,6 +4520,7 @@ def all_commands(d: CLIDependencies) -> List[CLICommand]:
FlattenCommand(d, "misc"),
FormatCommand(d, "format"),
HeadCommand(d, "misc"),
HistoryPart(d, "search", allowed_in_source_position=True),
HttpCommand(d, "action"),
JobsCommand(d, "action", allowed_in_source_position=True),
JqCommand(d, "misc"),
Expand Down
4 changes: 4 additions & 0 deletions resotocore/resotocore/core_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,10 @@ class GraphUpdateConfig(ConfigObject):
default=4 * 3600,
metadata={"description": "If a graph update takes longer than this duration, the update is aborted."},
)
keep_history: bool = field(
default=True,
metadata={"description": "If true, changes of the graph are stored and are available via history."},
)

def merge_max_wait_time(self) -> timedelta:
return timedelta(seconds=self.merge_max_wait_time_seconds)
Expand Down
7 changes: 5 additions & 2 deletions resotocore/resotocore/db/arango_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,14 @@
array_marker_in_path_regexp = re.compile(r"\[]|\[[*]](?=[.])")


def to_query(db: Any, query_model: QueryModel, with_edges: bool = False) -> Tuple[str, Json]:
def to_query(
db: Any, query_model: QueryModel, with_edges: bool = False, from_collection: Optional[str] = None
) -> Tuple[str, Json]:
count: Dict[str, int] = defaultdict(lambda: 0)
query = query_model.query
bind_vars: Json = {}
cursor, query_str = query_string(db, query, query_model, db.vertex_name, with_edges, bind_vars, count)
start = from_collection or db.vertex_name
cursor, query_str = query_string(db, query, query_model, start, with_edges, bind_vars, count)
return f"""{query_str} FOR result in {cursor} RETURN UNSET(result, {unset_props})""", bind_vars


Expand Down
2 changes: 1 addition & 1 deletion resotocore/resotocore/db/db_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def get_graph_db(self, name: str, no_check: bool = False) -> GraphDB:
else:
if not no_check and not self.database.has_graph(name):
raise NoSuchGraph(name)
graph_db = ArangoGraphDB(self.db, name, self.adjust_node)
graph_db = ArangoGraphDB(self.db, name, self.adjust_node, self.config.graph_update)
event_db = EventGraphDB(graph_db, self.event_sender)
self.graph_dbs[name] = event_db
return event_db
Expand Down
Loading

0 comments on commit c9fb971

Please sign in to comment.