Skip to content

Commit

Permalink
Address more typing errors (#263)
Browse files Browse the repository at this point in the history
Related: #258
  • Loading branch information
ssbarnea authored Aug 15, 2024
1 parent b036117 commit 67be178
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 16 deletions.
4 changes: 2 additions & 2 deletions extensions/eda/plugins/event_source/aws_cloudtrail.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def _cloudtrail_event_to_dict(event: dict) -> dict:
return event


def _get_events(events: list[dict], last_event_ids: list) -> list:
def _get_events(events: list[dict], last_event_ids: list[str]) -> list:
event_time = None
event_ids = []
result = []
Expand Down Expand Up @@ -89,7 +89,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None:

async with session.create_client("cloudtrail", **connection_args(args)) as client:
event_time = None
event_ids = []
event_ids: list[str] = []
while True:
if event_time is not None:
params["StartTime"] = event_time
Expand Down
13 changes: 7 additions & 6 deletions extensions/eda/plugins/event_source/aws_sqs_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from aiobotocore.session import get_session


# pylint: disable=too-many-locals
async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None:
"""Receive events via an AWS SQS queue."""
logger = logging.getLogger()
Expand Down Expand Up @@ -64,27 +65,27 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None:
)

if "Messages" in response_msg:
for msg in response_msg["Messages"]:
for entry in response_msg["Messages"]:
if (
not isinstance(msg, dict) or "MessageId" not in msg
not isinstance(entry, dict) or "MessageId" not in entry
): # pragma: no cover
err_msg = (
f"Unexpected response {response_msg}, missing MessageId."
)
raise ValueError(err_msg)
meta = {"MessageId": msg["MessageId"]}
meta = {"MessageId": entry["MessageId"]}
try:
msg_body = json.loads(msg["Body"])
msg_body = json.loads(entry["Body"])
except json.JSONDecodeError:
msg_body = msg["Body"]
msg_body = entry["Body"]

await queue.put({"body": msg_body, "meta": meta})
await asyncio.sleep(0)

# Need to remove msg from queue or else it'll reappear
await client.delete_message(
QueueUrl=queue_url,
ReceiptHandle=msg["ReceiptHandle"],
ReceiptHandle=entry["ReceiptHandle"],
)
else:
logger.debug("No messages in queue")
Expand Down
2 changes: 1 addition & 1 deletion extensions/eda/plugins/event_source/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def _create_data(
self: Generic,
index: int,
) -> dict:
data = {}
data: dict[str, str | int] = {}
if self.my_args.create_index:
data[self.my_args.create_index] = index
if self.blob:
Expand Down
6 changes: 4 additions & 2 deletions extensions/eda/plugins/event_source/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,13 @@ async def receive_msg(
logger = logging.getLogger()

async for msg in kafka_consumer:
event = {}
event: dict[str, Any] = {}

# Process headers
try:
headers = {header[0]: header[1].decode(encoding) for header in msg.headers}
headers: dict[str, str] = {
header[0]: header[1].decode(encoding) for header in msg.headers
}
event["meta"] = {}
event["meta"]["headers"] = headers
except UnicodeError:
Expand Down
4 changes: 2 additions & 2 deletions extensions/eda/plugins/event_source/pg_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None:
conninfo=args["dsn"],
autocommit=True,
) as conn:
chunked_cache = {}
chunked_cache: dict[str, Any] = {}
cursor = conn.cursor()
for channel in args["channels"]:
await cursor.execute(f"LISTEN {channel};")
Expand All @@ -118,7 +118,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None:


async def _handle_chunked_message(
data: dict,
data: dict[str, Any],
chunked_cache: dict,
queue: asyncio.Queue,
) -> None:
Expand Down
2 changes: 0 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ error_summary = true

# TODO: Remove temporary skips and close https://github.com/ansible/event-driven-ansible/issues/258
disable_error_code = [
"assignment",
"attr-defined",
"override",
"var-annotated",
]
# strict = true
# disallow_untyped_calls = true
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import http.server
import os
import threading
from typing import Any, Generator

import pytest

Expand All @@ -20,7 +21,7 @@ def log_message(self, format, *args):


@pytest.fixture(scope="function")
def init_webserver():
def init_webserver() -> Generator[Any, Any, Any]:
handler = HttpHandler
port: int = 8000
httpd = http.server.HTTPServer(("", port), handler)
Expand Down

0 comments on commit 67be178

Please sign in to comment.