Skip to content

Commit

Permalink
Add --follow to snow app events (#1385)
Browse files Browse the repository at this point in the history
Adds `--follow` flag to `snow app events` to enable live-tail of events. The command will poll the `get_events` function every 10 seconds (by default, can be changed with `--follow-interval`), printing new rows on each iteration.

We detect new events by running a follow-up query that keeps only the rows whose `timestamp` is `>=` the timestamp of the last row in the previous results. The bound has to be inclusive because new events can arrive with that same timestamp (i.e. values in the `timestamp` column are not guaranteed to be unique or strictly increasing). Because the bound is inclusive, the first few events of the new results repeat the last few events of the previous results, so we must print only the net-new rows. We do this by finding the point of largest overlap between the end of the old results and the start of the new results, and returning the rows that come after it.
  • Loading branch information
sfc-gh-fcampbell authored Jul 31, 2024
1 parent 6d1c686 commit 5f412eb
Show file tree
Hide file tree
Showing 5 changed files with 301 additions and 73 deletions.
84 changes: 65 additions & 19 deletions src/snowflake/cli/plugins/nativeapp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@

from __future__ import annotations

import itertools
import logging
from enum import Enum
from pathlib import Path
from textwrap import dedent
from typing import List, Optional
from typing import Generator, Iterable, List, Optional, cast

import typer
from click import ClickException
Expand Down Expand Up @@ -420,6 +421,11 @@ class RecordType(Enum):
SPAN_EVENT = "span_event"


# Number of most recent events printed before streaming starts when
# `snow app events --follow` is run without an explicit --last or --since.
DEFAULT_EVENT_FOLLOW_LAST = 20


@app.command("events", hidden=True, requires_connection=True)
@with_project_definition()
@nativeapp_definition_v2_to_v1
Expand All @@ -445,39 +451,79 @@ def app_events(
help="Restrict results to a specific scope name. Can be specified multiple times.",
),
first: int = typer.Option(
default=0, help="Fetch only the first N events. Cannot be used with --last."
default=-1,
show_default=False,
help="Fetch only the first N events. Cannot be used with --last.",
),
last: int = typer.Option(
default=0, help="Fetch only the last N events. Cannot be used with --first."
default=-1,
show_default=False,
help="Fetch only the last N events. Cannot be used with --first.",
),
follow: bool = typer.Option(
False,
"--follow",
"-f",
help=(
f"Continue polling for events. Implies --last {DEFAULT_EVENT_FOLLOW_LAST} "
f"unless overridden or the --since flag is used."
),
),
follow_interval: int = typer.Option(
10,
help=f"Polling interval in seconds when using the --follow flag.",
),
**options,
):
"""Fetches events for this app from the event table configured in Snowflake."""
if first and last:
if first >= 0 and last >= 0:
raise ClickException("--first and --last cannot be used together.")

if follow:
if until:
raise ClickException("--follow and --until cannot be used together.")
if first >= 0:
raise ClickException("--follow and --first cannot be used together.")

assert_project_type("native_app")

record_type_names = [r.name for r in record_types]
manager = NativeAppManager(
project_definition=get_cli_context().project_definition.native_app,
project_root=get_cli_context().project_root,
)
events = manager.get_events(
since_interval=since,
until_interval=until,
record_types=[r.name for r in record_types],
scopes=scopes,
first=first,
last=last,
)
if not events:
return MessageResult("No events found.")

def g():
for event in events:
yield EventResult(event)
if follow:
if last == -1 and not since:
# If we don't have a value for --last or --since, assume a value
# for --last so we at least print something before starting the stream
last = DEFAULT_EVENT_FOLLOW_LAST
stream: Iterable[CommandResult] = (
EventResult(event)
for event in manager.stream_events(
since=since,
last=last,
interval_seconds=follow_interval,
record_types=record_type_names,
scopes=scopes,
)
)
# Append a newline at the end to make the CLI output clean when we hit Ctrl-C
stream = itertools.chain(stream, [MessageResult("")])
else:
stream = (
EventResult(event)
for event in manager.get_events(
since=since,
until=until,
record_types=record_type_names,
scopes=scopes,
first=first,
last=last,
)
)

return StreamResult(g())
# Cast the stream to a Generator since that's what StreamResult wants
return StreamResult(cast(Generator[CommandResult, None, None], stream))


