Replace Black with Ruff (#481)
* Remove [tool.isort] block from pyproject.toml

We don't use it (for now), so removing this confusing block.

* Replace Black with Ruff

  * Starting with the Pyflakes linter
  * No actual linting or formatting was run just yet

* Fix linting errors and reformat with Ruff

* Integrate flake8-bandit linter via Ruff

* Integrate flake8-self linter via Ruff

This linter is silenced in tests, though, as there are too many usages there.

* Run Ruff via pre-commit on GitHub

* Remove mention of Black in CONTRIBUTING.md

* Lint after merging Sources API

* Lint after merging Branching
gwaramadze authored Sep 27, 2024
1 parent 232385d commit c966cb5
Showing 63 changed files with 282 additions and 210 deletions.
8 changes: 1 addition & 7 deletions .github/workflows/ci.yml
@@ -25,13 +25,7 @@ jobs:
         uses: actions/setup-python@v4
         with:
           python-version: 3.9
-      - name: Install linters
-        run: |
-          python -m pip install -U -r requirements-dev.txt
-      - name: Run Black
-        run: |
-          python -m black --check quixstreams
-          python -m black --check tests
+      - uses: pre-commit/[email protected]
 
   test:
     needs: lint
11 changes: 4 additions & 7 deletions .pre-commit-config.yaml
@@ -1,9 +1,6 @@
 repos:
-  - repo: https://github.com/psf/black
-    rev: '24.3.0'
+  - repo: https://github.com/astral-sh/ruff-pre-commit
+    rev: v0.6.3
     hooks:
-      - id: black
-        types: [python]
-        language_version: python3
-        args: [ --config=pyproject.toml, --check ]
-        verbose: true
+      - id: ruff
+      - id: ruff-format
3 changes: 1 addition & 2 deletions CONTRIBUTING.md
@@ -54,5 +54,4 @@ This guide will assume you're using Visual Studio Code, but most of the guide is
 3. Ensure you don't have `quixstreams` installed using `python3 -m pip uninstall quixstreams` to avoid using that when testing
 4. By executing `python3 -m pip install --editable .` you will be able to use the source code as a module for local testing without having to install new versions
 5. Add commit pre-hook: `pre-commit install`
-6. Configure Black code formatter.
-7. Run test from project root with `pytest`
+6. Run test from project root with `pytest`
1 change: 0 additions & 1 deletion examples/custom_websocket_source/main.py
@@ -84,7 +84,6 @@ def run(self) -> None:
 
 
 def main():
-
     # Initialize an Application with Kafka configuration
     app = Application(
         broker_address="localhost:9092",  # Specify your Kafka broker address here
29 changes: 19 additions & 10 deletions pyproject.toml
@@ -49,17 +49,26 @@ version = {attr = "quixstreams.__version__"}
 [tool.pydantic-pycharm-plugin]
 ignore-init-method-arguments = true
 
-[tool.black]
-line-length = 88
-target-version = ['py38']
-include = '\.pyi?$'
-force-exclude = '.*_pb2\.py$'
-
-[tool.isort]
-atomic = true
-profile = "black"
-line_length = 88
+[tool.ruff]
+fix = true
+lint.select = [
+    "F",    # Pyflakes
+    "S",    # flake8-bandit
+    "SLF",  # flake8-self
+]
+exclude = [
+    "*_pb2.py",  # Protobuf files
+]
+
+[tool.ruff.lint.per-file-ignores]
+# S101 Use of `assert` detected
+# S105 Possible hardcoded password assigned to variable
+# S106 Possible hardcoded password assigned to argument
+# S311 Standard pseudo-random generators are not suitable for cryptographic purposes
+# SLF001 Private member accessed
+"docs/*.py" = ["S311"]
+"examples/*.py" = ["S311"]
+"tests/*.py" = ["S101", "S105", "S106", "S311", "SLF001"]
 
 [tool.pytest.ini_options]
 minversion = "6.0"
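
For context on the per-file ignores above: patterns that are perfectly normal in tests trip flake8-bandit and flake8-self elsewhere in the codebase. A minimal sketch of a hypothetical test file (not from this repo) in which every flagged construct is covered by the `tests/*.py` entry:

    import random


    class Session:
        def __init__(self, password: str):
            self.id = 0
            self._password = password

        def _connect(self):
            self.id = random.randint(0, 10)  # S311: non-cryptographic RNG


    def test_session():
        token = "not-a-real-secret"  # S105: variable name looks credential-like
        session = Session(password="hunter2")  # S106: hardcoded password argument
        session._connect()  # SLF001: private member accessed
        assert 0 <= session.id <= 10  # S101: use of assert

Under `tests/*.py` all five codes are silenced; the same lines in library code would each be reported.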
1 change: 1 addition & 0 deletions quixstreams/__init__.py
@@ -3,5 +3,6 @@
 from .models import MessageContext
 from .state import State
 
+__all__ = ["Application", "message_context", "MessageContext", "State"]
 
 __version__ = "2.11.1"
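
A note on why `__all__` starts appearing in these `__init__.py` files: with Pyflakes enabled, a name imported into a package's `__init__.py` but not used there is reported as an unused import (F401), even when the import is a deliberate public re-export. Listing the names in `__all__` declares the re-export explicitly. A minimal sketch with a hypothetical package:

    # mypkg/__init__.py (hypothetical)
    from .core import Engine  # unused in this file, so F401 would flag it...

    __all__ = ["Engine"]  # ...but __all__ marks it as a deliberate re-export

As a bonus, `__all__` also defines exactly what `from mypkg import *` brings in.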
10 changes: 5 additions & 5 deletions quixstreams/app.py
@@ -845,7 +845,7 @@ def _on_assign(self, _, topic_partitions: List[TopicPartition]):
         # sometimes "empty" calls happen, probably updating the consumer epoch
         if not topic_partitions:
             return
-        logger.debug(f"Rebalancing: assigning partitions")
+        logger.debug("Rebalancing: assigning partitions")
 
         # Only start the sources once the consumer is assigned. Otherwise a source
         # can produce data before the consumer starts. If that happens on a new
@@ -903,7 +903,7 @@ def _on_revoke(self, _, topic_partitions: List[TopicPartition]):
         # because of the unhandled exception.
         # In this case, we should drop the checkpoint and let another consumer
         # pick up from the latest one
-        logger.debug(f"Rebalancing: revoking partitions")
+        logger.debug("Rebalancing: revoking partitions")
         if self._failed:
             logger.warning(
                 "Application is stopping due to failure, "
@@ -926,7 +926,7 @@ def _on_lost(self, _, topic_partitions: List[TopicPartition]):
         """
         Dropping lost partitions from consumer and state
         """
-        logger.debug(f"Rebalancing: dropping lost partitions")
+        logger.debug("Rebalancing: dropping lost partitions")
         for tp in topic_partitions:
             if self._state_manager.stores:
                 self._state_manager.on_partition_revoke(
@@ -944,11 +944,11 @@ def _on_sigint(self, *_):
         # Re-install the default SIGINT handler so doing Ctrl+C twice
         # raises KeyboardInterrupt
         signal.signal(signal.SIGINT, signal.default_int_handler)
-        logger.debug(f"Received SIGINT, stopping the processing loop")
+        logger.debug("Received SIGINT, stopping the processing loop")
         self.stop()
 
     def _on_sigterm(self, *_):
-        logger.debug(f"Received SIGTERM, stopping the processing loop")
+        logger.debug("Received SIGTERM, stopping the processing loop")
         self.stop()
 
 
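
The `logger.debug(f"...")` edits above are all instances of Pyflakes rule F541, an f-string without any placeholders, where the `f` prefix is dead weight. With `fix = true` in pyproject.toml, Ruff can rewrite these automatically. A minimal sketch:

    import logging

    logger = logging.getLogger(__name__)

    count = 3
    logger.debug(f"Rebalancing: assigning partitions")  # F541: no placeholders
    logger.debug("Rebalancing: assigning partitions")   # fixed form
    logger.debug(f"Assigned {count} partitions")        # fine: interpolates a value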
2 changes: 2 additions & 0 deletions quixstreams/checkpointing/__init__.py
@@ -1,2 +1,4 @@
 from .checkpoint import Checkpoint, BaseCheckpoint
 from .exceptions import InvalidStoredOffset
+
+__all__ = ["BaseCheckpoint", "Checkpoint", "InvalidStoredOffset"]
1 change: 1 addition & 0 deletions quixstreams/core/stream/__init__.py
@@ -1,2 +1,3 @@
+# ruff: noqa: F403
 from .stream import *
 from .functions import *
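
`# ruff: noqa: F403` is a file-level suppression. F403 flags `from module import *` because star imports make it unclear which names enter the namespace; these `__init__.py` files re-export entire submodules on purpose, so the rule is silenced for the whole file instead of per line. Sketched with the modules above:

    # ruff: noqa: F403  (suppresses "star import used" for this entire file)
    from .stream import *     # without the pragma, F403 on each of these
    from .functions import *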
3 changes: 2 additions & 1 deletion quixstreams/core/stream/functions/__init__.py
@@ -1,5 +1,6 @@
+# ruff: noqa: F403
 from .types import *
-from .base import StreamFunction
+from .base import *
 from .apply import *
 from .update import *
 from .filter import *
2 changes: 1 addition & 1 deletion quixstreams/core/stream/functions/utils.py
@@ -19,4 +19,4 @@ def pickle_copier(obj: T) -> Callable[[], T]:
     """
 
     serialized = dumps(obj, protocol=_PICKLE_PROTOCOL, fix_imports=False)
-    return lambda: loads(serialized, fix_imports=False)
+    return lambda: loads(serialized, fix_imports=False)  # noqa: S301
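
S301 (flake8-bandit) flags `pickle.loads` because unpickling untrusted bytes can execute arbitrary code. In `pickle_copier` the bytes are produced by `dumps` on an in-process object moments earlier and never cross a trust boundary, so a targeted line-level `# noqa: S301` is the appropriate scope. A simplified, hedged sketch of the pattern:

    from pickle import dumps, loads
    from typing import Callable, TypeVar

    T = TypeVar("T")


    def pickle_copier(obj: T) -> Callable[[], T]:
        # Safe here: we only unpickle bytes we serialized ourselves.
        serialized = dumps(obj)
        return lambda: loads(serialized)  # noqa: S301


    make_copy = pickle_copier({"a": 1})
    assert make_copy() == {"a": 1}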
1 change: 0 additions & 1 deletion quixstreams/core/stream/stream.py
@@ -400,7 +400,6 @@ def _compose(
         allow_expands: bool,
         allow_transforms: bool,
     ) -> VoidExecutor:
-
         functions = [node.func for node in tree]
 
         # Iterate over a reversed list of functions
23 changes: 21 additions & 2 deletions quixstreams/dataframe/__init__.py
@@ -1,4 +1,23 @@
 from .dataframe import StreamingDataFrame
-from .exceptions import *
+from .exceptions import (
+    InvalidOperation,
+    GroupByNestingLimit,
+    InvalidColumnReference,
+    ColumnDoesNotExist,
+    StreamingDataFrameDuplicate,
+    GroupByDuplicate,
+)
 from .registry import DataframeRegistry
-from .series import *
+from .series import StreamingSeries
+
+__all__ = (
+    "ColumnDoesNotExist",
+    "DataframeRegistry",
+    "GroupByDuplicate",
+    "GroupByNestingLimit",
+    "InvalidColumnReference",
+    "InvalidOperation",
+    "StreamingDataFrame",
+    "StreamingDataFrameDuplicate",
+    "StreamingSeries",
+)
5 changes: 2 additions & 3 deletions quixstreams/dataframe/dataframe.py
@@ -46,7 +46,6 @@
 from quixstreams.processing import ProcessingContext
 from quixstreams.sinks import BaseSink
 from quixstreams.state.types import State
-from quixstreams.sources import BaseSource
 from .base import BaseStreaming
 from .exceptions import InvalidOperation
 from .registry import DataframeRegistry
@@ -983,7 +982,7 @@ def drop(
             if not columns:
                 return self
             if not all(isinstance(s, str) for s in columns):
-                raise TypeError(f"column list must contain strings only")
+                raise TypeError("column list must contain strings only")
         elif isinstance(columns, str):
             columns = [columns]
         else:
@@ -1189,7 +1188,7 @@ def _drop(value: Dict, columns: List[str], ignore_missing: bool = False):
 
 
 def _as_metadata_func(
-    func: Union[ApplyCallbackStateful, FilterCallbackStateful, UpdateCallbackStateful]
+    func: Union[ApplyCallbackStateful, FilterCallbackStateful, UpdateCallbackStateful],
 ) -> Union[
     ApplyWithMetadataCallbackStateful,
     FilterWithMetadataCallbackStateful,
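
The trailing comma added to `_as_metadata_func`'s parameter is the formatter at work: when a parameter list is too long for one 88-column line, `ruff format` (matching Black's style) splits it and appends a trailing comma, which keeps the construct expanded on future runs. An illustrative sketch, not from this repo:

    from typing import Union


    # The annotation exceeds the line limit, so the signature is split and a
    # trailing comma is appended:
    def convert(
        value: Union[int, float, complex, str, bytes, bytearray, memoryview, None],
    ) -> str:
        return str(value)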
6 changes: 6 additions & 0 deletions quixstreams/dataframe/windows/__init__.py
@@ -1,2 +1,8 @@
 from .definitions import HoppingWindowDefinition, TumblingWindowDefinition
 from .base import WindowResult
+
+__all__ = [
+    "HoppingWindowDefinition",
+    "TumblingWindowDefinition",
+    "WindowResult",
+]
1 change: 1 addition & 0 deletions quixstreams/exceptions/__init__.py
@@ -1,2 +1,3 @@
+# ruff: noqa: F403
 from .base import *
 from .assignment import *
1 change: 1 addition & 0 deletions quixstreams/kafka/__init__.py
@@ -1,3 +1,4 @@
+# ruff: noqa: F403
 from .producer import *
 from .consumer import *
 from .configuration import *
4 changes: 2 additions & 2 deletions quixstreams/kafka/producer.py
@@ -22,9 +22,9 @@
 IGNORED_KAFKA_ERRORS = (
     # This error seems to be thrown despite brokers being available.
     # Seems linked to `connections.max.idle.ms`.
-    KafkaError._ALL_BROKERS_DOWN,
+    KafkaError._ALL_BROKERS_DOWN,  # noqa: SLF001
     # Broker handle destroyed - common/typical behavior, often seen via AdminClient
-    KafkaError._DESTROY,
+    KafkaError._DESTROY,  # noqa: SLF001
 )
 
 
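
SLF001 (flake8-self) flags access to another object's underscore-prefixed members. `KafkaError._ALL_BROKERS_DOWN` and `KafkaError._DESTROY` are private constants in confluent-kafka with no public alias, hence the line-level suppressions. A minimal sketch of what the rule does and does not report:

    class Counter:
        def __init__(self):
            self._value = 0

        def bump(self):
            self._value += 1  # fine: access via `self` is allowed


    c = Counter()
    c.bump()
    print(c._value)  # SLF001: private member accessed from outside the class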
1 change: 1 addition & 0 deletions quixstreams/models/__init__.py
@@ -1,3 +1,4 @@
+# ruff: noqa: F403
 from .rows import *
 from .serializers import *
 from .timestamps import *
1 change: 1 addition & 0 deletions quixstreams/models/serializers/__init__.py
@@ -1,3 +1,4 @@
+# ruff: noqa: F403
 from .base import *
 from .simple_types import (
     StringSerializer,
1 change: 0 additions & 1 deletion quixstreams/models/serializers/protobuf.py
@@ -154,7 +154,6 @@ def __init__(
         # conf dict with a single key: `use.deprecated.format`.
         self._schema_registry_deserializer = None
         if schema_registry_client_config:
-
             # The use.deprecated.format has been mandatory since Confluent Kafka version 1.8.2.
             # https://github.com/confluentinc/confluent-kafka-python/releases/tag/v1.8.2
             serialization_config = (
2 changes: 1 addition & 1 deletion quixstreams/models/serializers/simple_types.py
@@ -1,5 +1,5 @@
 import functools
-from typing import Optional, Union, Mapping
+from typing import Union, Mapping
 
 from confluent_kafka.serialization import (
     StringDeserializer as _StringDeserializer,
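
The `Optional` removal above is the plain unused-import case, Pyflakes F401: the name is imported but never referenced in the module, and with `fix = true` Ruff deletes it automatically. A minimal sketch:

    from typing import Optional, Union  # F401: `Optional` imported but unused


    def widen(x: int) -> Union[int, float]:
        return x * 1.0

After `ruff check --fix`, the import line becomes `from typing import Union`.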
1 change: 1 addition & 0 deletions quixstreams/models/topics/__init__.py
@@ -1,3 +1,4 @@
+# ruff: noqa: F403
 from .admin import *
 from .manager import *
 from .topic import *
6 changes: 3 additions & 3 deletions quixstreams/models/topics/manager.py
@@ -323,7 +323,7 @@ def repartition_topic(
         :return: `Topic` object (which is also stored on the TopicManager)
         """
-        name = self._internal_name(f"repartition", topic_name, operation)
+        name = self._internal_name("repartition", topic_name, operation)
 
         topic = Topic(
             name=name,
@@ -444,7 +444,7 @@ def validate_all_topics(self, timeout: Optional[float] = None):
         Issues are pooled and raised as an Exception once inspections are complete.
         """
-        logger.info(f"Validating Kafka topics exist and are configured correctly...")
+        logger.info("Validating Kafka topics exist and are configured correctly...")
         all_topic_names = [t.name for t in self._all_topics_list]
         actual_configs = self._admin.inspect_topics(
             all_topic_names,
@@ -476,4 +476,4 @@
                     f'got {changelog_cfg.replication_factor}"'
                 )
 
-        logger.info(f"Kafka topics validation complete")
+        logger.info("Kafka topics validation complete")
1 change: 1 addition & 0 deletions quixstreams/platforms/quix/__init__.py
@@ -1,3 +1,4 @@
+# ruff: noqa: F403
 from .api import *
 from .checks import *
 from .config import *
2 changes: 1 addition & 1 deletion quixstreams/platforms/quix/api.py
@@ -140,7 +140,7 @@ def get_workspace_certificate(
             return f.read()
 
     def get_auth_token_details(self, timeout: float = 30) -> dict:
-        return self.session.get(f"/auth/token/details", timeout=timeout).json()
+        return self.session.get("/auth/token/details", timeout=timeout).json()
 
     def get_workspace(
         self, workspace_id: Optional[str] = None, timeout: float = 30
2 changes: 1 addition & 1 deletion quixstreams/platforms/quix/env.py
@@ -9,7 +9,7 @@ class QuixEnvironment:
     Class to access various Quix platform environment settings
     """
 
-    SDK_TOKEN = "Quix__Sdk__Token"
+    SDK_TOKEN = "Quix__Sdk__Token"  # noqa: S105
     PORTAL_API = "Quix__Portal__Api"
     WORKSPACE_ID = "Quix__Workspace__Id"
     DEPLOYMENT_ID = "Quix__Deployment__Id"
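
This one is a classic flake8-bandit false positive: S105 matches string assignments to credential-looking names (`token`, `password`, and the like), and `SDK_TOKEN` qualifies even though the string is the name of an environment variable, not a secret. The targeted `# noqa: S105` records that judgment. Sketched:

    import os

    SDK_TOKEN = "Quix__Sdk__Token"  # noqa: S105 (env var name, not a credential)

    token_value = os.environ.get(SDK_TOKEN)  # the real secret stays in the environment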
2 changes: 1 addition & 1 deletion quixstreams/rowconsumer.py
@@ -128,7 +128,7 @@ def poll_row(self, timeout: float = None) -> Union[Row, List[Row], None]:
         if msg is None:
             return
 
-        topic_name, partition, offset = msg.topic(), msg.partition(), msg.offset()
+        topic_name = msg.topic()
         try:
             if msg.error():
                 raise KafkaConsumerException(error=msg.error())
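
The `poll_row` change drops `partition` and `offset`, which were assigned but never read. Ruff reports dead locals as F841 for simple assignments (by default the rule skips tuple-unpacking targets, so this particular cleanup may well have been manual), and removing them keeps the polling hot path tidy. A minimal sketch of the basic case:

    def summarize(msg):
        topic_name = msg.topic()
        offset = msg.offset()  # F841: local variable assigned but never used
        return topic_name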
1 change: 0 additions & 1 deletion quixstreams/rowproducer.py
@@ -97,7 +97,6 @@ def __init__(
         flush_timeout: Optional[float] = None,
         transactional: bool = False,
     ):
-
         if transactional:
             self._producer = TransactionalProducer(
                 broker_address=broker_address,
12 changes: 10 additions & 2 deletions quixstreams/sinks/__init__.py
@@ -1,3 +1,11 @@
 from .base import BatchingSink, SinkBatch, BaseSink
-from .exceptions import SinkBackpressureError as SinkBackpressureError
-from .manager import SinkManager as SinkManager
+from .exceptions import SinkBackpressureError
+from .manager import SinkManager
+
+__all__ = [
+    "BaseSink",
+    "BatchingSink",
+    "SinkBackpressureError",
+    "SinkBatch",
+    "SinkManager",
+]
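
The removed `as SinkBackpressureError` / `as SinkManager` aliases were the other recognized re-export idiom: by the convention from PEP 484, a redundant `import X as X` marks `X` as intentionally public, and Ruff's F401 respects it. This commit standardizes on `__all__` instead, which also controls `from quixstreams.sinks import *`. The two equivalent forms, sketched:

    # idiom 1 (before): redundant self-alias marks the re-export
    from .manager import SinkManager as SinkManager

    # idiom 2 (after): plain import plus an explicit __all__
    from .manager import SinkManager

    __all__ = ["SinkManager"]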
1 change: 0 additions & 1 deletion quixstreams/sinks/csv.py
@@ -7,7 +7,6 @@
 
 
 class CSVSink(BatchingSink):
-
     def __init__(
         self,
         path: str,
12 changes: 11 additions & 1 deletion quixstreams/sources/__init__.py
@@ -1,5 +1,15 @@
-from .base import *
+from .base import BaseSource, Source
 from .manager import SourceException
 from .multiprocessing import multiprocessing
 from .csv import CSVSource
 from .kafka import KafkaReplicatorSource, QuixEnvironmentSource
+
+__all__ = [
+    "BaseSource",
+    "CSVSource",
+    "KafkaReplicatorSource",
+    "multiprocessing",
+    "QuixEnvironmentSource",
+    "Source",
+    "SourceException",
+]
9 changes: 7 additions & 2 deletions quixstreams/sources/kafka/__init__.py
@@ -1,2 +1,7 @@
-from .kafka import *
-from .quix import *
+from .kafka import KafkaReplicatorSource
+from .quix import QuixEnvironmentSource
+
+__all__ = [
+    "KafkaReplicatorSource",
+    "QuixEnvironmentSource",
+]
2 changes: 1 addition & 1 deletion quixstreams/sources/manager.py
@@ -67,7 +67,7 @@ def run(self) -> None:
         try:
             self.source.start()
         except BaseException as err:
-            logger.exception(f"Error in source")
+            logger.exception("Error in source")
             self._report_exception(err)
 
         logger.info("Source completed")
1 change: 1 addition & 0 deletions quixstreams/state/__init__.py
@@ -1,3 +1,4 @@
+# ruff: noqa: F403
 from .manager import *
 from .recovery import *
 from .types import *