diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md
index 38a173450e60..15d2b7f49587 100644
--- a/IMPLEMENTATION_COVERAGE.md
+++ b/IMPLEMENTATION_COVERAGE.md
@@ -4368,7 +4368,7 @@
 
 ## logs
 <details>
-<summary>56% implemented</summary>
+<summary>60% implemented</summary>
 
 - [ ] associate_kms_key
 - [ ] cancel_export_task
@@ -4391,7 +4391,7 @@
 - [X] describe_log_groups
 - [X] describe_log_streams
 - [X] describe_metric_filters
-- [ ] describe_queries
+- [X] describe_queries
 - [ ] describe_query_definitions
 - [X] describe_resource_policies
 - [X] describe_subscription_filters
@@ -4401,7 +4401,7 @@
 - [X] get_log_events
 - [ ] get_log_group_fields
 - [ ] get_log_record
-- [ ] get_query_results
+- [X] get_query_results
 - [ ] list_tags_for_resource
 - [X] list_tags_log_group
 - [ ] put_account_policy
diff --git a/docs/docs/services/logs.rst b/docs/docs/services/logs.rst
index 7942108d22c5..af2c105eca90 100644
--- a/docs/docs/services/logs.rst
+++ b/docs/docs/services/logs.rst
@@ -48,7 +48,11 @@ logs
 - [X] describe_log_groups
 - [X] describe_log_streams
 - [X] describe_metric_filters
-- [ ] describe_queries
+- [X] describe_queries
+
+        Pagination is not yet implemented
+
+
 - [ ] describe_query_definitions
 - [X] describe_resource_policies
         Return list of resource policies.
@@ -70,7 +74,11 @@
 - [X] get_log_events
 - [ ] get_log_group_fields
 - [ ] get_log_record
-- [ ] get_query_results
+- [X] get_query_results
+
+        Not all query commands are implemented yet. Please raise an issue if you encounter unexpected results.
+
+
 - [ ] list_tags_for_resource
 - [X] list_tags_log_group
 - [ ] put_account_policy
diff --git a/moto/logs/logs_query/__init__.py b/moto/logs/logs_query/__init__.py
new file mode 100644
index 000000000000..e47bf1a03718
--- /dev/null
+++ b/moto/logs/logs_query/__init__.py
@@ -0,0 +1,90 @@
+from typing import Any, Dict, List
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from ..models import LogGroup, LogEvent, LogStream
+
+from .query_parser import parse_query, ParsedQuery
+
+
+class ParsedEvent:
+    def __init__(
+        self,
+        event: "LogEvent",
+        query: ParsedQuery,
+        log_stream: "LogStream",
+        log_group: "LogGroup",
+    ):
+        self.event = event
+        self.query = query
+        self.log_stream = log_stream
+        self.log_group = log_group
+        self.fields = self._create_fields()
+
+    def _create_fields(self) -> Dict[str, Any]:
+        fields: Dict[str, Any] = {"@ptr": self.event.event_id}
+        if "@timestamp" in self.query.fields:
+            fields["@timestamp"] = self.event.timestamp
+        if "@message" in self.query.fields:
+            fields["@message"] = self.event.message
+        if "@logStream" in self.query.fields:
+            fields["@logStream"] = self.log_stream.log_stream_name  # type: ignore[has-type]
+        if "@log" in self.query.fields:
+            fields["@log"] = self.log_group.name
+        return fields
+
+    def __eq__(self, other: "ParsedEvent") -> bool:  # type: ignore[override]
+        return self.event.timestamp == other.event.timestamp
+
+    def __lt__(self, other: "ParsedEvent") -> bool:
+        return self.event.timestamp < other.event.timestamp
+
+    def __le__(self, other: "ParsedEvent") -> bool:
+        return self.event.timestamp <= other.event.timestamp
+
+    def __gt__(self, other: "ParsedEvent") -> bool:
+        return self.event.timestamp > other.event.timestamp
+
+    def __ge__(self, other: "ParsedEvent") -> bool:
+        return self.event.timestamp >= other.event.timestamp
+
+
+def execute_query(
+    log_groups: List["LogGroup"], query: str, start_time: int, end_time: int
+) -> List[Dict[str, str]]:
+    parsed = parse_query(query)
+    all_events = _create_parsed_events(log_groups, parsed, start_time, end_time)
+    sorted_events = sorted(all_events, reverse=parsed.sort_reversed())
+    sorted_fields = [event.fields for event in sorted_events]
+    if parsed.limit:
+        return sorted_fields[0 : parsed.limit]
+    return sorted_fields
+
+
+def _create_parsed_events(
+    log_groups: List["LogGroup"], query: ParsedQuery, start_time: int, end_time: int
+) -> List["ParsedEvent"]:
+    def filter_func(event: "LogEvent") -> bool:
+        # Start/End time is in epoch seconds
+        # Event timestamp is in epoch milliseconds
+        if start_time and event.timestamp < (start_time * 1000):
+            return False
+
+        if end_time and event.timestamp > (end_time * 1000):
+            return False
+
+        return True
+
+    events: List["ParsedEvent"] = []
+    for group in log_groups:
+        for stream in group.streams.values():
+            events.extend(
+                [
+                    ParsedEvent(
+                        event=event, query=query, log_stream=stream, log_group=group
+                    )
+                    for event in filter(filter_func, stream.events)
+                ]
+            )
+
+    return events
diff --git a/moto/logs/logs_query/query_parser.py b/moto/logs/logs_query/query_parser.py
new file mode 100644
index 000000000000..2cdf4064c7cb
--- /dev/null
+++ b/moto/logs/logs_query/query_parser.py
@@ -0,0 +1,74 @@
+from typing import List, Optional, Tuple
+
+from moto.utilities.tokenizer import GenericTokenizer
+
+
+class ParsedQuery:
+    def __init__(self) -> None:
+        self.limit: Optional[int] = None
+        self.fields: List[str] = []
+        self.sort: List[Tuple[str, str]] = []
+
+    def sort_reversed(self) -> bool:
+        # Descending is the default
+        if self.sort:
+            # sort_reversed is True if we want to sort in ascending order
+            return self.sort[-1][-1] == "asc"
+        return False
+
+
+def parse_query(query: str) -> ParsedQuery:
+    tokenizer = GenericTokenizer(query)
+    state = "COMMAND"
+    characters = ""
+    parsed_query = ParsedQuery()
+
+    for char in tokenizer:
+        if char.isspace():
+            if state == "SORT":
+                parsed_query.sort.append((characters, "desc"))
+                characters = ""
+                state = "SORT_ORDER"
+            if state == "COMMAND":
+                if characters.lower() in ["fields", "limit", "sort"]:
+                    state = characters.upper()
+                else:
+                    # Unknown/Unsupported command
+                    pass
+                characters = ""
+            tokenizer.skip_white_space()
+            continue
+
+        if char == "|":
+            if state == "FIELDS":
+                parsed_query.fields.append(characters)
+                characters = ""
+            if state == "LIMIT":
+                parsed_query.limit = int(characters)
+                characters = ""
+            if state == "SORT_ORDER":
+                if characters != "":
+                    parsed_query.sort[-1] = (parsed_query.sort[-1][0], characters)
+                characters = ""
+            state = "COMMAND"
+            tokenizer.skip_white_space()
+            continue
+
+        if char == ",":
+            if state == "FIELDS":
+                parsed_query.fields.append(characters)
+                characters = ""
+            continue
+
+        characters += char
+
+    if state == "FIELDS":
+        parsed_query.fields.append(characters)
+    if state == "LIMIT":
+        parsed_query.limit = int(characters)
+    if state == "SORT":
+        parsed_query.sort.append((characters, "desc"))
+    if state == "SORT_ORDER":
+        parsed_query.sort[-1] = (parsed_query.sort[-1][0], characters)
+
+    return parsed_query
diff --git a/moto/logs/models.py b/moto/logs/models.py
index 3fdefd520110..6cd2700d5110 100644
--- a/moto/logs/models.py
+++ b/moto/logs/models.py
@@ -10,6 +10,7 @@
     InvalidParameterException,
     LimitExceededException,
 )
+from moto.logs.logs_query import execute_query
 from moto.moto_api._internal import mock_random
 from moto.s3.models import s3_backends
 from moto.utilities.paginator import paginate
@@ -47,11 +48,43 @@ def to_dict(self) -> Dict[str, Any]:
 
 
 class LogQuery(BaseModel):
-    def __init__(self, query_id: str, start_time: str, end_time: str, query: str):
+    def __init__(
+        self,
+        query_id: str,
+        start_time: int,
+        end_time: int,
+        query: str,
+        log_groups: List["LogGroup"],
+    ):
         self.query_id = query_id
         self.start_time = start_time
         self.end_time = end_time
         self.query = query
+        self.log_group_names = [lg.name for lg in log_groups]
+        self.create_time = unix_time_millis()
+        self.status = "Running"
+        self.results = execute_query(
+            log_groups=log_groups, query=query, start_time=start_time, end_time=end_time
+        )
+        self.status = "Complete"
+
+    def to_json(self, log_group_name: str) -> Dict[str, Any]:
+        return {
+            "queryId": self.query_id,
+            "queryString": self.query,
+            "status": self.status,
+            "createTime": self.create_time,
+            "logGroupName": log_group_name,
+        }
+
+    def to_result_json(self) -> Dict[str, Any]:
+        return {
+            "results": [
+                [{"field": key, "value": val} for key, val in result.items()]
+                for result in self.results
+            ],
+            "status": self.status,
+        }
 
 
 class LogEvent(BaseModel):
@@ -1136,19 +1169,42 @@ def delete_subscription_filter(self, log_group_name: str, filter_name: str) -> None:
 
     def start_query(
         self,
         log_group_names: List[str],
-        start_time: str,
-        end_time: str,
+        start_time: int,
+        end_time: int,
         query_string: str,
     ) -> str:
         for log_group_name in log_group_names:
             if log_group_name not in self.groups:
                 raise ResourceNotFoundException()
 
+        log_groups = [self.groups[name] for name in log_group_names]
         query_id = str(mock_random.uuid1())
-        self.queries[query_id] = LogQuery(query_id, start_time, end_time, query_string)
+        self.queries[query_id] = LogQuery(
+            query_id, start_time, end_time, query_string, log_groups
+        )
         return query_id
 
+    def describe_queries(
+        self, log_stream_name: str, status: Optional[str]
+    ) -> List[LogQuery]:
+        """
+        Pagination is not yet implemented
+        """
+        queries: List[LogQuery] = []
+        for query in self.queries.values():
+            if log_stream_name in query.log_group_names and (
+                not status or status == query.status
+            ):
+                queries.append(query)
+        return queries
+
+    def get_query_results(self, query_id: str) -> LogQuery:
+        """
+        Not all query commands are implemented yet. Please raise an issue if you encounter unexpected results.
+ """ + return self.queries[query_id] + def create_export_task( self, log_group_name: str, destination: Dict[str, Any] ) -> str: diff --git a/moto/logs/responses.py b/moto/logs/responses.py index ba0e3be1ffc1..a798b403973c 100644 --- a/moto/logs/responses.py +++ b/moto/logs/responses.py @@ -399,8 +399,8 @@ def delete_subscription_filter(self) -> str: def start_query(self) -> str: log_group_name = self._get_param("logGroupName") log_group_names = self._get_param("logGroupNames") - start_time = self._get_param("startTime") - end_time = self._get_param("endTime") + start_time = self._get_int_param("startTime") + end_time = self._get_int_param("endTime") query_string = self._get_param("queryString") if log_group_name and log_group_names: @@ -415,6 +415,19 @@ def start_query(self) -> str: return json.dumps({"queryId": f"{query_id}"}) + def describe_queries(self) -> str: + log_group_name = self._get_param("logGroupName") + status = self._get_param("status") + queries = self.logs_backend.describe_queries(log_group_name, status) + return json.dumps( + {"queries": [query.to_json(log_group_name) for query in queries]} + ) + + def get_query_results(self) -> str: + query_id = self._get_param("queryId") + query = self.logs_backend.get_query_results(query_id) + return json.dumps(query.to_result_json()) + def create_export_task(self) -> str: log_group_name = self._get_param("logGroupName") destination = self._get_param("destination") diff --git a/tests/test_logs/test_logs.py b/tests/test_logs/test_logs.py index bc47a2a1c9fe..2dfe9b3b62f7 100644 --- a/tests/test_logs/test_logs.py +++ b/tests/test_logs/test_logs.py @@ -1,5 +1,4 @@ import json -import time from datetime import timedelta, datetime from uuid import UUID @@ -1234,39 +1233,6 @@ def test_describe_log_streams_paging(): assert "nextToken" not in resp -@mock_logs -def test_start_query(): - client = boto3.client("logs", "us-east-1") - - log_group_name = "/aws/codebuild/lowercase-dev" - client.create_log_group(logGroupName=log_group_name) - - response = client.start_query( - logGroupName=log_group_name, - startTime=int(time.time()), - endTime=int(time.time()) + 300, - queryString="test", - ) - - assert "queryId" in response - - with pytest.raises(ClientError) as exc: - client.start_query( - logGroupName="/aws/codebuild/lowercase-dev-invalid", - startTime=int(time.time()), - endTime=int(time.time()) + 300, - queryString="test", - ) - - # then - exc_value = exc.value - assert "ResourceNotFoundException" in exc_value.response["Error"]["Code"] - assert ( - exc_value.response["Error"]["Message"] - == "The specified log group does not exist" - ) - - @pytest.mark.parametrize("nr_of_events", [10001, 1000000]) @mock_logs def test_get_too_many_log_events(nr_of_events): diff --git a/tests/test_logs/test_logs_query/test_boto3.py b/tests/test_logs/test_logs_query/test_boto3.py new file mode 100644 index 000000000000..5dc54341d1d9 --- /dev/null +++ b/tests/test_logs/test_logs_query/test_boto3.py @@ -0,0 +1,150 @@ +import time +from datetime import timedelta, datetime + +import boto3 +import pytest +from botocore.exceptions import ClientError + +from moto import mock_logs +from moto.core.utils import unix_time, unix_time_millis + + +@mock_logs +def test_start_query__unknown_log_group(): + client = boto3.client("logs", "us-east-1") + + log_group_name = "/aws/codebuild/lowercase-dev" + client.create_log_group(logGroupName=log_group_name) + + response = client.start_query( + logGroupName=log_group_name, + startTime=int(time.time()), + endTime=int(time.time()) + 300, + 
queryString="test", + ) + + assert "queryId" in response + + with pytest.raises(ClientError) as exc: + client.start_query( + logGroupName="/aws/codebuild/lowercase-dev-invalid", + startTime=int(time.time()), + endTime=int(time.time()) + 300, + queryString="test", + ) + + # then + exc_value = exc.value + assert "ResourceNotFoundException" in exc_value.response["Error"]["Code"] + assert ( + exc_value.response["Error"]["Message"] + == "The specified log group does not exist" + ) + + +@mock_logs +def test_get_query_results(): + client = boto3.client("logs", "us-east-1") + log_group_name = "test" + log_stream_name = "stream" + client.create_log_group(logGroupName=log_group_name) + client.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name) + + data = [ + ( + int(unix_time_millis(datetime.utcnow() - timedelta(minutes=x))), + f"event nr {x}", + ) + for x in range(5) + ] + events = [{"timestamp": x, "message": y} for x, y in reversed(data)] + + client.put_log_events( + logGroupName=log_group_name, logStreamName=log_stream_name, logEvents=events + ) + + query_id = client.start_query( + logGroupName="test", + startTime=int(unix_time(datetime.utcnow() - timedelta(minutes=10))), + endTime=int(unix_time(datetime.utcnow() + timedelta(minutes=10))), + queryString="fields @message", + )["queryId"] + + resp = client.get_query_results(queryId=query_id) + assert resp["status"] == "Complete" + assert len(resp["results"]) == 5 + + fields = set([row["field"] for field in resp["results"] for row in field]) + assert fields == {"@ptr", "@message"} + + messages = [ + row["value"] + for field in resp["results"] + for row in field + if row["field"] == "@message" + ] + assert messages == [ + "event nr 4", + "event nr 3", + "event nr 2", + "event nr 1", + "event nr 0", + ] + + # Only find events from last 2 minutes + query_id = client.start_query( + logGroupName="test", + startTime=int(unix_time(datetime.utcnow() - timedelta(minutes=2))), + endTime=int(unix_time(datetime.utcnow())), + queryString="fields @message", + )["queryId"] + + resp = client.get_query_results(queryId=query_id) + assert len(resp["results"]) == 2 + + messages = [ + row["value"] + for field in resp["results"] + for row in field + if row["field"] == "@message" + ] + assert messages == ["event nr 2", "event nr 1"] + + +@mock_logs +def test_describe_completed_query(): + client = boto3.client("logs", "us-east-1") + + client.create_log_group(logGroupName="test") + + query_id = client.start_query( + logGroupName="test", + startTime=int(unix_time(datetime.utcnow() + timedelta(minutes=10))), + endTime=int(unix_time(datetime.utcnow() + timedelta(minutes=10))), + queryString="fields @message", + )["queryId"] + + queries = client.describe_queries(logGroupName="test")["queries"] + + assert len(queries) == 1 + assert queries[0]["queryId"] == query_id + assert queries[0]["queryString"] == "fields @message" + assert queries[0]["status"] == "Complete" + assert queries[0]["createTime"] + assert queries[0]["logGroupName"] == "test" + + queries = client.describe_queries(logGroupName="test", status="Complete")["queries"] + assert len(queries) == 1 + + queries = client.describe_queries(logGroupName="test", status="Scheduled")[ + "queries" + ] + assert len(queries) == 0 + + +@mock_logs +def test_describe_queries_on_log_group_without_any(): + client = boto3.client("logs", "us-east-1") + + client.create_log_group(logGroupName="test1") + assert client.describe_queries(logGroupName="test1")["queries"] == [] diff --git 
diff --git a/tests/test_logs/test_logs_query/test_query.py b/tests/test_logs/test_logs_query/test_query.py
new file mode 100644
index 000000000000..a37574831eea
--- /dev/null
+++ b/tests/test_logs/test_logs_query/test_query.py
@@ -0,0 +1,63 @@
+from moto.core import DEFAULT_ACCOUNT_ID
+from moto.logs.models import LogGroup
+from moto.logs.logs_query import execute_query
+from moto.core.utils import unix_time, unix_time_millis
+
+from unittest import TestCase
+from uuid import uuid4
+
+
+DEFAULT_QUERY = """fields @timestamp, @message, @logStream, @log
+| sort @timestamp desc
+| limit 20"""
+
+SIMPLIFIED_ONE_LINE_QUERY = "fields @timestamp, @message | sort @timestamp asc"
+
+
+class TestLogsQueries(TestCase):
+    def setUp(self) -> None:
+        self.log_group = LogGroup(
+            account_id=DEFAULT_ACCOUNT_ID, region="us-east-1", name="test", tags={}
+        )
+        self.stream_1_name = f"2022/02/02/[$LATEST]{uuid4()}"
+        self.log_group.create_log_stream(self.stream_1_name)
+        event1 = {
+            "timestamp": unix_time_millis() - 1000,
+            "message": "my previous message",
+        }
+        event2 = {"timestamp": unix_time_millis(), "message": "my current message"}
+        self.events = [event1, event2]
+        self.log_group.streams[self.stream_1_name].put_log_events(self.events)
+
+    def test_default_query(self):
+        resp = execute_query(
+            [self.log_group],
+            DEFAULT_QUERY,
+            start_time=unix_time() - 2000,
+            end_time=unix_time() + 2000,
+        )
+        for event in resp:
+            event.pop("@ptr")
+        assert resp == [
+            {
+                "@timestamp": event["timestamp"],
+                "@message": event["message"],
+                "@logStream": self.stream_1_name,
+                "@log": "test",
+            }
+            for event in self.events
+        ]
+
+    def test_simplified_query(self):
+        resp = execute_query(
+            [self.log_group],
+            SIMPLIFIED_ONE_LINE_QUERY,
+            start_time=unix_time() - 2000,
+            end_time=unix_time() + 2000,
+        )
+        for event in resp:
+            event.pop("@ptr")
+        assert resp == [
+            {"@timestamp": event["timestamp"], "@message": event["message"]}
+            for event in reversed(self.events)
+        ]
diff --git a/tests/test_logs/test_logs_query/test_query_parser.py b/tests/test_logs/test_logs_query/test_query_parser.py
new file mode 100644
index 000000000000..d02bbc9a6dbd
--- /dev/null
+++ b/tests/test_logs/test_logs_query/test_query_parser.py
@@ -0,0 +1,26 @@
+import pytest
+
+from moto.logs.logs_query.query_parser import parse_query
+
+
+@pytest.mark.parametrize(
+    "query,fields,limit,sort",
+    [
+        ("fields @timestamp", ["@timestamp"], None, []),
+        ("fields @timestamp, @message", ["@timestamp", "@message"], None, []),
+        ("limit 42", [], 42, []),
+        ("sort @timestamp desc", [], None, [("@timestamp", "desc")]),
+        ("sort @timestamp asc", [], None, [("@timestamp", "asc")]),
+        ("sort @timestamp", [], None, [("@timestamp", "desc")]),
+        ("fields @timestamp | limit 42", ["@timestamp"], 42, []),
+        ("limit 42 | fields @timestamp", ["@timestamp"], 42, []),
+        ("fields @fld | sort @fld | limit 42", ["@fld"], 42, [("@fld", "desc")]),
+        ("sort @fld asc | fields @fld | limit 42", ["@fld"], 42, [("@fld", "asc")]),
+        ("limit 42 | sort @fld | fields @fld", ["@fld"], 42, [("@fld", "desc")]),
+    ],
+)
+def test_query(query, fields, limit, sort):
+    parsed = parse_query(query)
+    assert parsed.fields == fields
+    assert parsed.limit == limit
+    assert parsed.sort == sort
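
For anyone reviewing the change, here is a minimal sketch of how the new endpoints behave once this diff is applied (the group/stream names below are illustrative, not part of the change). Per the implementation above, queries execute synchronously inside `LogQuery.__init__`, only the `fields`, `sort`, and `limit` commands are interpreted by the parser, and every result row carries an `@ptr` field:

```python
import time

import boto3
from moto import mock_logs


@mock_logs
def demo() -> None:
    client = boto3.client("logs", "us-east-1")
    client.create_log_group(logGroupName="example-group")
    client.create_log_stream(logGroupName="example-group", logStreamName="example-stream")

    # put_log_events expects timestamps in epoch milliseconds
    client.put_log_events(
        logGroupName="example-group",
        logStreamName="example-stream",
        logEvents=[{"timestamp": int(time.time() * 1000), "message": "hello world"}],
    )

    # start_query takes start/end in epoch seconds; unsupported query commands are ignored
    query_id = client.start_query(
        logGroupName="example-group",
        startTime=int(time.time()) - 300,
        endTime=int(time.time()) + 300,
        queryString="fields @timestamp, @message | sort @timestamp desc | limit 10",
    )["queryId"]

    # The query has already run, so the status is "Complete" and each result
    # row is a list of {"field": ..., "value": ...} pairs (including @ptr)
    results = client.get_query_results(queryId=query_id)
    assert results["status"] == "Complete"

    # describe_queries lists queries per log group (no pagination yet)
    assert len(client.describe_queries(logGroupName="example-group")["queries"]) == 1


demo()
```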