diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 0a8a4e894..22ad3e1a5 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -25,13 +25,7 @@ jobs:
         uses: actions/setup-python@v4
         with:
           python-version: 3.9
-      - name: Install linters
-        run: |
-          python -m pip install -U -r requirements-dev.txt
-      - name: Run Black
-        run: |
-          python -m black --check quixstreams
-          python -m black --check tests
+      - uses: pre-commit/action@v3.0.1

   test:
     needs: lint
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 1928e1200..296910d8e 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,9 +1,6 @@
 repos:
-  - repo: https://github.com/psf/black
-    rev: '24.3.0'
+  - repo: https://github.com/astral-sh/ruff-pre-commit
+    rev: v0.6.3
     hooks:
-      - id: black
-        types: [python]
-        language_version: python3
-        args: [ --config=pyproject.toml, --check ]
-        verbose: true
+      - id: ruff
+      - id: ruff-format
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index d13e11f32..b5e9f3449 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -54,5 +54,4 @@ This guide will assume you're using Visual Studio Code, but most of the guide is
 3. Ensure you don't have `quixstreams` installed using `python3 -m pip uninstall quixstreams` to avoid using that when testing
 4. By executing `python3 -m pip install --editable .` you will be able to use the source code as a module for local testing without having to install new versions
 5. Add commit pre-hook: `pre-commit install`
-6. Configure Black code formatter.
-7. Run test from project root with `pytest`
\ No newline at end of file
+6. Run test from project root with `pytest`
diff --git a/examples/custom_websocket_source/main.py b/examples/custom_websocket_source/main.py
index abfc6f26b..b99b76392 100644
--- a/examples/custom_websocket_source/main.py
+++ b/examples/custom_websocket_source/main.py
@@ -84,7 +84,6 @@ def run(self) -> None:


 def main():
