From d0e7b4b3191c4b384437aaf49492b5dfccf1ab61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Mon, 28 Oct 2024 08:21:30 +0000 Subject: [PATCH 1/8] [DOP-20393] Bump version --- onetl/VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/onetl/VERSION b/onetl/VERSION index 34a83616..26acbf08 100644 --- a/onetl/VERSION +++ b/onetl/VERSION @@ -1 +1 @@ -0.12.1 +0.12.2 From e1ab213a6dac23d012a51f84630db5418cf6332e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 28 Oct 2024 22:08:14 +0000 Subject: [PATCH 2/8] [pre-commit.ci] pre-commit autoupdate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/asottile/pyupgrade: v3.18.0 → v3.19.0](https://github.com/asottile/pyupgrade/compare/v3.18.0...v3.19.0) - [github.com/asottile/blacken-docs: 1.19.0 → 1.19.1](https://github.com/asottile/blacken-docs/compare/1.19.0...1.19.1) --- .pre-commit-config.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6d2bb684..ff260645 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -90,7 +90,7 @@ repos: - id: text-unicode-replacement-char - repo: https://github.com/asottile/pyupgrade - rev: v3.18.0 + rev: v3.19.0 hooks: - id: pyupgrade args: [--py37-plus, --keep-runtime-typing] @@ -107,7 +107,7 @@ repos: language_version: python3 - repo: https://github.com/asottile/blacken-docs - rev: 1.19.0 + rev: 1.19.1 hooks: - id: blacken-docs additional_dependencies: From 9c72d7aeeedf612552c7cf1b032ff92d0617c255 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Wed, 30 Oct 2024 08:25:44 +0000 Subject: [PATCH 3/8] Change jobDescription for DBReader & FileDFReader --- docs/changelog/next_release/+.improvement.1.rst | 1 + onetl/db/db_reader/db_reader.py | 2 +- onetl/file/file_df_reader/file_df_reader.py | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) create mode 100644 docs/changelog/next_release/+.improvement.1.rst diff --git a/docs/changelog/next_release/+.improvement.1.rst b/docs/changelog/next_release/+.improvement.1.rst new file mode 100644 index 00000000..e80ec742 --- /dev/null +++ b/docs/changelog/next_release/+.improvement.1.rst @@ -0,0 +1 @@ +Change Spark ``jobDescription`` for DBReader & FileDFReader from ``DBReader.run() -> Connection`` to ``Connection -> DBReader.run()``. diff --git a/onetl/db/db_reader/db_reader.py b/onetl/db/db_reader/db_reader.py index dd79876a..2729c662 100644 --- a/onetl/db/db_reader/db_reader.py +++ b/onetl/db/db_reader/db_reader.py @@ -635,7 +635,7 @@ def run(self) -> DataFrame: self._check_strategy() - job_description = f"{self.__class__.__name__}.run({self.source}) -> {self.connection}" + job_description = f"{self.connection} -> {self.__class__.__name__}.run({self.source})" with override_job_description(self.connection.spark, job_description): if not self._connection_checked: self._log_parameters() diff --git a/onetl/file/file_df_reader/file_df_reader.py b/onetl/file/file_df_reader/file_df_reader.py index 36aab796..8f319a38 100644 --- a/onetl/file/file_df_reader/file_df_reader.py +++ b/onetl/file/file_df_reader/file_df_reader.py @@ -212,9 +212,9 @@ def run(self, files: Iterable[str | os.PathLike] | None = None) -> DataFrame: self._log_parameters(files) if files: - job_description = f"{self.__class__.__name__}.run([..files..]) -> {self.connection}" + job_description = f"{self.connection} -> {self.__class__.__name__}.run([..files..])" else: - job_description = f"{self.__class__.__name__}.run({self.source_path}) -> {self.connection}" + job_description = f"{self.connection} -> {self.__class__.__name__}.run({self.source_path})" with override_job_description(self.connection.spark, job_description): paths: FileSet[PurePathProtocol] = FileSet() From bd34ef3c7e973749a7ceea50690eed8220d5cd43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Thu, 31 Oct 2024 08:00:17 +0000 Subject: [PATCH 4/8] Fix links to MSSQL documentation for date & time types --- docs/changelog/next_release/+.doc.1.rst | 1 + docs/connection/db_connection/mssql/types.rst | 5 ++--- 2 files changed, 3 insertions(+), 3 deletions(-) create mode 100644 docs/changelog/next_release/+.doc.1.rst diff --git a/docs/changelog/next_release/+.doc.1.rst b/docs/changelog/next_release/+.doc.1.rst new file mode 100644 index 00000000..5e1a4f3e --- /dev/null +++ b/docs/changelog/next_release/+.doc.1.rst @@ -0,0 +1 @@ +Fix links to MSSQL date & time type documentation. diff --git a/docs/connection/db_connection/mssql/types.rst b/docs/connection/db_connection/mssql/types.rst index 13c7874a..1770c91b 100644 --- a/docs/connection/db_connection/mssql/types.rst +++ b/docs/connection/db_connection/mssql/types.rst @@ -197,8 +197,7 @@ Temporal types So not all of values in Spark DataFrame can be written to MSSQL. References: - * `Clickhouse DateTime documentation `_ - * `Clickhouse DateTime documentation `_ + * `MSSQL date & time types documentation `_ * `Spark DateType documentation `_ * `Spark TimestampType documentation `_ @@ -213,7 +212,7 @@ Temporal types Last digit will be lost during read or write operations. .. [5] - ``time`` type is the same as ``timestamp`` with date ``1970-01-01``. So instead of reading data from MSSQL like ``23:59:59.999999`` + ``time`` type is the same as ``datetime2`` with date ``1970-01-01``. So instead of reading data from MSSQL like ``23:59:59.999999`` it is actually read ``1970-01-01 23:59:59.999999``, and vice versa. String types From 434648eea1360f88680f46910388a1c4b9e97c05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Mon, 11 Nov 2024 13:43:17 +0000 Subject: [PATCH 5/8] Fix MSSQL.execute supported syntax docs --- docs/connection/db_connection/mssql/execute.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/connection/db_connection/mssql/execute.rst b/docs/connection/db_connection/mssql/execute.rst index 85280d92..91305cd7 100644 --- a/docs/connection/db_connection/mssql/execute.rst +++ b/docs/connection/db_connection/mssql/execute.rst @@ -72,7 +72,7 @@ This method supports **any** query syntax supported by MSSQL, like: * ✅︎ ``ALTER ...`` * ✅︎ ``INSERT INTO ... AS SELECT ...`` * ✅︎ ``DROP TABLE ...``, ``DROP VIEW ...``, and so on -* ✅︎ ``CALL procedure(arg1, arg2) ...`` or ``{call procedure(arg1, arg2)}`` - special syntax for calling procedure +* ✅︎ ``EXEC procedure(arg1, arg2) ...`` or ``{call procedure(arg1, arg2)}`` - special syntax for calling procedure * ✅︎ ``DECLARE ... BEGIN ... END`` - execute PL/SQL statement * ✅︎ other statements not mentioned here * ❌ ``SET ...; SELECT ...;`` - multiple statements not supported From 91b83b1cad273840535a860e0a5ca9b90f2ac8f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Tue, 12 Nov 2024 11:47:42 +0000 Subject: [PATCH 6/8] [DOP-21408] Fix logging KeyValueIntHWM using log_hwm --- docs/changelog/next_release/316.bugfix.1.rst | 1 + docs/changelog/next_release/316.bugfix.2.rst | 1 + onetl/log.py | 46 +++++++++++++++----- 3 files changed, 38 insertions(+), 10 deletions(-) create mode 100644 docs/changelog/next_release/316.bugfix.1.rst create mode 100644 docs/changelog/next_release/316.bugfix.2.rst diff --git a/docs/changelog/next_release/316.bugfix.1.rst b/docs/changelog/next_release/316.bugfix.1.rst new file mode 100644 index 00000000..88bad047 --- /dev/null +++ b/docs/changelog/next_release/316.bugfix.1.rst @@ -0,0 +1 @@ +Fix ``log_hwm`` result for ``KeyValueIntHWM`` (used by Kafka). diff --git a/docs/changelog/next_release/316.bugfix.2.rst b/docs/changelog/next_release/316.bugfix.2.rst new file mode 100644 index 00000000..485a7345 --- /dev/null +++ b/docs/changelog/next_release/316.bugfix.2.rst @@ -0,0 +1 @@ +Fix ``log_collection`` hiding values of ``Kafka.addresses`` in logs with ``INFO`` level. diff --git a/onetl/log.py b/onetl/log.py index d7fdd8e8..d9b1160f 100644 --- a/onetl/log.py +++ b/onetl/log.py @@ -9,7 +9,7 @@ from contextlib import redirect_stdout from enum import Enum from textwrap import dedent -from typing import TYPE_CHECKING, Any, Collection, Iterable +from typing import TYPE_CHECKING, Any, Collection, Iterable, Mapping, Set from etl_entities.hwm import HWM from typing_extensions import deprecated @@ -315,6 +315,8 @@ def log_collection( log_collection(logger, "myvar", []) log_collection(logger, "myvar", ["item1", {"item2": "value2"}, None]) + log_collection(logger, "myvar", {"item1", "item2", None}) + log_collection(logger, "myvar", {"key1": "value1", "key2": None}) log_collection(logger, "myvar", ["item1", "item2", "item3"], max_items=1) log_collection( logger, @@ -334,6 +336,17 @@ def log_collection( INFO onetl.module None, INFO onetl.module ] + INFO onetl.module myvar = { + INFO onetl.module 'item1', + INFO onetl.module 'item2', + INFO onetl.module None, + INFO onetl.module } + + INFO onetl.module myvar = { + INFO onetl.module 'key1': 'value1', + INFO onetl.module 'key2': None, + INFO onetl.module } + INFO onetl.module myvar = [ INFO onetl.module 'item1', INFO onetl.module # ... 2 more items of type @@ -350,21 +363,30 @@ def log_collection( base_indent = " " * (BASE_LOG_INDENT + indent) stacklevel += 1 - items = list(collection) # force convert all iterators to list to know size - if not items: - _log(logger, "%s%s = []", base_indent, name, level=level, stacklevel=stacklevel) + + if not isinstance(collection, (Mapping, Set)): + collection = list(collection) # force convert all iterators to list to know size + + start_bracket = "[" + end_bracket = "]" + if isinstance(collection, (Mapping, Set)): + start_bracket = "{" + end_bracket = "}" + + if not collection: + _log(logger, "%s%s = %s%s", base_indent, name, start_bracket, end_bracket, level=level, stacklevel=stacklevel) return nested_indent = " " * (BASE_LOG_INDENT + indent + 4) - _log(logger, "%s%s = [", base_indent, name, level=level, stacklevel=stacklevel) + _log(logger, "%s%s = %s", base_indent, name, start_bracket, level=level, stacklevel=stacklevel) - for i, item in enumerate(items, start=1): - if max_items and i > max_items and level >= logging.DEBUG: + for i, item in enumerate(sorted(collection), start=1): + if max_items and i > max_items and level > logging.DEBUG: _log( logger, "%s# ... %d more items of type %r", nested_indent, - len(items) - max_items, + len(collection) - max_items, type(item), level=level, stacklevel=stacklevel, @@ -377,9 +399,13 @@ def log_collection( stacklevel=stacklevel, ) break - _log(logger, "%s%r,", nested_indent, item, level=level, stacklevel=stacklevel) - _log(logger, "%s]", base_indent, level=level, stacklevel=stacklevel) + if isinstance(collection, Mapping): + _log(logger, "%s%r: %r,", nested_indent, item, collection[item], level=level, stacklevel=stacklevel) + else: + _log(logger, "%s%r,", nested_indent, item, level=level, stacklevel=stacklevel) + + _log(logger, "%s%s", base_indent, end_bracket, level=level, stacklevel=stacklevel) def entity_boundary_log(logger: logging.Logger, msg: str, char: str = "=", stacklevel: int = 1) -> None: From eb3177fa25b629f7e08a856b6056edd0b093aadd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Tue, 12 Nov 2024 14:14:19 +0000 Subject: [PATCH 7/8] [DOP-21408] Allow using etl-entities==2.4.0 --- docs/changelog/next_release/+.dependency.rst | 1 + onetl/connection/db_connection/kafka/connection.py | 7 ++++--- requirements/core.txt | 2 +- .../test_strategy_increment_kafka.py | 3 +-- 4 files changed, 7 insertions(+), 6 deletions(-) create mode 100644 docs/changelog/next_release/+.dependency.rst diff --git a/docs/changelog/next_release/+.dependency.rst b/docs/changelog/next_release/+.dependency.rst new file mode 100644 index 00000000..76811d78 --- /dev/null +++ b/docs/changelog/next_release/+.dependency.rst @@ -0,0 +1 @@ +Allow using ``etl-entities==2.4.0``. diff --git a/onetl/connection/db_connection/kafka/connection.py b/onetl/connection/db_connection/kafka/connection.py index 71fa82bd..7765936c 100644 --- a/onetl/connection/db_connection/kafka/connection.py +++ b/onetl/connection/db_connection/kafka/connection.py @@ -527,13 +527,14 @@ def get_min_max_values( max_offsets[partition_id] = end_offset log.info("|%s| Received min and max offset values for each partition.", self.__class__.__name__) - for partition_id in sorted(min_offsets.keys()): + partitions = sorted(set(min_offsets.keys() | max_offsets.keys())) + for partition_id in partitions: log.debug( "|%s| Partition %d: Min Offset = %d, Max Offset = %d", self.__class__.__name__, partition_id, - min_offsets[partition_id], - max_offsets[partition_id], + min_offsets.get(partition_id), + max_offsets.get(partition_id), ) return min_offsets, max_offsets diff --git a/requirements/core.txt b/requirements/core.txt index c8e245a4..5d915ebe 100644 --- a/requirements/core.txt +++ b/requirements/core.txt @@ -1,4 +1,4 @@ -etl-entities>=2.2,<2.4 +etl-entities>=2.2,<2.5 evacuator>=1.0,<1.1 frozendict humanize diff --git a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_kafka.py b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_kafka.py index 7ae146cc..d506394d 100644 --- a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_kafka.py +++ b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_kafka.py @@ -28,7 +28,6 @@ def test_kafka_strategy_incremental( ): from pyspark.sql.functions import max as spark_max - hwm_type = KeyValueIntHWM hwm_name = secrets.token_hex(5) store = HWMStoreStackManager.get_current() @@ -77,7 +76,7 @@ def test_kafka_strategy_incremental( hwm = store.get_hwm(hwm_name) assert hwm is not None - assert isinstance(hwm, hwm_type) + assert isinstance(hwm, KeyValueIntHWM) # HWM contains mapping `partition: max offset + 1` partition_offsets_initial = dict.fromkeys(range(num_partitions or 1), 0) From c76febca49de4e74f6bd43cbe213bb4ecd55d764 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Tue, 12 Nov 2024 14:44:00 +0000 Subject: [PATCH 8/8] [DOP-21408] Prepare for release --- docs/changelog/0.12.2.rst | 23 ++++++++++++++++++++ docs/changelog/index.rst | 1 + docs/changelog/next_release/+.dependency.rst | 2 +- 3 files changed, 25 insertions(+), 1 deletion(-) create mode 100644 docs/changelog/0.12.2.rst diff --git a/docs/changelog/0.12.2.rst b/docs/changelog/0.12.2.rst new file mode 100644 index 00000000..d2b930f8 --- /dev/null +++ b/docs/changelog/0.12.2.rst @@ -0,0 +1,23 @@ +0.12.2 (2024-11-12) +=================== + +Improvements +------------ + +- Change Spark ``jobDescription`` for DBReader & FileDFReader from ``DBReader.run() -> Connection`` to ``Connection -> DBReader.run()``. + +Bug Fixes +--------- + +- Fix ``log_hwm`` result for ``KeyValueIntHWM`` (used by Kafka). (:github:pull:`316`) +- Fix ``log_collection`` hiding values of ``Kafka.addresses`` in logs with ``INFO`` level. (:github:pull:`316`) + +Dependencies +------------ + +- Allow using `etl-entities==2.4.0 `_. + +Doc only Changes +---------------- + +- Fix links to MSSQL date & time type documentation. diff --git a/docs/changelog/index.rst b/docs/changelog/index.rst index 812c5437..8e0370bb 100644 --- a/docs/changelog/index.rst +++ b/docs/changelog/index.rst @@ -3,6 +3,7 @@ :caption: Changelog DRAFT + 0.12.2 0.12.1 0.12.0 0.11.2 diff --git a/docs/changelog/next_release/+.dependency.rst b/docs/changelog/next_release/+.dependency.rst index 76811d78..33ffb4ee 100644 --- a/docs/changelog/next_release/+.dependency.rst +++ b/docs/changelog/next_release/+.dependency.rst @@ -1 +1 @@ -Allow using ``etl-entities==2.4.0``. +Allow using `etl-entities==2.4.0 `_.