class EventResult(ObjectResult, MessageResult):
Expand Down
90 changes: 72 additions & 18 deletions src/snowflake/cli/plugins/nativeapp/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

import json
import os
import time
from abc import ABC, abstractmethod
from contextlib import contextmanager
from datetime import datetime
from functools import cached_property
from pathlib import Path
from textwrap import dedent
from typing import Any, List, NoReturn, Optional, TypedDict
from typing import Any, Generator, List, NoReturn, Optional, TypedDict

import jinja2
from click import ClickException
Expand Down Expand Up @@ -714,34 +716,36 @@ def get_validation_result(self, use_scratch_stage: bool):

def get_events(
self,
since_interval: str = "",
until_interval: str = "",
since: str | datetime | None = None,
until: str | datetime | None = None,
record_types: list[str] | None = None,
scopes: list[str] | None = None,
first: int = 0,
last: int = 0,
first: int = -1,
last: int = -1,
) -> list[dict]:
record_types = record_types or []
scopes = scopes or []

if first and last:
if first >= 0 and last >= 0:
raise ValueError("first and last cannot be used together")

if not self.account_event_table:
raise NoEventTableForAccount()

# resource_attributes:"snow.database.name" uses the unquoted/uppercase app name
app_name = unquote_identifier(self.app_name)
since_clause = (
f"and timestamp >= sysdate() - interval '{since_interval}'"
if since_interval
else ""
)
until_clause = (
f"and timestamp <= sysdate() - interval '{until_interval}'"
if until_interval
else ""
)
if isinstance(since, datetime):
since_clause = f"and timestamp >= '{since}'"
elif isinstance(since, str) and since:
since_clause = f"and timestamp >= sysdate() - interval '{since}'"
else:
since_clause = ""
if isinstance(until, datetime):
until_clause = f"and timestamp <= '{until}'"
elif isinstance(until, str) and until:
until_clause = f"and timestamp <= sysdate() - interval '{until}'"
else:
until_clause = ""
type_in_values = ",".join(f"'{v}'" for v in record_types)
types_clause = (
f"and record_type in ({type_in_values})" if type_in_values else ""
Expand All @@ -750,8 +754,8 @@ def get_events(
scopes_clause = (
f"and scope:name in ({scope_in_values})" if scope_in_values else ""
)
first_clause = f"limit {first}" if first else ""
last_clause = f"limit {last}" if last else ""
first_clause = f"limit {first}" if first >= 0 else ""
last_clause = f"limit {last}" if last >= 0 else ""
query = dedent(
f"""\
select * from (
Expand All @@ -773,6 +777,56 @@ def get_events(
except ProgrammingError as err:
generic_sql_error_handler(err)

def stream_events(
    self,
    interval_seconds: int,
    since: str | datetime | None = None,
    record_types: list[str] | None = None,
    scopes: list[str] | None = None,
    last: int = -1,
) -> Generator[dict, None, None]:
    """Yield an initial batch of events, then poll forever for new ones.

    The first call to ``get_events`` uses ``since`` and ``last`` as given.
    Every later poll asks for events whose timestamp is ``>=`` the timestamp
    of the most recently seen event and yields only the rows that were not
    part of the previous non-empty batch.

    Args:
        interval_seconds: Seconds to sleep between polls.
        since: Lower bound for the initial query, passed to ``get_events``.
        record_types: Record types to restrict results to.
        scopes: Scope names to restrict results to.
        last: Limit for the initial query only; ``-1`` means no limit.

    Yields:
        Event rows as dicts, in the order ``get_events`` returns them.

    The generator ends quietly when a ``KeyboardInterrupt`` is raised while
    it is running (e.g. the user presses Ctrl-C during a poll).
    """
    try:
        events = self.get_events(
            since=since, record_types=record_types, scopes=scopes, last=last
        )
        yield from events  # Yield the initial batch of events

        # An empty initial batch has no timestamp to resume from, so keep
        # polling with the caller-supplied bound until something shows up.
        last_event_time = events[-1]["TIMESTAMP"] if events else since

        while True:  # Then poll indefinitely for new events
            time.sleep(interval_seconds)
            polled = self.get_events(
                since=last_event_time, record_types=record_types, scopes=scopes
            )
            if not polled:
                # Keep the last non-empty batch as the overlap baseline so
                # rows that were already yielded are not emitted again once
                # results come back.
                continue

            yield from _new_events_only(events, polled)
            events = polled
            last_event_time = events[-1]["TIMESTAMP"]
    except KeyboardInterrupt:
        return


def _new_events_only(previous_events: list[dict], new_events: list[dict]) -> list[dict]:
    """Return the rows of ``new_events`` that were not already in ``previous_events``.

    ``new_events`` is expected to come from an inclusive ``timestamp >=``
    query, so its leading rows may repeat the trailing rows of
    ``previous_events``. The backwards scan with an early ``break`` assumes
    both lists are ordered by ascending ``TIMESTAMP`` (TODO confirm against
    the ordering of the ``get_events`` query).

    Args:
        previous_events: The batch that has already been emitted.
        new_events: The freshly fetched batch; it is not modified.

    Returns:
        A new list with one occurrence removed for every overlapping row of
        ``previous_events``. An empty ``new_events`` yields an empty list.
    """
    if not new_events:
        return []

    # The timestamp that overlaps between both sets of events
    overlap_time = new_events[0]["TIMESTAMP"]

    # Remove events from the new result set if they were already printed.
    # We remove one occurrence at a time instead of filtering so that
    # duplicates are handled correctly (i.e. if an event is present 3 times
    # in new_events but only once in previous_events, it should still
    # appear twice in the result).
    remaining = new_events.copy()
    for event in reversed(previous_events):
        if event["TIMESTAMP"] < overlap_time:
            break
        try:
            remaining.remove(event)
        except ValueError:
            # A previously printed row is missing from the new results;
            # there is nothing to drop for it, so keep going rather than
            # aborting the stream.
            continue
    return remaining


def _validation_item_to_str(item: dict[str, str | int]):
s = item["message"]
Expand Down
65 changes: 40 additions & 25 deletions tests/__snapshots__/test_help_messages.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -253,31 +253,46 @@
Fetches events for this app from the event table configured in Snowflake.

+- Options --------------------------------------------------------------------+
| --since TEXT Fetch events that are newer than |
| this time ago, in Snowflake |
| interval syntax. |
| --until TEXT Fetch events that are older than |
| this time ago, in Snowflake |
| interval syntax. |
| --type [log|span|span_event] Restrict results to specific |
| record type. Can be specified |
| multiple times. |
| --scope TEXT Restrict results to a specific |
| scope name. Can be specified |
| multiple times. |
| --first INTEGER Fetch only the first N events. |
| Cannot be used with --last. |
| [default: 0] |
| --last INTEGER Fetch only the last N events. |
| Cannot be used with --first. |
| [default: 0] |
| --project -p TEXT Path where Snowflake project |
| resides. Defaults to current |
| working directory. |
| --env TEXT String in format of key=value. |
| Overrides variables from env |
| section used for templating. |
| --help -h Show this message and exit. |
| --since TEXT Fetch events that are |
| newer than this time ago, |
| in Snowflake interval |
| syntax. |
| --until TEXT Fetch events that are |
| older than this time ago, |
| in Snowflake interval |
| syntax. |
| --type [log|span|span_event] Restrict results to |
| specific record type. Can |
| be specified multiple |
| times. |
| --scope TEXT Restrict results to a |
| specific scope name. Can |
| be specified multiple |
| times. |
| --first INTEGER Fetch only the first N |
| events. Cannot be used |
| with --last. |
| --last INTEGER Fetch only the last N |
| events. Cannot be used |
| with --first. |
| --follow -f Continue polling for |
| events. Implies --last 20 |
| unless overridden or the |
| --since flag is used. |
| --follow-interval INTEGER Polling interval in |
| seconds when using the |
| --follow flag. |
| [default: 10] |
| --project -p TEXT Path where Snowflake |
| project resides. Defaults |
| to current working |
| directory. |
| --env TEXT String in format of |
| key=value. Overrides |
| variables from env section |
| used for templating. |
| --help -h Show this message and |
| exit. |
+------------------------------------------------------------------------------+
+- Connection configuration ---------------------------------------------------+
| --connection,--environment -c TEXT Name of the connection, as defined |
Expand Down
Loading

0 comments on commit 5f412eb

Please sign in to comment.