Merge branch 'develop'
dolfinus committed Nov 12, 2024
2 parents 92349e2 + c76febc commit ebe457d
Showing 17 changed files with 80 additions and 26 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
@@ -90,7 +90,7 @@ repos:
- id: text-unicode-replacement-char

- repo: https://github.com/asottile/pyupgrade
rev: v3.18.0
rev: v3.19.0
hooks:
- id: pyupgrade
args: [--py37-plus, --keep-runtime-typing]
@@ -107,7 +107,7 @@ repos:
language_version: python3

- repo: https://github.com/asottile/blacken-docs
rev: 1.19.0
rev: 1.19.1
hooks:
- id: blacken-docs
additional_dependencies:
23 changes: 23 additions & 0 deletions docs/changelog/0.12.2.rst
@@ -0,0 +1,23 @@
0.12.2 (2024-11-12)
===================

Improvements
------------

- Change Spark ``jobDescription`` for DBReader & FileDFReader from ``DBReader.run() -> Connection`` to ``Connection -> DBReader.run()``.

Bug Fixes
---------

- Fix ``log_hwm`` result for ``KeyValueIntHWM`` (used by Kafka). (:github:pull:`316`)
- Fix ``log_collection`` hiding values of ``Kafka.addresses`` in logs with ``INFO`` level. (:github:pull:`316`)

Dependencies
------------

- Allow using `etl-entities==2.4.0 <https://github.com/MobileTeleSystems/etl-entities/releases/tag/2.4.0>`_.

Doc only Changes
----------------

- Fix links to MSSQL date & time type documentation.
1 change: 1 addition & 0 deletions docs/changelog/index.rst
@@ -3,6 +3,7 @@
:caption: Changelog

DRAFT
0.12.2
0.12.1
0.12.0
0.11.2
1 change: 1 addition & 0 deletions docs/changelog/next_release/+.dependency.rst
@@ -0,0 +1 @@
Allow using `etl-entities==2.4.0 <https://github.com/MobileTeleSystems/etl-entities/releases/tag/2.4.0>`_.
1 change: 1 addition & 0 deletions docs/changelog/next_release/+.doc.1.rst
@@ -0,0 +1 @@
Fix links to MSSQL date & time type documentation.
1 change: 1 addition & 0 deletions docs/changelog/next_release/+.improvement.1.rst
@@ -0,0 +1 @@
Change Spark ``jobDescription`` for DBReader & FileDFReader from ``DBReader.run() -> Connection`` to ``Connection -> DBReader.run()``.
1 change: 1 addition & 0 deletions docs/changelog/next_release/316.bugfix.1.rst
@@ -0,0 +1 @@
Fix ``log_hwm`` result for ``KeyValueIntHWM`` (used by Kafka).
1 change: 1 addition & 0 deletions docs/changelog/next_release/316.bugfix.2.rst
@@ -0,0 +1 @@
Fix ``log_collection`` hiding values of ``Kafka.addresses`` in logs with ``INFO`` level.
2 changes: 1 addition & 1 deletion docs/connection/db_connection/mssql/execute.rst
@@ -72,7 +72,7 @@ This method supports **any** query syntax supported by MSSQL, like:
* ✅︎ ``ALTER ...``
* ✅︎ ``INSERT INTO ... AS SELECT ...``
* ✅︎ ``DROP TABLE ...``, ``DROP VIEW ...``, and so on
* ✅︎ ``CALL procedure(arg1, arg2) ...`` or ``{call procedure(arg1, arg2)}`` - special syntax for calling procedure
* ✅︎ ``EXEC procedure(arg1, arg2) ...`` or ``{call procedure(arg1, arg2)}`` - special syntax for calling procedure
* ✅︎ ``DECLARE ... BEGIN ... END`` - execute PL/SQL statement
* ✅︎ other statements not mentioned here
* ❌ ``SET ...; SELECT ...;`` - multiple statements not supported
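Illustration only, not part of this commit: the corrected bullet reflects that MSSQL invokes stored procedures with ``EXEC`` or the JDBC ``{call ...}`` escape rather than ``CALL``. A minimal sketch of passing such statements through the connection's ``execute`` method, assuming onetl's documented ``MSSQL`` connection API; host, credentials, and the procedure name below are placeholders:

    from pyspark.sql import SparkSession

    from onetl.connection import MSSQL

    # Assumes the MSSQL JDBC driver is added to the session via MSSQL.get_packages().
    spark = (
        SparkSession.builder.appName("mssql-execute-demo")
        .config("spark.jars.packages", ",".join(MSSQL.get_packages()))
        .getOrCreate()
    )

    mssql = MSSQL(
        host="mssql.example.com",  # placeholder connection options
        port=1433,
        user="onetl_user",
        password="***",
        database="demo_db",
        extra={"trustServerCertificate": "true"},
        spark=spark,
    )

    # T-SQL syntax from the corrected bullet above ...
    mssql.execute("EXEC refresh_report_cache(2024, 11)")
    # ... or the equivalent JDBC escape syntax.
    mssql.execute("{call refresh_report_cache(2024, 11)}")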
5 changes: 2 additions & 3 deletions docs/connection/db_connection/mssql/types.rst
@@ -197,8 +197,7 @@ Temporal types
So not all of values in Spark DataFrame can be written to MSSQL.

References:
* `Clickhouse DateTime documentation <https://clickhouse.com/docs/en/sql-reference/data-types/datetime>`_
* `Clickhouse DateTime documentation <https://clickhouse.com/docs/en/sql-reference/data-types/datetime>`_
* `MSSQL date & time types documentation <https://learn.microsoft.com/en-us/sql/t-sql/data-types/date-and-time-types>`_
* `Spark DateType documentation <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/DateType.html>`_
* `Spark TimestampType documentation <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/TimestampType.html>`_

@@ -213,7 +212,7 @@ Temporal types
Last digit will be lost during read or write operations.
.. [5]
``time`` type is the same as ``timestamp`` with date ``1970-01-01``. So instead of reading data from MSSQL like ``23:59:59.999999``
``time`` type is the same as ``datetime2`` with date ``1970-01-01``. So instead of reading data from MSSQL like ``23:59:59.999999``
it is actually read ``1970-01-01 23:59:59.999999``, and vice versa.
String types
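Illustration only, not part of this commit: as the corrected footnote above explains, MSSQL ``time`` values surface in Spark as timestamps pinned to ``1970-01-01``. A small hypothetical PySpark sketch of trimming that synthetic date back off after reading; the column name is made up:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, date_format

    spark = SparkSession.builder.getOrCreate()

    # Simulate what a value read from an MSSQL ``time`` column looks like in Spark.
    df = spark.createDataFrame([("1970-01-01 23:59:59.999999",)], ["business_time"])
    df = df.withColumn("business_time", col("business_time").cast("timestamp"))

    # Keep only the time-of-day part, dropping the synthetic 1970-01-01 date.
    df = df.withColumn("time_only", date_format("business_time", "HH:mm:ss.SSSSSS"))
    df.show(truncate=False)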
2 changes: 1 addition & 1 deletion onetl/VERSION
@@ -1 +1 @@
0.12.1
0.12.2
7 changes: 4 additions & 3 deletions onetl/connection/db_connection/kafka/connection.py
@@ -527,13 +527,14 @@ def get_min_max_values(
max_offsets[partition_id] = end_offset

log.info("|%s| Received min and max offset values for each partition.", self.__class__.__name__)
for partition_id in sorted(min_offsets.keys()):
partitions = sorted(set(min_offsets.keys() | max_offsets.keys()))
for partition_id in partitions:
log.debug(
"|%s| Partition %d: Min Offset = %d, Max Offset = %d",
self.__class__.__name__,
partition_id,
min_offsets[partition_id],
max_offsets[partition_id],
min_offsets.get(partition_id),
max_offsets.get(partition_id),
)

return min_offsets, max_offsets
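
Illustration only, not part of this commit: the updated loop iterates over the union of partition ids from both dictionaries and reads them with ``dict.get``, so a partition that appears on only one side no longer raises ``KeyError``. The same pattern in isolation, with made-up offsets:

    # Offsets as they might come back from Kafka; partition 2 has only a max offset here.
    min_offsets = {0: 10, 1: 0}
    max_offsets = {0: 42, 1: 7, 2: 99}

    # Union of keys, sorted for stable output -- same approach as the diff above.
    partitions = sorted(set(min_offsets.keys() | max_offsets.keys()))

    for partition_id in partitions:
        # ``.get`` returns None instead of raising KeyError for the missing side.
        print(partition_id, min_offsets.get(partition_id), max_offsets.get(partition_id))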
2 changes: 1 addition & 1 deletion onetl/db/db_reader/db_reader.py
@@ -635,7 +635,7 @@ def run(self) -> DataFrame:

self._check_strategy()

job_description = f"{self.__class__.__name__}.run({self.source}) -> {self.connection}"
job_description = f"{self.connection} -> {self.__class__.__name__}.run({self.source})"
with override_job_description(self.connection.spark, job_description):
if not self._connection_checked:
self._log_parameters()
4 changes: 2 additions & 2 deletions onetl/file/file_df_reader/file_df_reader.py
@@ -212,9 +212,9 @@ def run(self, files: Iterable[str | os.PathLike] | None = None) -> DataFrame:
self._log_parameters(files)

if files:
job_description = f"{self.__class__.__name__}.run([..files..]) -> {self.connection}"
job_description = f"{self.connection} -> {self.__class__.__name__}.run([..files..])"
else:
job_description = f"{self.__class__.__name__}.run({self.source_path}) -> {self.connection}"
job_description = f"{self.connection} -> {self.__class__.__name__}.run({self.source_path})"

with override_job_description(self.connection.spark, job_description):
paths: FileSet[PurePathProtocol] = FileSet()
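Illustration only, not part of this commit: both DBReader and FileDFReader now put the connection first in the Spark ``jobDescription``, so the Spark UI reads source before operation. A rough, self-contained sketch of the idea using the standard ``SparkContext.setJobDescription`` API, which onetl's ``override_job_description`` helper presumably wraps (its internals are not shown in this diff); the connection and source strings are invented:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("job-description-demo").getOrCreate()

    # Hypothetical repr values; the real ones come from the connection and reader.
    connection_repr = "Postgres[db.example.com:5432/analytics]"
    source = "public.orders"

    # New ordering: connection first, then the operation executed against it.
    job_description = f"{connection_repr} -> DBReader.run({source})"

    spark.sparkContext.setJobDescription(job_description)
    try:
        spark.range(10).count()  # any Spark action now shows this description in the UI
    finally:
        spark.sparkContext.setJobDescription(None)  # reset afterwards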
46 changes: 36 additions & 10 deletions onetl/log.py
@@ -9,7 +9,7 @@
from contextlib import redirect_stdout
from enum import Enum
from textwrap import dedent
from typing import TYPE_CHECKING, Any, Collection, Iterable
from typing import TYPE_CHECKING, Any, Collection, Iterable, Mapping, Set

from etl_entities.hwm import HWM
from typing_extensions import deprecated
@@ -315,6 +315,8 @@ def log_collection(
log_collection(logger, "myvar", [])
log_collection(logger, "myvar", ["item1", {"item2": "value2"}, None])
log_collection(logger, "myvar", {"item1", "item2", None})
log_collection(logger, "myvar", {"key1": "value1", "key2": None})
log_collection(logger, "myvar", ["item1", "item2", "item3"], max_items=1)
log_collection(
logger,
@@ -334,6 +336,17 @@ def log_collection(
INFO onetl.module None,
INFO onetl.module ]
INFO onetl.module myvar = {
INFO onetl.module 'item1',
INFO onetl.module 'item2',
INFO onetl.module None,
INFO onetl.module }
INFO onetl.module myvar = {
INFO onetl.module 'key1': 'value1',
INFO onetl.module 'key2': None,
INFO onetl.module }
INFO onetl.module myvar = [
INFO onetl.module 'item1',
INFO onetl.module # ... 2 more items of type <class 'str'>
@@ -350,21 +363,30 @@

base_indent = " " * (BASE_LOG_INDENT + indent)
stacklevel += 1
items = list(collection) # force convert all iterators to list to know size
if not items:
_log(logger, "%s%s = []", base_indent, name, level=level, stacklevel=stacklevel)

if not isinstance(collection, (Mapping, Set)):
collection = list(collection) # force convert all iterators to list to know size

start_bracket = "["
end_bracket = "]"
if isinstance(collection, (Mapping, Set)):
start_bracket = "{"
end_bracket = "}"

if not collection:
_log(logger, "%s%s = %s%s", base_indent, name, start_bracket, end_bracket, level=level, stacklevel=stacklevel)
return

nested_indent = " " * (BASE_LOG_INDENT + indent + 4)
_log(logger, "%s%s = [", base_indent, name, level=level, stacklevel=stacklevel)
_log(logger, "%s%s = %s", base_indent, name, start_bracket, level=level, stacklevel=stacklevel)

for i, item in enumerate(items, start=1):
if max_items and i > max_items and level >= logging.DEBUG:
for i, item in enumerate(sorted(collection), start=1):
if max_items and i > max_items and level > logging.DEBUG:
_log(
logger,
"%s# ... %d more items of type %r",
nested_indent,
len(items) - max_items,
len(collection) - max_items,
type(item),
level=level,
stacklevel=stacklevel,
@@ -377,9 +399,13 @@
stacklevel=stacklevel,
)
break
_log(logger, "%s%r,", nested_indent, item, level=level, stacklevel=stacklevel)

_log(logger, "%s]", base_indent, level=level, stacklevel=stacklevel)
if isinstance(collection, Mapping):
_log(logger, "%s%r: %r,", nested_indent, item, collection[item], level=level, stacklevel=stacklevel)
else:
_log(logger, "%s%r,", nested_indent, item, level=level, stacklevel=stacklevel)

_log(logger, "%s%s", base_indent, end_bracket, level=level, stacklevel=stacklevel)


def entity_boundary_log(logger: logging.Logger, msg: str, char: str = "=", stacklevel: int = 1) -> None:
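Illustration only, not part of this commit: after this change ``log_collection`` renders mappings and sets with curly brackets, prints ``key: value`` pairs for mappings, and sorts items before logging, instead of coercing everything to a list. A hedged usage sketch with illustrative values; the exact output layout is shown in the updated docstring above:

    import logging

    from onetl.log import log_collection

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("onetl.demo")

    # Mapping: rendered with ``{}`` and ``key: value`` pairs.
    log_collection(logger, "options", {"fetchsize": 1000, "ssl": None})

    # Set: also rendered with ``{}``, items sorted for stable output.
    log_collection(logger, "addresses", {"broker2:9092", "broker1:9092"})

    # Sequence with a cap on how many items are printed above DEBUG level.
    log_collection(logger, "files", ["a.csv", "b.csv", "c.csv"], max_items=1)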
2 changes: 1 addition & 1 deletion requirements/core.txt
@@ -1,4 +1,4 @@
etl-entities>=2.2,<2.4
etl-entities>=2.2,<2.5
evacuator>=1.0,<1.1
frozendict
humanize
@@ -28,7 +28,6 @@ def test_kafka_strategy_incremental(
):
from pyspark.sql.functions import max as spark_max

hwm_type = KeyValueIntHWM
hwm_name = secrets.token_hex(5)
store = HWMStoreStackManager.get_current()

@@ -77,7 +76,7 @@

hwm = store.get_hwm(hwm_name)
assert hwm is not None
assert isinstance(hwm, hwm_type)
assert isinstance(hwm, KeyValueIntHWM)

# HWM contains mapping `partition: max offset + 1`
partition_offsets_initial = dict.fromkeys(range(num_partitions or 1), 0)