-    # Initialize an Application with Kafka configuration
     app = Application(
         broker_address="localhost:9092",  # Specify your Kafka broker address here
diff --git a/pyproject.toml b/pyproject.toml
index c376869ff..bb5df444d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -49,17 +49,26 @@ version = {attr = "quixstreams.__version__"}
 [tool.pydantic-pycharm-plugin]
 ignore-init-method-arguments = true

-[tool.black]
-line-length = 88
-target-version = ['py38']
-include = '\.pyi?$'
-force-exclude = '.*_pb2\.py$'
-
-[tool.isort]
-atomic = true
-profile = "black"
-line_length = 88
+[tool.ruff]
+fix = true
+lint.select = [
+    "F",    # Pyflakes
+    "S",    # flake8-bandit
+    "SLF",  # flake8-self
+]
+exclude = [
+    "*_pb2.py",  # Protobuf files
+]
+[tool.ruff.lint.per-file-ignores]
+# S101 Use of `assert` detected
+# S105 Possible hardcoded password assigned to variable
+# S106 Possible hardcoded password assigned to argument
+# S311 Standard pseudo-random generators are not suitable for cryptographic purposes
+# SLF001 Private member accessed
+"docs/*.py" = ["S311"]
+"examples/*.py" = ["S311"]
+"tests/*.py" = ["S101", "S105", "S106", "S311", "SLF001"]

 [tool.pytest.ini_options]
 minversion = "6.0"
diff --git a/quixstreams/__init__.py b/quixstreams/__init__.py
index b1b40cdf0..971fd15aa 100644
--- a/quixstreams/__init__.py
+++ b/quixstreams/__init__.py
@@ -3,5 +3,6 @@
 from .models import MessageContext
 from .state import State

+__all__ = ["Application", "message_context", "MessageContext", "State"]
 __version__ = "2.11.1"
diff --git a/quixstreams/app.py b/quixstreams/app.py
index 77df77ce7..7a4cc3b17 100644
--- a/quixstreams/app.py
+++ b/quixstreams/app.py
@@ -845,7 +845,7 @@ def _on_assign(self, _, topic_partitions: List[TopicPartition]):
         # sometimes "empty" calls happen, probably updating the consumer epoch
         if not topic_partitions:
             return
-        logger.debug(f"Rebalancing: assigning partitions")
+        logger.debug("Rebalancing: assigning partitions")

         # Only start the sources once the consumer is assigned. Otherwise a source
         # can produce data before the consumer starts. If that happens on a new
@@ -903,7 +903,7 @@ def _on_revoke(self, _, topic_partitions: List[TopicPartition]):
         # because of the unhandled exception.
         # In this case, we should drop the checkpoint and let another consumer
         # pick up from the latest one
-        logger.debug(f"Rebalancing: revoking partitions")
+        logger.debug("Rebalancing: revoking partitions")
         if self._failed:
             logger.warning(
                 "Application is stopping due to failure, "
@@ -926,7 +926,7 @@ def _on_lost(self, _, topic_partitions: List[TopicPartition]):
         """
         Dropping lost partitions from consumer and state
         """
-        logger.debug(f"Rebalancing: dropping lost partitions")
+        logger.debug("Rebalancing: dropping lost partitions")
         for tp in topic_partitions:
             if self._state_manager.stores:
                 self._state_manager.on_partition_revoke(
@@ -944,11 +944,11 @@ def _on_sigint(self, *_):
         # Re-install the default SIGINT handler so doing Ctrl+C twice
         # raises KeyboardInterrupt
         signal.signal(signal.SIGINT, signal.default_int_handler)
-        logger.debug(f"Received SIGINT, stopping the processing loop")
+        logger.debug("Received SIGINT, stopping the processing loop")
         self.stop()

     def _on_sigterm(self, *_):
-        logger.debug(f"Received SIGTERM, stopping the processing loop")
+        logger.debug("Received SIGTERM, stopping the processing loop")
         self.stop()
diff --git a/quixstreams/checkpointing/__init__.py b/quixstreams/checkpointing/__init__.py
index 80457dd5c..0f7c57e52 100644
--- a/quixstreams/checkpointing/__init__.py
+++ b/quixstreams/checkpointing/__init__.py
@@ -1,2 +1,4 @@
 from .checkpoint import Checkpoint, BaseCheckpoint
 from .exceptions import InvalidStoredOffset
+
+__all__ = ["BaseCheckpoint", "Checkpoint", "InvalidStoredOffset"]
diff --git a/quixstreams/core/stream/__init__.py b/quixstreams/core/stream/__init__.py
index 0c64aa38d..a0684b8e8 100644
--- a/quixstreams/core/stream/__init__.py
+++ b/quixstreams/core/stream/__init__.py
@@ -1,2 +1,3 @@
+# ruff: noqa: F403
 from .stream import *
 from .functions import *
diff --git a/quixstreams/core/stream/functions/__init__.py b/quixstreams/core/stream/functions/__init__.py
index cdb7df52f..8cdc976d2 100644
--- a/quixstreams/core/stream/functions/__init__.py
+++ b/quixstreams/core/stream/functions/__init__.py
@@ -1,5 +1,6 @@
+# ruff: noqa: F403
 from .types import *
-from .base import StreamFunction
+from .base import *
 from .apply import *
 from .update import *
 from .filter import *
diff --git a/quixstreams/core/stream/functions/utils.py b/quixstreams/core/stream/functions/utils.py
index 1b8183e89..7d7243f32 100644
--- a/quixstreams/core/stream/functions/utils.py
+++ b/quixstreams/core/stream/functions/utils.py
@@ -19,4 +19,4 @@ def pickle_copier(obj: T) -> Callable[[], T]:
     """
     serialized = dumps(obj, protocol=_PICKLE_PROTOCOL, fix_imports=False)
-    return lambda: loads(serialized, fix_imports=False)
+    return lambda: loads(serialized, fix_imports=False)  # noqa: S301
diff --git a/quixstreams/core/stream/stream.py b/quixstreams/core/stream/stream.py
index 4e7ea7ed2..a99143047 100644
--- a/quixstreams/core/stream/stream.py
+++ b/quixstreams/core/stream/stream.py
@@ -400,7 +400,6 @@ def _compose(
         allow_expands: bool,
         allow_transforms: bool,
     ) -> VoidExecutor:
-        functions = [node.func for node in tree]

         # Iterate over a reversed list of functions
diff --git a/quixstreams/dataframe/__init__.py b/quixstreams/dataframe/__init__.py
index c30f60fb1..275c9602d 100644
--- a/quixstreams/dataframe/__init__.py
+++ b/quixstreams/dataframe/__init__.py
@@ -1,4 +1,23 @@
 from .dataframe import StreamingDataFrame
-from .exceptions import *
+from .exceptions import (
+    InvalidOperation,
+    GroupByNestingLimit,
+    InvalidColumnReference,
+    ColumnDoesNotExist,
+    StreamingDataFrameDuplicate,
+    GroupByDuplicate,
+)
 from .registry import DataframeRegistry
-from .series import *
+from .series import StreamingSeries
+
+__all__ = (
+    "ColumnDoesNotExist",
+    "DataframeRegistry",
+    "GroupByDuplicate",
+    "GroupByNestingLimit",
+    "InvalidColumnReference",
+    "InvalidOperation",
+    "StreamingDataFrame",
+    "StreamingDataFrameDuplicate",
+    "StreamingSeries",
+)
diff --git a/quixstreams/dataframe/dataframe.py b/quixstreams/dataframe/dataframe.py
index fc9b66a03..5f5882862 100644
--- a/quixstreams/dataframe/dataframe.py
+++ b/quixstreams/dataframe/dataframe.py
@@ -46,7 +46,6 @@
 from quixstreams.processing import ProcessingContext
 from quixstreams.sinks import BaseSink
 from quixstreams.state.types import State
-from quixstreams.sources import BaseSource
 from .base import BaseStreaming
 from .exceptions import InvalidOperation
 from .registry import DataframeRegistry
@@ -983,7 +982,7 @@ def drop(
             if not columns:
                 return self
             if not all(isinstance(s, str) for s in columns):
-                raise TypeError(f"column list must contain strings only")
+                raise TypeError("column list must contain strings only")
         elif isinstance(columns, str):
             columns = [columns]
         else:
@@ -1189,7 +1188,7 @@ def _drop(value: Dict, columns: List[str], ignore_missing: bool = False):


 def _as_metadata_func(
-    func: Union[ApplyCallbackStateful, FilterCallbackStateful, UpdateCallbackStateful]
+    func: Union[ApplyCallbackStateful, FilterCallbackStateful, UpdateCallbackStateful],
 ) -> Union[
     ApplyWithMetadataCallbackStateful,
     FilterWithMetadataCallbackStateful,
diff --git a/quixstreams/dataframe/windows/__init__.py b/quixstreams/dataframe/windows/__init__.py
index cb5248bd1..5e305327f 100644
--- a/quixstreams/dataframe/windows/__init__.py
+++ b/quixstreams/dataframe/windows/__init__.py
@@ -1,2 +1,8 @@
 from .definitions import HoppingWindowDefinition, TumblingWindowDefinition
 from .base import WindowResult
+
+__all__ = [
+    "HoppingWindowDefinition",
+    "TumblingWindowDefinition",
+    "WindowResult",
+]
diff --git a/quixstreams/exceptions/__init__.py b/quixstreams/exceptions/__init__.py
index a572a9c98..005d305d6 100644
--- a/quixstreams/exceptions/__init__.py
+++ b/quixstreams/exceptions/__init__.py
@@ -1,2 +1,3 @@
+# ruff: noqa: F403
 from .base import *
 from .assignment import *
diff --git a/quixstreams/kafka/__init__.py b/quixstreams/kafka/__init__.py
index da5724ea7..067385e4c 100644
--- a/quixstreams/kafka/__init__.py
+++ b/quixstreams/kafka/__init__.py
@@ -1,3 +1,4 @@
+# ruff: noqa: F403
 from .producer import *
 from .consumer import *
 from .configuration import *
diff --git a/quixstreams/kafka/producer.py b/quixstreams/kafka/producer.py
index 79b947c52..971a6d41e 100644
--- a/quixstreams/kafka/producer.py
+++ b/quixstreams/kafka/producer.py
@@ -22,9 +22,9 @@
 IGNORED_KAFKA_ERRORS = (
     # This error seems to be thrown despite brokers being available.
     # Seems linked to `connections.max.idle.ms`.
-    KafkaError._ALL_BROKERS_DOWN,
+    KafkaError._ALL_BROKERS_DOWN,  # noqa: SLF001
     # Broker handle destroyed - common/typical behavior, often seen via AdminClient
-    KafkaError._DESTROY,
+    KafkaError._DESTROY,  # noqa: SLF001
 )
diff --git a/quixstreams/models/__init__.py b/quixstreams/models/__init__.py
index ed2fb1237..b5836cd34 100644
--- a/quixstreams/models/__init__.py
+++ b/quixstreams/models/__init__.py
@@ -1,3 +1,4 @@
+# ruff: noqa: F403
 from .rows import *
 from .serializers import *
 from .timestamps import *
diff --git a/quixstreams/models/serializers/__init__.py b/quixstreams/models/serializers/__init__.py
index c3a8009c6..a06810d47 100644
--- a/quixstreams/models/serializers/__init__.py
+++ b/quixstreams/models/serializers/__init__.py
@@ -1,3 +1,4 @@
+# ruff: noqa: F403
 from .base import *
 from .simple_types import (
     StringSerializer,
diff --git a/quixstreams/models/serializers/protobuf.py b/quixstreams/models/serializers/protobuf.py
index 521cd523d..0f4e0aceb 100644
--- a/quixstreams/models/serializers/protobuf.py
+++ b/quixstreams/models/serializers/protobuf.py
@@ -154,7 +154,6 @@ def __init__(
         # conf dict with a single key: `use.deprecated.format`.
         self._schema_registry_deserializer = None
         if schema_registry_client_config:
-            # The use.deprecated.format has been mandatory since Confluent Kafka version 1.8.2.
             # https://github.com/confluentinc/confluent-kafka-python/releases/tag/v1.8.2
             serialization_config = (
diff --git a/quixstreams/models/serializers/simple_types.py b/quixstreams/models/serializers/simple_types.py
index c63ad5f3e..64e9a93a2 100644
--- a/quixstreams/models/serializers/simple_types.py
+++ b/quixstreams/models/serializers/simple_types.py
@@ -1,5 +1,5 @@
 import functools
-from typing import Optional, Union, Mapping
+from typing import Union, Mapping

 from confluent_kafka.serialization import (
     StringDeserializer as _StringDeserializer,
diff --git a/quixstreams/models/topics/__init__.py b/quixstreams/models/topics/__init__.py
index aa1ddb591..2a8dcc9a5 100644
--- a/quixstreams/models/topics/__init__.py
+++ b/quixstreams/models/topics/__init__.py
@@ -1,3 +1,4 @@
+# ruff: noqa: F403
 from .admin import *
 from .manager import *
 from .topic import *
diff --git a/quixstreams/models/topics/manager.py b/quixstreams/models/topics/manager.py
index 6f781b917..9eeb6950f 100644
--- a/quixstreams/models/topics/manager.py
+++ b/quixstreams/models/topics/manager.py
@@ -323,7 +323,7 @@ def repartition_topic(

         :return: `Topic` object (which is also stored on the TopicManager)
         """
-        name = self._internal_name(f"repartition", topic_name, operation)
+        name = self._internal_name("repartition", topic_name, operation)

         topic = Topic(
             name=name,
@@ -444,7 +444,7 @@ def validate_all_topics(self, timeout: Optional[float] = None):
         Issues are pooled and raised as an Exception once inspections are complete.
         """
-        logger.info(f"Validating Kafka topics exist and are configured correctly...")
+        logger.info("Validating Kafka topics exist and are configured correctly...")
         all_topic_names = [t.name for t in self._all_topics_list]
         actual_configs = self._admin.inspect_topics(
             all_topic_names,
@@ -476,4 +476,4 @@ def validate_all_topics(self, timeout: Optional[float] = None):
                 f'got {changelog_cfg.replication_factor}"'
             )

-        logger.info(f"Kafka topics validation complete")
+        logger.info("Kafka topics validation complete")
diff --git a/quixstreams/platforms/quix/__init__.py b/quixstreams/platforms/quix/__init__.py
index ad609b2d7..7f47c2291 100644
--- a/quixstreams/platforms/quix/__init__.py
+++ b/quixstreams/platforms/quix/__init__.py
@@ -1,3 +1,4 @@
+# ruff: noqa: F403
 from .api import *
 from .checks import *
 from .config import *
diff --git a/quixstreams/platforms/quix/api.py b/quixstreams/platforms/quix/api.py
index aa0460f9b..53e4400bc 100644
--- a/quixstreams/platforms/quix/api.py
+++ b/quixstreams/platforms/quix/api.py
@@ -140,7 +140,7 @@ def get_workspace_certificate(
             return f.read()

     def get_auth_token_details(self, timeout: float = 30) -> dict:
-        return self.session.get(f"/auth/token/details", timeout=timeout).json()
+        return self.session.get("/auth/token/details", timeout=timeout).json()

     def get_workspace(
         self, workspace_id: Optional[str] = None, timeout: float = 30
diff --git a/quixstreams/platforms/quix/env.py b/quixstreams/platforms/quix/env.py
index 28d96a70b..7c292a2dd 100644
--- a/quixstreams/platforms/quix/env.py
+++ b/quixstreams/platforms/quix/env.py
@@ -9,7 +9,7 @@ class QuixEnvironment:
     Class to access various Quix platform environment settings
     """

-    SDK_TOKEN = "Quix__Sdk__Token"
+    SDK_TOKEN = "Quix__Sdk__Token"  # noqa: S105
     PORTAL_API = "Quix__Portal__Api"
     WORKSPACE_ID = "Quix__Workspace__Id"
     DEPLOYMENT_ID = "Quix__Deployment__Id"
diff --git a/quixstreams/rowconsumer.py b/quixstreams/rowconsumer.py
index 1eff11fbf..084f47340 100644
--- a/quixstreams/rowconsumer.py
+++ b/quixstreams/rowconsumer.py
@@ -128,7 +128,7 @@ def poll_row(self, timeout: float = None) -> Union[Row, List[Row], None]:
         if msg is None:
             return

-        topic_name, partition, offset = msg.topic(), msg.partition(), msg.offset()
+        topic_name = msg.topic()
         try:
             if msg.error():
                 raise KafkaConsumerException(error=msg.error())
diff --git a/quixstreams/rowproducer.py b/quixstreams/rowproducer.py
index 16b679027..911c6d840 100644
--- a/quixstreams/rowproducer.py
+++ b/quixstreams/rowproducer.py
@@ -97,7 +97,6 @@ def __init__(
         flush_timeout: Optional[float] = None,
         transactional: bool = False,
     ):
-
         if transactional:
             self._producer = TransactionalProducer(
                 broker_address=broker_address,
diff --git a/quixstreams/sinks/__init__.py b/quixstreams/sinks/__init__.py
index d9ad98793..b88f208c4 100644
--- a/quixstreams/sinks/__init__.py
+++ b/quixstreams/sinks/__init__.py
@@ -1,3 +1,11 @@
 from .base import BatchingSink, SinkBatch, BaseSink
-from .exceptions import SinkBackpressureError as SinkBackpressureError
-from .manager import SinkManager as SinkManager
+from .exceptions import SinkBackpressureError
+from .manager import SinkManager
+
+__all__ = [
+    "BaseSink",
+    "BatchingSink",
+    "SinkBackpressureError",
+    "SinkBatch",
+    "SinkManager",
+]
diff --git a/quixstreams/sinks/csv.py b/quixstreams/sinks/csv.py
index c1bac1981..54300185b 100644
--- a/quixstreams/sinks/csv.py
+++ b/quixstreams/sinks/csv.py
@@ -7,7 +7,6 @@


 class CSVSink(BatchingSink):
-
     def __init__(
         self,
         path: str,
diff --git a/quixstreams/sources/__init__.py b/quixstreams/sources/__init__.py
index d32b5d47d..2c7bc7e3c 100644
--- a/quixstreams/sources/__init__.py
+++ b/quixstreams/sources/__init__.py
@@ -1,5 +1,15 @@
-from .base import *
+from .base import BaseSource, Source
 from .manager import SourceException
 from .multiprocessing import multiprocessing
 from .csv import CSVSource
 from .kafka import KafkaReplicatorSource, QuixEnvironmentSource
+
+__all__ = [
+    "BaseSource",
+    "CSVSource",
+    "KafkaReplicatorSource",
+    "multiprocessing",
+    "QuixEnvironmentSource",
+    "Source",
+    "SourceException",
+]
diff --git a/quixstreams/sources/kafka/__init__.py b/quixstreams/sources/kafka/__init__.py
index c168c6ae8..01b3d3148 100644
--- a/quixstreams/sources/kafka/__init__.py
+++ b/quixstreams/sources/kafka/__init__.py
@@ -1,2 +1,7 @@
-from .kafka import *
-from .quix import *
+from .kafka import KafkaReplicatorSource
+from .quix import QuixEnvironmentSource
+
+__all__ = [
+    "KafkaReplicatorSource",
+    "QuixEnvironmentSource",
+]
diff --git a/quixstreams/sources/manager.py b/quixstreams/sources/manager.py
index ba29b061c..0c0332479 100644
--- a/quixstreams/sources/manager.py
+++ b/quixstreams/sources/manager.py
@@ -67,7 +67,7 @@ def run(self) -> None:
             try:
                 self.source.start()
             except BaseException as err:
-                logger.exception(f"Error in source")
+                logger.exception("Error in source")
                 self._report_exception(err)

             logger.info("Source completed")
diff --git a/quixstreams/state/__init__.py b/quixstreams/state/__init__.py
index d144aa118..6618dcb01 100644
--- a/quixstreams/state/__init__.py
+++ b/quixstreams/state/__init__.py
@@ -1,3 +1,4 @@
+# ruff: noqa: F403
 from .manager import *
 from .recovery import *
 from .types import *
diff --git a/quixstreams/state/rocksdb/__init__.py b/quixstreams/state/rocksdb/__init__.py
index 1d2f937ac..7e4416821 100644
--- a/quixstreams/state/rocksdb/__init__.py
+++ b/quixstreams/state/rocksdb/__init__.py
@@ -1,3 +1,4 @@
+# ruff: noqa: F403
 from .exceptions import *
 from .options import *
 from .partition import *
diff --git a/quixstreams/state/rocksdb/store.py b/quixstreams/state/rocksdb/store.py
index 194bf680f..62a36ff57 100644
--- a/quixstreams/state/rocksdb/store.py
+++ b/quixstreams/state/rocksdb/store.py
@@ -107,7 +107,7 @@ def assign_partition(self, partition: int) -> RocksDBStorePartition:
             self._partitions[partition] = store_partition

             logger.debug(
-                f'Assigned store partition "%s[%s]" (topic "%s")',
+                'Assigned store partition "%s[%s]" (topic "%s")',
                 self._name,
                 partition,
                 self._topic,
diff --git a/quixstreams/state/rocksdb/transaction.py b/quixstreams/state/rocksdb/transaction.py
index 12c99753e..935ad6183 100644
--- a/quixstreams/state/rocksdb/transaction.py
+++ b/quixstreams/state/rocksdb/transaction.py
@@ -431,7 +431,7 @@ def _flush_state(
             and changelog_offset < current_changelog_offset
         ):
             raise InvalidChangelogOffset(
-                f"Cannot set changelog offset lower than already saved one"
+                "Cannot set changelog offset lower than already saved one"
             )
         self._batch.put(
             CHANGELOG_OFFSET_KEY,
diff --git a/quixstreams/state/rocksdb/windowed/serialization.py b/quixstreams/state/rocksdb/windowed/serialization.py
index 791a6a8e8..610bac4cc 100644
--- a/quixstreams/state/rocksdb/windowed/serialization.py
+++ b/quixstreams/state/rocksdb/windowed/serialization.py
@@ -30,8 +30,9 @@ def parse_window_key(key: bytes) -> Tuple[bytes, int, int]:
         timestamps_bytes[_TIMESTAMP_BYTE_LENGTH + 1 :],
     )

-    start_ms, end_ms = int_from_int64_bytes(start_bytes), int_from_int64_bytes(
-        end_bytes
+    start_ms, end_ms = (
+        int_from_int64_bytes(start_bytes),
+        int_from_int64_bytes(end_bytes),
     )

     return message_key, start_ms, end_ms
diff --git a/quixstreams/state/types.py b/quixstreams/state/types.py
index 83d5232ab..00621ae83 100644
--- a/quixstreams/state/types.py
+++ b/quixstreams/state/types.py
@@ -2,7 +2,6 @@
 from typing import Protocol, Any, Optional, Callable, Dict, ClassVar, Tuple, List

 from quixstreams.models import ConfluentKafkaMessageProto
-from quixstreams.models.types import MessageHeadersMapping

 DumpsFunc = Callable[[Any], bytes]
 LoadsFunc = Callable[[bytes], Any]
@@ -365,7 +364,6 @@ def expire_windows(


 class WindowedPartitionTransaction(Protocol):
-
     @property
     def failed(self) -> bool:
         """
diff --git a/quixstreams/utils/settings.py b/quixstreams/utils/settings.py
index 9f86204d6..d0fec3e26 100644
--- a/quixstreams/utils/settings.py
+++ b/quixstreams/utils/settings.py
@@ -7,7 +7,6 @@


 class BaseSettings(_BaseSettings):
-
     model_config = SettingsConfigDict(
         alias_generator=AliasGenerator(
             # used during model_dumps
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 529e64c1c..b5a27ad56 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -1,2 +1 @@
-black>=24.3.0, <24.9
 pre-commit>=3.4,<3.9
diff --git a/tests/compat/__init__.py b/tests/compat/__init__.py
index 428bdd796..ec31acf7b 100644
--- a/tests/compat/__init__.py
+++ b/tests/compat/__init__.py
@@ -5,3 +5,5 @@
     from testcontainers.core.network import Network
 except ImportError:
     from .network import Network
+
+__all__ = ["Network"]
diff --git a/tests/test_quixstreams/test_app.py b/tests/test_quixstreams/test_app.py
index 2e29f099b..85010582c 100644
--- a/tests/test_quixstreams/test_app.py
+++ b/tests/test_quixstreams/test_app.py
@@ -494,7 +494,7 @@ def test_consumer_extra_config(self, app_factory):

         with app.get_consumer() as x:
             assert x._consumer_config["enable.auto.offset.store"] is True
-            assert x._consumer_config["auto.offset.reset"] is "latest"
+            assert x._consumer_config["auto.offset.reset"] == "latest"

     def test_producer_extra_config(self, app_factory):
         """
@@ -536,7 +536,6 @@ def test_consumer_group_default(self):


 class TestAppGroupBy:
-
     def test_group_by(
         self,
         app_factory,
@@ -714,7 +713,6 @@ def on_message_processed(*_):


 class TestAppExactlyOnce:
-
     def test_exactly_once(
         self,
         app_factory,
@@ -855,11 +853,11 @@ def get_cfg_builder(quix_sdk_token):
             return cfg_builder

         # Mock consumer and producer to check the init args
-        with patch("quixstreams.app.QuixKafkaConfigsBuilder", get_cfg_builder), patch(
-            "quixstreams.app.RowConsumer"
-        ) as consumer_init_mock, patch(
-            "quixstreams.app.RowProducer"
-        ) as producer_init_mock:
+        with (
+            patch("quixstreams.app.QuixKafkaConfigsBuilder", get_cfg_builder),
+            patch("quixstreams.app.RowConsumer") as consumer_init_mock,
+            patch("quixstreams.app.RowProducer") as producer_init_mock,
+        ):
             app = Application(
                 consumer_group=consumer_group,
                 quix_sdk_token=quix_sdk_token,
@@ -919,11 +917,11 @@ def get_cfg_builder(quix_sdk_token):
             return cfg_builder

         monkeypatch.setenv("Quix__Sdk__Token", quix_sdk_token)
-        with patch("quixstreams.app.QuixKafkaConfigsBuilder", get_cfg_builder), patch(
-            "quixstreams.app.RowConsumer"
-        ) as consumer_init_mock, patch(
-            "quixstreams.app.RowProducer"
-        ) as producer_init_mock:
+        with (
+            patch("quixstreams.app.QuixKafkaConfigsBuilder", get_cfg_builder),
+            patch("quixstreams.app.RowConsumer") as consumer_init_mock,
+            patch("quixstreams.app.RowProducer") as producer_init_mock,
+        ):
             Application(
                 consumer_group=consumer_group,
                 consumer_extra_config=extra_config,
@@ -980,9 +978,10 @@ def get_cfg_builder(quix_sdk_token):
             )
             return cfg_builder

-        with patch("quixstreams.app.RowConsumer") as consumer_init_mock, patch(
-            "quixstreams.app.RowProducer"
-        ) as producer_init_mock:
+        with (
+            patch("quixstreams.app.RowConsumer") as consumer_init_mock,
+            patch("quixstreams.app.RowProducer") as producer_init_mock,
+        ):
             Application(
                 consumer_group=consumer_group,
                 quix_config_builder=get_cfg_builder(quix_sdk_token),
@@ -1741,12 +1740,13 @@ def get_app():
             return app, sdf, topic

         def validate_state():
-            with state_manager_factory(
-                group_id=consumer_group,
-                state_dir=state_dir,
-            ) as state_manager, consumer_factory(
-                consumer_group=consumer_group
-            ) as consumer:
+            with (
+                state_manager_factory(
+                    group_id=consumer_group,
+                    state_dir=state_dir,
+                ) as state_manager,
+                consumer_factory(consumer_group=consumer_group) as consumer,
+            ):
                 committed_offset = consumer.committed(
                     [TopicPartition(topic=topic_name, partition=0)]
                 )[0].offset
@@ -1816,7 +1816,6 @@ def test_run_with_sink_success(
         app_factory,
         executor,
     ):
-
         processed_count = 0
         total_messages = 3
@@ -1964,7 +1963,6 @@ def test_run_with_sink_branches_success(
         app_factory,
         executor,
     ):
-
         processed_count = 0
         total_messages = 3
@@ -2048,7 +2046,6 @@ def on_message_processed(topic_, partition, offset):


 class TestApplicationSource:
-
     MESSAGES_COUNT = 3

     def wait_finished(self, app, event, timeout=15.0):
@@ -2062,7 +2059,6 @@ def test_run_with_source_success(
         app_factory,
         executor,
     ):
-
         done = Future()
         processed_count = 0
@@ -2091,7 +2087,6 @@ def on_message_processed(topic_, partition, offset):
         assert values == [0, 1, 2]

     def test_run_source_only(self, app_factory, executor):
-
         done = multiprocessing.Event()

         topic_name = str(uuid.uuid4())
diff --git a/tests/test_quixstreams/test_app_config.py b/tests/test_quixstreams/test_app_config.py
index b45a1f1a3..6f2c2c205 100644
--- a/tests/test_quixstreams/test_app_config.py
+++ b/tests/test_quixstreams/test_app_config.py
@@ -3,7 +3,6 @@


 class TestApplicationConfig:
-
     def test_frozen(self, app_factory):
         app = app_factory()
diff --git a/tests/test_quixstreams/test_checkpointing.py b/tests/test_quixstreams/test_checkpointing.py
index 6b4ac81c3..1b33f8610 100644
--- a/tests/test_quixstreams/test_checkpointing.py
+++ b/tests/test_quixstreams/test_checkpointing.py
@@ -19,7 +19,6 @@
 from quixstreams.state.exceptions import StoreNotRegisteredError, StoreTransactionFailed
 from quixstreams.state.rocksdb import RocksDBPartitionTransaction
 from tests.utils import DummySink
-from quixstreams.sources.manager import SourceManager


 @pytest.fixture()
@@ -307,10 +306,13 @@ def test_commit_has_failed_transactions_fails(

         # Simulate a failed transaction
         tx = checkpoint.get_store_transaction("topic", 0)
-        with contextlib.suppress(ValueError), patch.object(
-            RocksDBPartitionTransaction,
-            "_serialize_key",
-            side_effect=ValueError("test"),
+        with (
+            contextlib.suppress(ValueError),
+            patch.object(
+                RocksDBPartitionTransaction,
+                "_serialize_key",
+                side_effect=ValueError("test"),
+            ),
         ):
             tx.set(key=key, value=value, prefix=prefix)
         assert tx.failed
@@ -423,7 +425,6 @@ def test_get_store_transaction_success(self, checkpoint_factory, state_manager):
     def test_incomplete_flush(
         self, checkpoint_factory, consumer, state_manager_factory, rowproducer_mock
     ):
-
         state_manager = state_manager_factory(producer=rowproducer_mock)
         checkpoint = checkpoint_factory(
             consumer_=consumer, state_manager_=state_manager, producer_=rowproducer_mock
diff --git a/tests/test_quixstreams/test_core/test_stream.py b/tests/test_quixstreams/test_core/test_stream.py
index 26b4fbda9..7f6c86810 100644
--- a/tests/test_quixstreams/test_core/test_stream.py
+++ b/tests/test_quixstreams/test_core/test_stream.py
@@ -89,7 +89,7 @@ def test_diff_shared_origin_with_additional_split_fails(self):
         stream = Stream()
         stream = stream.add_apply(lambda v: v)
         stream2 = stream.add_apply(lambda v: v)
-        stream3 = stream2.add_apply(lambda v: v)
+        stream3 = stream2.add_apply(lambda v: v)  # noqa: F841
         stream2 = stream2.add_apply(lambda v: v)

         with pytest.raises(InvalidOperation):
@@ -187,7 +187,7 @@ def test_apply_expand_filter_some_filtered(self):
             .add_apply(lambda v: [v, v], expand=True)
         )
         result = Sink()
-        value, key, timestamp, headers = 1, "key", 1, None
+        key, timestamp, headers = "key", 1, None
         stream.compose(sink=result.append_record)(1, key, timestamp, headers)
         assert result == [(2, key, timestamp, headers), (2, key, timestamp, headers)]

@@ -355,7 +355,6 @@ def test_transform_record_expanded(self):


 class TestStreamBranching:
-
     def test_basic_branching(self):
         calls = []

@@ -440,11 +439,15 @@ def test_multiple_branches(self):
         stream = Stream().add_apply(lambda v: v + 120).add_apply(lambda v: v // 2)  # 60
         stream_2 = stream.add_apply(lambda v: v // 3)  # 20
-        stream_3 = stream_2.add_apply(lambda v: v + 10).add_apply(lambda v: v + 3)  # 33
-        stream_4 = stream_2.add_apply(lambda v: v + 24)  # 44
+        stream_3 = stream_2.add_apply(lambda v: v + 10).add_apply(  # noqa: F841
+            lambda v: v + 3
+        )  # 33
+        stream_4 = stream_2.add_apply(lambda v: v + 24)  # 44  # noqa: F841
         stream_2 = stream_2.add_apply(lambda v: v + 2)  # 22
         stream = stream.add_apply(lambda v: v + 40)  # 100
-        stream_5 = stream.add_apply(lambda v: v // 2).add_apply(lambda v: v + 5)  # 55
+        stream_5 = stream.add_apply(lambda v: v // 2).add_apply(  # noqa: F841
+            lambda v: v + 5
+        )  # 55
         stream = stream.add_apply(lambda v: v // 100).add_apply(lambda v: v + 10)  # 11
         sink = Sink()
         extras = ("key", 0, [])
@@ -463,13 +466,13 @@ def test_filter(self):
         stream = Stream().add_apply(lambda v: v + 10)
         stream2 = stream.add_apply(lambda v: v + 5).add_filter(lambda v: v < 0)
         stream2 = stream2.add_apply(lambda v: v + 200)
-        stream3 = (
+        stream3 = (  # noqa: F841
             stream.add_apply(lambda v: v + 7)
             .add_filter(lambda v: v < 20)
             .add_apply(lambda v: v + 4)
         )
         stream = stream.add_apply(lambda v: v + 30).add_filter(lambda v: v < 50)
-        stream4 = stream.add_apply(lambda v: v + 60)
+        stream4 = stream.add_apply(lambda v: v + 60)  # noqa: F841
         stream.add_apply(lambda v: v + 800)

         sink = Sink()
@@ -480,12 +483,11 @@ def test_filter(self):
         assert sink == expected

     def test_update(self):
-
         stream = Stream().add_apply(lambda v: v + [10])
-        stream2 = stream.add_update(lambda v: v.append(5))
+        stream2 = stream.add_update(lambda v: v.append(5))  # noqa: F841
         stream = stream.add_update(lambda v: v.append(30)).add_apply(lambda v: v + [6])
-        stream3 = stream.add_update(lambda v: v.append(100))
-        stream4 = stream.add_update(lambda v: v.append(456))
+        stream3 = stream.add_update(lambda v: v.append(100))  # noqa: F841
+        stream4 = stream.add_update(lambda v: v.append(456))  # noqa: F841
         stream = stream.add_apply(lambda v: v + [700]).add_update(
             lambda v: v.append(222)
         )
@@ -505,14 +507,14 @@ def test_update(self):

     def test_expand(self):
         stream = Stream()
-        stream_2 = stream.add_apply(lambda v: [i for i in v[0]], expand=True).add_apply(
+        stream_2 = stream.add_apply(lambda v: [i for i in v[0]], expand=True).add_apply(  # noqa: F841
             lambda v: v + 22
         )
-        stream_3 = stream.add_apply(lambda v: [i for i in v[1]], expand=True).add_apply(
+        stream_3 = stream.add_apply(lambda v: [i for i in v[1]], expand=True).add_apply(  # noqa: F841
             lambda v: v + 33
         )
         stream = stream.add_apply(lambda v: [i for i in v[2]], expand=True)
-        stream_4 = stream.add_apply(lambda v: v + 44)
+        stream_4 = stream.add_apply(lambda v: v + 44)  # noqa: F841
         stream = stream.add_apply(lambda v: v + 11)
         sink = Sink()
         extras = ("key", 0, [])
@@ -522,7 +524,6 @@ def test_expand(self):
         assert sink == expected

     def test_transform(self):
-
         def transform(n):
             def wrapper(value, k, t, h):
                 return value, k + "_" + str(n), t + n, h
@@ -530,10 +531,10 @@ def wrapper(value, k, t, h):
             return wrapper

         stream = Stream().add_apply(lambda v: v + 1)
-        stream_2 = stream.add_transform(transform(2))
+        stream_2 = stream.add_transform(transform(2))  # noqa: F841
         stream = stream.add_transform(transform(3))
-        stream_3 = stream.add_apply(lambda v: v + 30).add_transform(transform(4))
-        stream_4 = stream.add_apply(lambda v: v + 40).add_transform(transform(5))
+        stream_3 = stream.add_apply(lambda v: v + 30).add_transform(transform(4))  # noqa: F841
+        stream_4 = stream.add_apply(lambda v: v + 40).add_transform(transform(5))  # noqa: F841
         stream = stream.add_apply(lambda v: v + 100).add_transform(transform(6))

         sink = Sink()
diff --git a/tests/test_quixstreams/test_dataframe/test_dataframe.py b/tests/test_quixstreams/test_dataframe/test_dataframe.py
index f2006da0e..19342ba04 100644
--- a/tests/test_quixstreams/test_dataframe/test_dataframe.py
+++ b/tests/test_quixstreams/test_dataframe/test_dataframe.py
@@ -826,7 +826,6 @@ def stateful_func(value_: dict, state: State):


 class TestStreamingDataFrameTumblingWindow:
-
     def test_tumbling_window_define_from_milliseconds(
         self, dataframe_factory, state_manager
     ):
@@ -1687,7 +1686,6 @@ def wrapper(value):

 class TestStreamingDataFrameBranching:
     def test_basic_branching(self, dataframe_factory):
-
         sdf = dataframe_factory().apply(lambda v: v + 1)
         sdf.apply(lambda v: v + 2)
         sdf.apply(lambda v: v + 3)
@@ -1718,11 +1716,11 @@ def test_multiple_branches(self, dataframe_factory):

         sdf = dataframe_factory().apply(add_n(120)).apply(div_n(2))  # 60
         sdf_2 = sdf.apply(div_n(3))  # 20
-        sdf_3 = sdf_2.apply(add_n(10)).apply(add_n(3))  # 33
-        sdf_4 = sdf_2.apply(add_n(24))  # 44
+        sdf_3 = sdf_2.apply(add_n(10)).apply(add_n(3))  # 33  # noqa: F841
+        sdf_4 = sdf_2.apply(add_n(24))  # 44  # noqa: F841
         sdf_2 = sdf_2.apply(add_n(2))  # 22
         sdf = sdf.apply(add_n(40))  # 100
-        sdf_5 = sdf.apply(div_n(2)).apply(add_n(5))  # 55
+        sdf_5 = sdf.apply(div_n(2)).apply(add_n(5))  # 55  # noqa: F841
         sdf = sdf.apply(div_n(100)).apply(add_n(10))  # 11

         _extras = {"key": b"key", "timestamp": 0, "headers": []}
@@ -1775,10 +1773,10 @@ def test_multiple_branches_skip_assigns(self, dataframe_factory):

     def test_filter(self, dataframe_factory):
         sdf = dataframe_factory().apply(add_n(10))
-        sdf2 = sdf.apply(add_n(5)).filter(less_than(0)).apply(add_n(200))
-        sdf3 = sdf.apply(add_n(7)).filter(less_than(20)).apply(add_n(4))
+        sdf2 = sdf.apply(add_n(5)).filter(less_than(0)).apply(add_n(200))  # noqa: F841
+        sdf3 = sdf.apply(add_n(7)).filter(less_than(20)).apply(add_n(4))  # noqa: F841
         sdf = sdf.apply(add_n(30)).filter(less_than(50))
-        sdf4 = sdf.apply(add_n(60))
+        sdf4 = sdf.apply(add_n(60))  # noqa: F841
         sdf.apply(add_n(800))

         _extras = {"key": b"key", "timestamp": 0, "headers": []}
@@ -1790,12 +1788,11 @@ def test_filter(self, dataframe_factory):
         assert results == expected

     def test_filter_using_sdf_apply_and_col_select(self, dataframe_factory):
-
         sdf = dataframe_factory().apply(add_n(10))
-        sdf2 = sdf[sdf.apply(less_than(0))].apply(add_n(200))
-        sdf3 = sdf[sdf.apply(add_n(8)).apply(less_than(20))].apply(add_n(33))
+        sdf2 = sdf[sdf.apply(less_than(0))].apply(add_n(200))  # noqa: F841
+        sdf3 = sdf[sdf.apply(add_n(8)).apply(less_than(20))].apply(add_n(33))  # noqa: F841
         sdf = sdf[sdf.apply(add_n(30)).apply(less_than(50))].apply(add_n(77))
-        sdf4 = sdf.apply(add_n(60))
+        sdf4 = sdf.apply(add_n(60))  # noqa: F841
         sdf = sdf.apply(add_n(800))

         _extras = {"key": b"key", "timestamp": 0, "headers": []}
@@ -1807,10 +1804,10 @@ def test_filter_using_sdf_apply_and_col_select(self, dataframe_factory):

     def test_filter_using_columns(self, dataframe_factory):
         sdf = dataframe_factory().apply(add_n_df(10))
-        sdf2 = sdf[sdf["v"] < 0].apply(add_n_df(200))
-        sdf3 = sdf[sdf["v"].apply(add_n(1)).apply(add_n(7)) < 20].apply(add_n_df(33))
+        sdf2 = sdf[sdf["v"] < 0].apply(add_n_df(200))  # noqa: F841
+        sdf3 = sdf[sdf["v"].apply(add_n(1)).apply(add_n(7)) < 20].apply(add_n_df(33))  # noqa: F841
         sdf = sdf[sdf["v"].apply(add_n(5)).apply(add_n(25)) < 50].apply(add_n_df(77))
-        sdf4 = sdf.apply(add_n_df(60))
+        sdf4 = sdf.apply(add_n_df(60))  # noqa: F841
         sdf = sdf.apply(add_n_df(800))

         _extras = {"key": b"key", "timestamp": 0, "headers": []}
@@ -1834,7 +1831,7 @@ def test_store_series_filter_as_var_and_use(self, dataframe_factory):
         """
         sdf = dataframe_factory().apply(add_n_df(10))
         sdf_filter = sdf["v"].apply(add_n(1)).apply(add_n(7)) < 20  # NOT a split
-        sdf2 = sdf[sdf_filter].apply(add_n_df(33))
+        sdf2 = sdf[sdf_filter].apply(add_n_df(33))  # noqa: F841
         sdf = sdf.apply(add_n_df(800))

         _extras = {"key": b"key", "timestamp": 0, "headers": []}
@@ -1858,7 +1855,7 @@ def test_store_series_result_as_var_and_use(self, dataframe_factory):
         """
         sdf = dataframe_factory().apply(add_n_df(10))
         sdf_sum = sdf["v"].apply(add_n(1)) + 8  # NOT a split (no data cloning)
-        sdf2 = sdf[sdf["v"] + sdf_sum < 30].apply(add_n_df(33))
+        sdf2 = sdf[sdf["v"] + sdf_sum < 30].apply(add_n_df(33))  # noqa: F841
         sdf = sdf.apply(add_n_df(800))

         _extras = {"key": b"key", "timestamp": 0, "headers": []}
@@ -1909,7 +1906,7 @@ def test_store_sdf_as_var_and_use_in_split(self, dataframe_factory):
         """
         sdf = dataframe_factory().apply(add_n(10))
         sdf_filter = sdf.apply(less_than(20))
-        sdf2 = sdf[sdf_filter].apply(add_n(33))
+        sdf2 = sdf[sdf_filter].apply(add_n(33))  # noqa: F841
         sdf = sdf.apply(add_n(800))

         _extras = {"key": b"key", "timestamp": 0, "headers": []}
@@ -1925,7 +1922,7 @@ def test_reuse_sdf_as_filter_fails(self, dataframe_factory):
         """
         sdf = dataframe_factory().apply(add_n(10))
         sdf_filter = sdf.apply(less_than(20))
-        sdf2 = sdf[sdf_filter].apply(add_n(100))
+        sdf2 = sdf[sdf_filter].apply(add_n(100))  # noqa: F841

         with pytest.raises(
             InvalidOperation,
@@ -1985,7 +1982,7 @@ def wrapper(value):
             return wrapper

         sdf = dataframe_factory().apply(add_n_df(1))
-        sdf2 = sdf.apply(add_n_df(2)).update(mul_n(2))
+        sdf2 = sdf.apply(add_n_df(2)).update(mul_n(2))  # noqa: F841
         sdf3 = sdf.apply(add_n_df(3))
         sdf3.update(mul_n(3))
         sdf = sdf.update(mul_n(4)).apply(add_n_df(100))
@@ -2006,8 +2003,8 @@ def set_ts(n):
             return lambda value, key, timestamp, headers: timestamp + n

         sdf = dataframe_factory().apply(add_n(1))
-        sdf2 = sdf.apply(add_n(2)).set_timestamp(set_ts(3)).set_timestamp(set_ts(5))
-        sdf3 = sdf.apply(add_n(3))
+        sdf2 = sdf.apply(add_n(2)).set_timestamp(set_ts(3)).set_timestamp(set_ts(5))  # noqa: F841
+        sdf3 = sdf.apply(add_n(3))  # noqa: F841
         sdf = sdf.set_timestamp(set_ts(4)).apply(add_n(7))

         _extras = {"key": b"key", "timestamp": 0, "headers": []}
@@ -2075,7 +2072,7 @@ def test_column_setter(self, dataframe_factory):
         sdf3 = sdf.apply(add_n_df(7))
         sdf3["v"] = sdf3.apply(add_n_col(20))
         sdf["v"] = sdf.apply(add_n_df(25)).apply(add_n_col(55))
-        sdf4 = sdf.apply(add_n_df(100))
+        sdf4 = sdf.apply(add_n_df(100))  # noqa: F841
         sdf["v"] = sdf.apply(add_n_col(800))

         _extras = {"key": b"key", "timestamp": 0, "headers": []}
diff --git a/tests/test_quixstreams/test_dataframe/test_series.py b/tests/test_quixstreams/test_dataframe/test_series.py
index 113e8ae90..037f70c27 100644
--- a/tests/test_quixstreams/test_dataframe/test_series.py
+++ b/tests/test_quixstreams/test_dataframe/test_series.py
@@ -460,7 +460,7 @@ def test_abs_success(

     def test_abs_not_a_number_fails(self):
         result = StreamingSeries("x").abs()
-        key, timestamp, headers = "key", 0, []
+        key, timestamp = "key", 0

         with pytest.raises(TypeError, match="bad operand type for abs()"):
             assert result.test({"x": "string"}, key, timestamp)
diff --git a/tests/test_quixstreams/test_kafka/test_consumer.py b/tests/test_quixstreams/test_kafka/test_consumer.py
index 83f9fad9d..f83f9f392 100644
--- a/tests/test_quixstreams/test_kafka/test_consumer.py
+++ b/tests/test_quixstreams/test_kafka/test_consumer.py
@@ -369,9 +369,10 @@ def test_store_offsets_committed(
             "value": b"test1",
         }

-        with consumer_factory(
-            auto_offset_reset="earliest"
-        ) as consumer, producer_factory() as producer:
+        with (
+            consumer_factory(auto_offset_reset="earliest") as consumer,
+            producer_factory() as producer,
+        ):
             consumer.subscribe(topics=[topic])
             # Produce a message
             producer.produce(
@@ -388,9 +389,10 @@ def test_store_offsets_committed(
             consumer.store_offsets(message=msg)

         # Start new consumer and producer
-        with consumer_factory(
-            auto_offset_reset="latest"
-        ) as consumer, producer_factory() as producer:
+        with (
+            consumer_factory(auto_offset_reset="latest") as consumer,
+            producer_factory() as producer,
+        ):
             consumer.subscribe(topics=[topic])
             # Produce a new message
             producer.produce(
@@ -429,9 +431,12 @@ def test_store_offsets_autocommit_disabled(
             "value": b"test1",
         }

-        with consumer_factory(
-            auto_offset_reset="earliest", auto_commit_enable=False
-        ) as consumer, producer_factory() as producer:
+        with (
+            consumer_factory(
+                auto_offset_reset="earliest", auto_commit_enable=False
+            ) as consumer,
+            producer_factory() as producer,
+        ):
             consumer.subscribe(topics=[topic])
             # Produce a message
             producer.produce(
@@ -477,9 +482,12 @@ def test_commit_committed_sync(
             "value": b"test1",
         }

-        with consumer_factory(
-            auto_offset_reset="earliest", auto_commit_enable=False
-        ) as consumer, producer_factory() as producer:
+        with (
+            consumer_factory(
+                auto_offset_reset="earliest", auto_commit_enable=False
+            ) as consumer,
+            producer_factory() as producer,
+        ):
             consumer.subscribe(topics=[topic])
             # Produce a message
             producer.produce(
@@ -512,9 +520,12 @@ def test_commit_committed_async(
             "value": b"test1",
         }

-        with consumer_factory(
-            auto_offset_reset="earliest", auto_commit_enable=False
-        ) as consumer, producer_factory() as producer:
+        with (
+            consumer_factory(
+                auto_offset_reset="earliest", auto_commit_enable=False
+            ) as consumer,
+            producer_factory() as producer,
+        ):
             consumer.subscribe(topics=[topic])
             # Produce a message
             producer.produce(
diff --git a/tests/test_quixstreams/test_models/test_serializers/test_quix_serializers.py b/tests/test_quixstreams/test_models/test_serializers/test_quix_serializers.py
index 46da6d6ba..81c4703f4 100644
--- a/tests/test_quixstreams/test_models/test_serializers/test_quix_serializers.py
+++ b/tests/test_quixstreams/test_models/test_serializers/test_quix_serializers.py
@@ -453,7 +453,7 @@ def test_serialize_missing_timestamp(self, as_legacy):
             {"a": None, "b": None},  # all values are None
         ],
     )
-    def test_serialize_dict_empty_or_none(self, value):
+    def test_serialize_dict_empty_or_none_as_legacy_false(self, value):
         serializer = QuixTimeseriesSerializer(as_legacy=False)
         serialized = serializer(
             value, ctx=SerializationContext(topic="test", field=MessageField.VALUE)
@@ -474,7 +474,7 @@ def test_serialize_dict_empty_or_none(self, value):
             {"a": None, "b": None},  # all values are None
         ],
     )
-    def test_serialize_dict_empty_or_none(self, value):
+    def test_serialize_dict_empty_or_none_as_legacy_true(self, value):
         serializer = QuixTimeseriesSerializer(as_legacy=True)
         serialized = serializer(
             value, ctx=SerializationContext(topic="test", field=MessageField.VALUE)
diff --git a/tests/test_quixstreams/test_models/test_topics/test_admin.py b/tests/test_quixstreams/test_models/test_topics/test_admin.py
index 2e6ca3719..b3e82ba07 100644
--- a/tests/test_quixstreams/test_models/test_topics/test_admin.py
+++ b/tests/test_quixstreams/test_models/test_topics/test_admin.py
@@ -72,9 +72,10 @@ def test_create_topics_already_exist(
         topic_manager = topic_manager_factory()
         existing_topic = topic_manager.topic(name=topic_name)

-        with caplog.at_level(level=logging.INFO), patch.object(
-            topic_admin, "list_topics"
-        ) as list_topics_mock:
+        with (
+            caplog.at_level(level=logging.INFO),
+            patch.object(topic_admin, "list_topics") as list_topics_mock,
+        ):
             # Mock "list_topics" call to simulate a topic being created
             # simultaneously by multiple instances
             list_topics_mock.return_value = {}
diff --git a/tests/test_quixstreams/test_models/test_topics/test_topics.py b/tests/test_quixstreams/test_models/test_topics/test_topics.py
index 839198da2..b2dead0ec 100644
--- a/tests/test_quixstreams/test_models/test_topics/test_topics.py
+++ b/tests/test_quixstreams/test_models/test_topics/test_topics.py
@@ -44,7 +44,6 @@ def __call__(self, value: bytes, ctx: SerializationContext):


 class TestTopic:
-
     def test_repr(self, topic_manager_topic_factory):
         topic = topic_manager_topic_factory(name="foo")
         assert str(topic) == ''
diff --git a/tests/test_quixstreams/test_platforms/test_quix/fixtures.py b/tests/test_quixstreams/test_platforms/test_quix/fixtures.py
index 7185ed8fe..31e64e3af 100644
--- a/tests/test_quixstreams/test_platforms/test_quix/fixtures.py
+++ b/tests/test_quixstreams/test_platforms/test_quix/fixtures.py
@@ -1,14 +1,10 @@
 from json import loads, JSONDecodeError
 from typing import Union, Optional
-from unittest.mock import create_autospec

 import pytest
 import requests
 from typing_extensions import Type

-from quixstreams.platforms.quix.api import QuixPortalApiService
-from quixstreams.platforms.quix.config import QuixKafkaConfigsBuilder
-

 class MockResponse:
     def __init__(
diff --git a/tests/test_quixstreams/test_platforms/test_quix/test_api.py b/tests/test_quixstreams/test_platforms/test_quix/test_api.py
index 23c409a81..11455b119 100644
--- a/tests/test_quixstreams/test_platforms/test_quix/test_api.py
+++ b/tests/test_quixstreams/test_platforms/test_quix/test_api.py
@@ -29,9 +29,9 @@ def test_get_workspace_certificate(self):
         ws = "12345"
         api = QuixPortalApiService(portal_api="http://portal.com", auth_token="token")
         api.session = create_autospec(QuixPortalApiService.SessionWithUrlBase)
-        api.session.get(f"/workspaces/{ws}/certificates").content = (
-            zip_in_mem.getvalue()
-        )
+        api.session.get(
+            f"/workspaces/{ws}/certificates"
+        ).content = zip_in_mem.getvalue()

         result = api.get_workspace_certificate(ws)
         assert result == b"my cool cert stuff"
diff --git a/tests/test_quixstreams/test_platforms/test_quix/test_config.py b/tests/test_quixstreams/test_platforms/test_quix/test_config.py
index c585c97f4..5764c29a9 100644
--- a/tests/test_quixstreams/test_platforms/test_quix/test_config.py
+++ b/tests/test_quixstreams/test_platforms/test_quix/test_config.py
@@ -1,8 +1,6 @@
 import base64
 from contextlib import ExitStack
 from copy import deepcopy
-from os import getcwd
-from pathlib import Path
 from unittest.mock import patch, call, create_autospec, PropertyMock

 import pytest
diff --git a/tests/test_quixstreams/test_rowconsumer.py b/tests/test_quixstreams/test_rowconsumer.py
index 834050778..d2849f72d 100644
--- a/tests/test_quixstreams/test_rowconsumer.py
+++ b/tests/test_quixstreams/test_rowconsumer.py
@@ -16,9 +16,12 @@ def test_poll_row_success(
         self, row_consumer_factory, topic_json_serdes_factory, producer
     ):
         topic = topic_json_serdes_factory()
-        with row_consumer_factory(
-            auto_offset_reset="earliest",
-        ) as consumer, producer:
+        with (
+            row_consumer_factory(
+                auto_offset_reset="earliest",
+            ) as consumer,
+            producer,
+        ):
             producer.produce(topic=topic.name, key=b"key", value=b'{"field":"value"}')
             producer.flush()
             consumer.subscribe([topic])
@@ -35,9 +38,12 @@ def test_poll_row_multiple_topics(
         self, row_consumer_factory, topic_json_serdes_factory, producer
     ):
         topics = [topic_json_serdes_factory(), topic_json_serdes_factory()]
-        with row_consumer_factory(
-            auto_offset_reset="earliest",
-        ) as consumer, producer:
+        with (
+            row_consumer_factory(
+                auto_offset_reset="earliest",
+            ) as consumer,
+            producer,
+        ):
             for topic in topics:
                 producer.produce(
                     topic=topic.name, key=b"key", value=b'{"field":"value"}'
@@ -74,9 +80,12 @@ def __call__(self, *args, **kwargs):
                 raise IgnoreMessage()

         topic = topic_manager_topic_factory(value_deserializer=_Deserializer())
-        with row_consumer_factory(
-            auto_offset_reset="earliest",
-        ) as consumer, producer:
+        with (
+            row_consumer_factory(
+                auto_offset_reset="earliest",
+            ) as consumer,
+            producer,
+        ):
             producer.produce(topic.name, key=b"key", value=b"value")
             producer.flush()
             consumer.subscribe([topic])
@@ -93,9 +102,12 @@ def test_poll_row_deserialization_error_raise(
         self, row_consumer_factory, topic_json_serdes_factory, producer
     ):
         topic = topic_json_serdes_factory()
-        with row_consumer_factory(
-            auto_offset_reset="earliest",
-        ) as consumer, producer:
+        with (
+            row_consumer_factory(
+                auto_offset_reset="earliest",
+            ) as consumer,
+            producer,
+        ):
             producer.produce(topic.name, key=b"key", value=b"value")
             producer.flush()
             consumer.subscribe([topic])
@@ -106,9 +118,12 @@ def test_poll_row_kafka_error_raise(
         self, row_consumer_factory, topic_json_serdes_factory, producer
     ):
         topic = topic_json_serdes_factory()
-        with row_consumer_factory(
-            auto_offset_reset="error",
-        ) as consumer, producer:
+        with (
+            row_consumer_factory(
+                auto_offset_reset="error",
+            ) as consumer,
+            producer,
+        ):
             producer.produce(topic.name, key=b"key", value=b"value")
             producer.flush()
             consumer.subscribe([topic])
@@ -128,10 +143,13 @@ def on_error(exc, *args):
             suppressed = True
             return True

-        with row_consumer_factory(
-            auto_offset_reset="earliest",
-            on_error=on_error,
-        ) as consumer, producer:
+        with (
+            row_consumer_factory(
+                auto_offset_reset="earliest",
+                on_error=on_error,
+            ) as consumer,
+            producer,
+        ):
             producer.produce(topic.name, key=b"key", value=b"value")
             producer.flush()
             consumer.subscribe([topic])
@@ -152,10 +170,13 @@ def on_error(exc, *args):
             suppressed = True
             return True

-        with row_consumer_factory(
-            auto_offset_reset="error",
-            on_error=on_error,
-        ) as consumer, producer:
+        with (
+            row_consumer_factory(
+                auto_offset_reset="error",
+                on_error=on_error,
+            ) as consumer,
+            producer,
+        ):
             producer.produce(topic.name, key=b"key", value=b"value")
             producer.flush()
             consumer.subscribe([topic])
@@ -174,10 +195,13 @@ def on_error(*_):

         def on_assign(*_):
             raise ValueError("Test")

-        with row_consumer_factory(
-            auto_offset_reset="error",
-            on_error=on_error,
-        ) as consumer, producer:
+        with (
+            row_consumer_factory(
+                auto_offset_reset="error",
+                on_error=on_error,
+            ) as consumer,
+            producer,
+        ):
             producer.produce(topic.name, key=b"key", value=b"value")
             producer.flush()
             consumer.subscribe([topic], on_assign=on_assign)
diff --git a/tests/test_quixstreams/test_rowproducer.py b/tests/test_quixstreams/test_rowproducer.py
index 6c78ce6f0..ed28fe12a 100644
--- a/tests/test_quixstreams/test_rowproducer.py
+++ b/tests/test_quixstreams/test_rowproducer.py
@@ -76,9 +76,10 @@ def test_produce_row_custom_key(
         value = {"field": "value"}
         headers = [("header1", b"1")]

-        with row_consumer_factory(
-            auto_offset_reset="earliest"
-        ) as consumer, row_producer_factory() as producer:
+        with (
+            row_consumer_factory(auto_offset_reset="earliest") as consumer,
+            row_producer_factory() as producer,
+        ):
             row = row_factory(
                 topic=topic.name,
                 value=value,
diff --git a/tests/test_quixstreams/test_sinks/test_influxdb_v3.py b/tests/test_quixstreams/test_sinks/test_influxdb_v3.py
index cf3de2760..f2bdf45ec 100644
--- a/tests/test_quixstreams/test_sinks/test_influxdb_v3.py
+++ b/tests/test_quixstreams/test_sinks/test_influxdb_v3.py
@@ -323,7 +323,7 @@ class Response:
         )

         topic = "test-topic"
-        value1, value2 = {"key": "value1"}, {"key": "value2"}
+        value1 = {"key": "value1"}
         timestamp = 1
         sink.add(
             value=value1,
@@ -351,7 +351,7 @@ def test_write_fails_propagates_exception(self, influxdb3_sink_factory):
         )

         topic = "test-topic"
-        value1, value2 = {"key": "value1"}, {"key": "value2"}
+        value1 = {"key": "value1"}
         timestamp = 1
         sink.add(
             value=value1,
diff --git a/tests/test_quixstreams/test_sources/test_csv.py b/tests/test_quixstreams/test_sources/test_csv.py
index b19f675f8..84873bb4f 100644
--- a/tests/test_quixstreams/test_sources/test_csv.py
+++ b/tests/test_quixstreams/test_sources/test_csv.py
@@ -9,7 +9,6 @@


 class TestCSVSource:
-
     @pytest.fixture
     def producer(self):
         producer = MagicMock(spec=RowProducer)
diff --git a/tests/test_quixstreams/test_sources/test_kafka.py b/tests/test_quixstreams/test_sources/test_kafka.py
index 7b060377f..872908565 100644
--- a/tests/test_quixstreams/test_sources/test_kafka.py
+++ b/tests/test_quixstreams/test_sources/test_kafka.py
@@ -294,7 +294,6 @@ def _consume(config):


 class TestQuixEnvironmentSource(Base):
-
     QUIX_WORKSPACE_ID = "my_workspace_id"

     def source(self, config, topic):
diff --git a/tests/test_quixstreams/test_state/test_rocksdb/test_transaction.py b/tests/test_quixstreams/test_state/test_rocksdb/test_transaction.py
index d28a230a4..9d5cf2d81 100644
--- a/tests/test_quixstreams/test_state/test_rocksdb/test_transaction.py
+++ b/tests/test_quixstreams/test_state/test_rocksdb/test_transaction.py
@@ -482,7 +482,7 @@ def test_set_and_prepare(self, rocksdb_partition_factory, changelog_producer_moc
     def test_delete_and_prepare(
         self, rocksdb_partition_factory, changelog_producer_mock
     ):
-        key, value = "key", "value"
+        key = "key"
         cf = "default"
         prefix = b"__key__"
         processed_offset = 1
@@ -490,7 +490,6 @@ def test_delete_and_prepare(
         with rocksdb_partition_factory(
             changelog_producer=changelog_producer_mock
         ) as partition:
-
             tx = partition.begin()
             tx.delete(key=key, cf_name=cf, prefix=prefix)
diff --git a/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_transaction.py b/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_transaction.py
index 20299a116..2a664c3cc 100644
--- a/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_transaction.py
+++ b/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_transaction.py
@@ -362,7 +362,6 @@ def test_delete_window_and_prepare(
         with windowed_rocksdb_partition_factory(
             changelog_producer=changelog_producer_mock
         ) as store_partition:
-
             tx = store_partition.begin()
             tx.delete_window(start_ms=start_ms, end_ms=end_ms, prefix=prefix)
             tx.prepare(processed_offset=processed_offset)