Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Setup CI #17

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
root = true

[*]
indent_style = space
indent_size = 4
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true

[Makefile]
indent_style = tab

[*.{yml,yaml,json,js,ts}]
indent_size = 2
36 changes: 36 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
on:
push:
branches:
- main
tags:
- "*"

pull_request:
branches:
- "*"
workflow_dispatch:

name: CI

jobs:
ci:
name: Test
runs-on: "ubuntu-latest"
strategy:
matrix:
version: ["3.7", "3.8", "3.9", "3.10"]
fail-fast: false

services:
elasticmq-native:
image: softwaremill/elasticmq-native
ports:
- 9324:9324
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.version }}
- run: make install-dependencies
- run: make lint
- run: make test
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ __pycache__
build
dist
htmlcov
coverage.xml
29 changes: 29 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
install-dependencies:
pip install -U pip
pip install .[dev]

test:
pytest


###
# Lint section
###
_flake8:
@flake8 --show-source .

_isort:
@isort --check-only .

_black:
@black --diff --check .

_isort_fix:
@isort .

_black_fix:
@black .


lint: _flake8 _isort _black
format-code: _isort_fix _black_fix
74 changes: 39 additions & 35 deletions dramatiq_sqs/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ class SQSBroker(dramatiq.Broker):
"""

def __init__(
self, *,
namespace: Optional[str] = None,
middleware: Optional[List[dramatiq.Middleware]] = None,
retention: int = MAX_MESSAGE_RETENTION,
dead_letter: bool = False,
max_receives: int = MAX_RECEIVES,
tags: Optional[Dict[str, str]] = None,
**options,
self,
*,
namespace: Optional[str] = None,
middleware: Optional[List[dramatiq.Middleware]] = None,
retention: int = MAX_MESSAGE_RETENTION,
dead_letter: bool = False,
max_receives: int = MAX_RECEIVES,
tags: Optional[Dict[str, str]] = None,
**options,
) -> None:
super().__init__(middleware=middleware)

Expand Down Expand Up @@ -104,31 +105,21 @@ def declare_queue(self, queue_name: str) -> None:
QueueName=prefixed_queue_name,
Attributes={
"MessageRetentionPeriod": self.retention,
}
},
)
if self.tags:
self.sqs.meta.client.tag_queue(
QueueUrl=self.queues[queue_name].url,
Tags=self.tags
)
self.sqs.meta.client.tag_queue(QueueUrl=self.queues[queue_name].url, Tags=self.tags)

if self.dead_letter:
dead_letter_queue_name = f"{prefixed_queue_name}_dlq"
dead_letter_queue = self.sqs.create_queue(
QueueName=dead_letter_queue_name
)
dead_letter_queue = self.sqs.create_queue(QueueName=dead_letter_queue_name)
if self.tags:
self.sqs.meta.client.tag_queue(
QueueUrl=dead_letter_queue.url,
Tags=self.tags
)
self.sqs.meta.client.tag_queue(QueueUrl=dead_letter_queue.url, Tags=self.tags)
redrive_policy = {
"deadLetterTargetArn": dead_letter_queue.attributes["QueueArn"],
"maxReceiveCount": str(self.max_receives)
"maxReceiveCount": str(self.max_receives),
}
self.queues[queue_name].set_attributes(Attributes={
"RedrivePolicy": json.dumps(redrive_policy)
})
self.queues[queue_name].set_attributes(Attributes={"RedrivePolicy": json.dumps(redrive_policy)})
self.emit_after("declare_queue", queue_name)

def enqueue(self, message: dramatiq.Message, *, delay: Optional[int] = None) -> dramatiq.Message:
Expand All @@ -146,7 +137,11 @@ def enqueue(self, message: dramatiq.Message, *, delay: Optional[int] = None) ->
if len(encoded_message) > MAX_MESSAGE_SIZE:
raise RuntimeError("Messages in SQS can be at most 256KiB large.")

self.logger.debug("Enqueueing message %r on queue %r.", message.message_id, queue_name)
self.logger.debug(
"Enqueueing message %r on queue %r.",
message.message_id,
queue_name,
)
self.emit_before("enqueue", message, delay)
queue.send_message(
MessageBody=encoded_message,
Expand Down Expand Up @@ -181,20 +176,30 @@ def ack(self, message: "_SQSMessage") -> None:
def requeue(self, messages: Iterable["_SQSMessage"]) -> None:
for batch in chunk(messages, chunksize=10):
# Re-enqueue batches of up to 10 messages.
send_response = self.queue.send_messages(Entries=[{
"Id": str(i),
"MessageBody": message._sqs_message.body,
} for i, message in enumerate(batch)])
send_response = self.queue.send_messages(
Entries=[
{
"Id": str(i),
"MessageBody": message._sqs_message.body,
}
for i, message in enumerate(batch)
]
)

# Then delete the ones that were successfully re-enqueued.
# The rest will have to wait until their visibility
# timeout expires.
failed_message_ids = [int(res["Id"]) for res in send_response.get("Failed", [])]
requeued_messages = [m for i, m in enumerate(batch) if i not in failed_message_ids]
self.queue.delete_messages(Entries=[{
"Id": str(i),
"ReceiptHandle": message._sqs_message.receipt_handle,
} for i, message in enumerate(requeued_messages)])
self.queue.delete_messages(
Entries=[
{
"Id": str(i),
"ReceiptHandle": message._sqs_message.receipt_handle,
}
for i, message in enumerate(requeued_messages)
]
)

self.message_refc -= len(requeued_messages)

Expand Down Expand Up @@ -233,8 +238,7 @@ def __init__(self, sqs_message: Any, message: dramatiq.Message) -> None:


def chunk(xs: Iterable[T], *, chunksize=10) -> Iterable[Sequence[T]]:
"""Split a sequence into subseqs of chunksize length.
"""
"""Split a sequence into subseqs of chunksize length."""
chunk = []
for x in xs:
chunk.append(x)
Expand Down
33 changes: 33 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[tool.black]
line-length = 120
target-version = [ "py37" ]
include = "\\.pyi?$"
exclude = """
(
/(
\\.eggs # exclude a few common directories in the
| \\.git # root of the project
| \\.hg
| \\.mypy_cache
| \\.tox
| \\.venv
| _build
| buck-out
| build
| dist
)
)
"""

[tool.isort]
profile = "black"
known_first_party = "dramatiq_sqs"
multi_line_output = 3
use_parentheses = true
include_trailing_comma = true

[tool.coverage.run]
branch = true

[tool.coverage.report]
precision = 2
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool:pytest]
testpaths = tests
addopts = --cov dramatiq_sqs --cov-report html
addopts = --cov dramatiq_sqs --cov-report html --cov-report term-missing --cov-report xml

[pep8]
max-line-length = 120
Expand Down
5 changes: 3 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ def rel(*xs):
install_requires=["boto3", "dramatiq"],
extras_require={
"dev": [
"black",
"bumpversion",
"flake8",
"flake8-quotes",
"flake8",
"isort",
"mypy",
"pytest",
"pytest-cov",
"pytest",
"twine",
],
},
Expand Down
10 changes: 7 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

import dramatiq
import pytest
from dramatiq.middleware import (AgeLimit, Callbacks, Pipelines, Retries,
TimeLimit)
from dramatiq.middleware import AgeLimit, Callbacks, Pipelines, Retries, TimeLimit

from dramatiq_sqs import SQSBroker

Expand All @@ -18,13 +17,18 @@
@pytest.fixture
def broker():
broker = SQSBroker(
endpoint_url="http://127.0.0.1:9324",
region_name="elasticmq",
aws_secret_access_key="x",
aws_access_key_id="x",
use_ssl=False,
namespace="dramatiq_sqs_tests",
middleware=[
AgeLimit(),
TimeLimit(),
Callbacks(),
Pipelines(),
Retries(min_backoff=1000, max_backoff=900000, max_retries=96),
Retries(min_backoff=1, max_backoff=900000, max_retries=2),
],
tags={
"owner": "dramatiq_sqs_tests",
Expand Down
48 changes: 31 additions & 17 deletions tests/test_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,22 +135,33 @@ def test_creates_dead_letter_queue():
namespace="dramatiq_sqs_tests",
dead_letter=True,
max_receives=20,
endpoint_url="http://127.0.0.1:9324",
region_name="elasticmq",
aws_secret_access_key="x",
aws_access_key_id="x",
use_ssl=False,
)

# And I've stubbed out all the relevant API calls
stubber = Stubber(broker.sqs.meta.client)
stubber.add_response("create_queue", {"QueueUrl": ""})
stubber.add_response("create_queue", {"QueueUrl": ""})
stubber.add_response("get_queue_attributes", {"Attributes": {"QueueArn": "dlq"}})
stubber.add_response("set_queue_attributes", {}, {
"QueueUrl": "",
"Attributes": {
"RedrivePolicy": json.dumps({
"deadLetterTargetArn": "dlq",
"maxReceiveCount": "20"
})
}
})
stubber.add_response(
"set_queue_attributes",
{},
{
"QueueUrl": "",
"Attributes": {
"RedrivePolicy": json.dumps(
{
"deadLetterTargetArn": "dlq",
"maxReceiveCount": "20",
}
)
},
},
)

# When I create a queue
# Then a dead-letter queue should be created
Expand All @@ -164,19 +175,22 @@ def test_tags_queues_on_create():
# Given that I have an SQS broker with tags
broker = SQSBroker(
namespace="dramatiq_sqs_tests",
tags={"key1": "value1", "key2": "value2"}
tags={"key1": "value1", "key2": "value2"},
endpoint_url="http://127.0.0.1:9324",
region_name="elasticmq",
aws_secret_access_key="x",
aws_access_key_id="x",
use_ssl=False,
)

# And I've stubbed out all the relevant API calls
stubber = Stubber(broker.sqs.meta.client)
stubber.add_response("create_queue", {"QueueUrl": ""})
stubber.add_response("tag_queue", {}, {
"QueueUrl": "",
"Tags": {
"key1": "value1",
"key2": "value2"
}
})
stubber.add_response(
"tag_queue",
{},
{"QueueUrl": "", "Tags": {"key1": "value1", "key2": "value2"}},
)

# When I create a queue
# Then the queue should have the specified tags
Expand Down
2 changes: 1 addition & 1 deletion tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ def test_chunk_can_split_iterators_into_chunks():
[6, 7],
[8, 9],
[10, 11],
[12]
[12],
]