From 48fbf64ea90c1c93713471ca00e72dc6e55fe363 Mon Sep 17 00:00:00 2001
From: Alexander Kukushkin
Date: Thu, 4 Apr 2024 17:51:26 +0200
Subject: [PATCH] Release v3.3.0 (#3043)

* Make sure tests are not making external calls and pass url with scheme to urllib3 to avoid warnings

* Make sure unit tests do not rely on filesystem state

* Bump pyright and "solve" reported "issues"

Most of them are related to partially unknown types of values coming from an empty dict or list. To solve it for the empty dict case we use the `EMPTY_DICT` object of the newly introduced `_FrozenDict` class.

* Improve unit-tests code coverage

* Add release notes for 3.3.0

* Bump version

* Fix pyinstaller spec file

* Python 3.6 compatibility

---------

Co-authored-by: Polina Bungina <27892524+hughcapet@users.noreply.github.com>
---
 .github/workflows/tests.yaml                 |  2 +-
 docs/releases.rst                            | 70 +++++++++++++++++++
 docs/tools_integration.rst                   |  2 +
 patroni.spec                                 | 12 ++--
 patroni/collections.py                       | 53 ++++++++++++--
 patroni/config.py                            |  6 +-
 patroni/config_generator.py                  | 10 ++-
 patroni/dcs/__init__.py                      |  4 ++
 patroni/dcs/consul.py                        |  3 +-
 patroni/dcs/kubernetes.py                    | 25 ++++---
 patroni/global_config.py                     |  5 +-
 patroni/log.py                               |  3 +-
 patroni/postgresql/__init__.py               |  6 +-
 .../available_parameters/__init__.py         |  5 +-
 patroni/postgresql/bootstrap.py              |  5 +-
 patroni/postgresql/cancellable.py            |  3 +-
 patroni/postgresql/config.py                 |  9 +--
 patroni/postgresql/mpp/__init__.py           |  2 +
 patroni/postgresql/rewind.py                 |  3 +-
 patroni/psycopg.py                           |  5 +-
 patroni/validator.py                         |  5 +-
 patroni/version.py                           |  2 +-
 tests/test_config.py                         |  8 +--
 tests/test_consul.py                         |  2 +-
 tests/test_etcd3.py                          |  2 +-
 tests/test_kubernetes.py                     |  6 +-
 26 files changed, 201 insertions(+), 57 deletions(-)

diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
index 098bd0104..311aed667 100644
--- a/.github/workflows/tests.yaml
+++ b/.github/workflows/tests.yaml
@@ -174,7 +174,7 @@ jobs:
     - uses: jakebailey/pyright-action@v1
       with:
-        version: 1.1.347
+        version: 1.1.356
 
   docs:
     runs-on: ubuntu-latest
diff --git a/docs/releases.rst b/docs/releases.rst
index ac167bd0a..f30aacbc4 100644
--- a/docs/releases.rst
+++ b/docs/releases.rst
@@ -3,6 +3,76 @@
 Release notes
 =============
 
+Version 3.3.0
+-------------
+
+.. warning::
+    Older Patroni versions are not compatible with ``ydiff>=1.3``.
+
+    The following options are available to "fix" the problem:
+
+    1. upgrade Patroni to the latest version
+    2. install ``ydiff<1.3`` after installing Patroni
+    3. install the ``cdiff`` module
+
+
+**New features**
+
+- Add ability to pass ``auth_data`` to Zookeeper client (Aras Mumcuyan)
+
+  It allows specifying the authentication credentials to use for the connection.
+
+- Add a contrib script for ``Barman`` integration (Israel Barth Rubio)
+
+  Provide an application ``patroni_barman`` that allows performing ``Barman`` operations remotely and can be used as a custom bootstrap/custom replica method or as an ``on_role_change`` callback. Please check :ref:`here <tools_integration>` for more information.
+
+- Support ``JSON`` log format (alisalemmi)
+
+  Apart from ``plain`` (default), Patroni now also supports the ``json`` log format. Requires the ``python-json-logger>=2.0.2`` library to be installed.
+
+- Show ``pending_restart_reason`` information (Polina Bungina)
+
+  Provide extended information about the PostgreSQL parameters that caused the ``pending_restart`` flag to be set. Both ``patronictl list`` and the ``/patroni`` REST API endpoint now show the parameter names and their "diff" as ``pending_restart_reason``.
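+
+  For example, the new field can be read from the ``/patroni`` REST API endpoint with a few lines of Python (a sketch; it assumes the REST API listens on ``localhost:8008``)::
+
+      import json
+      import urllib.request
+
+      # Fetch the node status and print the parameters (if any) that
+      # currently require a PostgreSQL restart to take effect.
+      info = json.load(urllib.request.urlopen('http://localhost:8008/patroni'))
+      print(info.get('pending_restart_reason'))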
+
+- Implement ``nostream`` tag (Grigory Smolkin)
+
+  If the ``nostream`` tag is set to ``true``, the node will not use the replication protocol to stream WAL, but will instead rely on archive recovery (if ``restore_command`` is configured). It also disables copying and synchronization of permanent logical replication slots on the node itself and on all its cascading replicas.
+
+
+**Improvements**
+
+- Implement validation of the ``log`` section (Alexander Kukushkin)
+
+  Until now the validator was not checking the correctness of the provided logging configuration.
+
+- Improve logging of PostgreSQL parameter changes (Polina Bungina)
+
+  Convert old values to a human-readable format and log information about the ``pg_controldata`` vs Patroni global configuration mismatch.
+
+
+**Bugfixes**
+
+- Properly filter out not allowed ``pg_basebackup`` options (Israel Barth Rubio)
+
+  Due to a bug, Patroni was not properly filtering out disallowed options configured for the ``basebackup`` replica bootstrap method when they were provided in the ``- setting: value`` format.
+
+- Fix ``etcd3`` authentication error handling (Alexander Kukushkin)
+
+  Always retry once on an ``etcd3`` authentication error if authentication was not performed right before executing the request. Also, do not restart watchers on reauthentication.
+
+- Improve logic of the validator files discovery (Waynerv)
+
+  Use the ``importlib`` library to discover the files with available configuration parameters when possible (Python 3.9+). This implementation is more stable and doesn't break Patroni distributions based on ``zip`` archives.
+
+- Use ``target_session_attrs`` only when multiple hosts are specified in the ``standby_cluster`` section (Alexander Kukushkin)
+
+  ``target_session_attrs=read-write`` is now added to the ``primary_conninfo`` on the standby leader node only when the ``standby_cluster.host`` setting contains multiple hosts separated by commas.
+
+- Add compatibility code for ``ydiff`` library version 1.3+ (Alexander Kukushkin)
+
+  Patroni relies on some ``ydiff`` internals that are not a public API, because ``ydiff`` is meant to be a terminal tool rather than a Python module. Unfortunately, the API change in 1.3 broke older Patroni versions.
+
+
 Version 3.2.2
 -------------
 
diff --git a/docs/tools_integration.rst b/docs/tools_integration.rst
index de7d3c6d4..1419e58d3 100644
--- a/docs/tools_integration.rst
+++ b/docs/tools_integration.rst
@@ -1,3 +1,5 @@
+.. _tools_integration:
+
 Integration with other tools
 ============================
 
diff --git a/patroni.spec b/patroni.spec
index b5c414a2e..ad9d2d7b4 100644
--- a/patroni.spec
+++ b/patroni.spec
@@ -13,13 +13,17 @@ def hiddenimports():
     sys.path.pop(0)
 
 
+def resources():
+    import os
+    res_dir = 'patroni/postgresql/available_parameters/'
+    exts = set(f.split('.')[-1] for f in os.listdir(res_dir))
+    return [(res_dir + '*.' + e, res_dir) for e in exts if e.lower() in {'yml', 'yaml'}]
+
+
 a = Analysis(['patroni/__main__.py'],
              pathex=[],
              binaries=None,
-             datas=[
-                 ('patroni/postgresql/available_parameters/*.yml', 'patroni/postgresql/available_parameters'),
-                 ('patroni/postgresql/available_parameters/*.yaml', 'patroni/postgresql/available_parameters'),
-             ],
+             datas=resources(),
              hiddenimports=hiddenimports(),
              hookspath=[],
              runtime_hooks=[],
diff --git a/patroni/collections.py b/patroni/collections.py
index 2d38e8d7f..94dff7bb9 100644
--- a/patroni/collections.py
+++ b/patroni/collections.py
@@ -1,9 +1,10 @@
 """Patroni custom object types somewhat like :mod:`collections` module.
-Provides a case insensitive :class:`dict` and :class:`set` object types. +Provides a case insensitive :class:`dict` and :class:`set` object types, and `EMPTY_DICT` frozen dictionary object. """ from collections import OrderedDict -from typing import Any, Collection, Dict, Iterator, KeysView, MutableMapping, MutableSet, Optional +from copy import deepcopy +from typing import Any, Collection, Dict, Iterator, KeysView, Mapping, MutableMapping, MutableSet, Optional class CaseInsensitiveSet(MutableSet[str]): @@ -48,7 +49,7 @@ def __str__(self) -> str: """ return str(set(self._values.values())) - def __contains__(self, value: str) -> bool: + def __contains__(self, value: object) -> bool: """Check if set contains *value*. The check is performed case-insensitively. @@ -57,7 +58,7 @@ def __contains__(self, value: str) -> bool: :returns: ``True`` if *value* is already in the set, ``False`` otherwise. """ - return value.lower() in self._values + return isinstance(value, str) and value.lower() in self._values def __iter__(self) -> Iterator[str]: """Iterate over the values in this set. @@ -207,3 +208,47 @@ def __repr__(self) -> str: "'.format(type(self).__name__, dict(self.items()), id(self)) + + +class _FrozenDict(Mapping[str, Any]): + """Frozen dictionary object.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + """Create a new instance of :class:`_FrozenDict` with given data.""" + self.__values: Dict[str, Any] = dict(*args, **kwargs) + + def __iter__(self) -> Iterator[str]: + """Iterate over keys of this dict. + + :yields: each key present in the dict. Yields each key with its last case that has been stored. + """ + return iter(self.__values) + + def __len__(self) -> int: + """Get the length of this dict. + + :returns: number of keys in the dict. + + :Example: + + >>> len(_FrozenDict()) + 0 + """ + return len(self.__values) + + def __getitem__(self, key: str) -> Any: + """Get the value corresponding to *key*. + + :returns: value corresponding to *key*. + """ + return self.__values[key] + + def copy(self) -> Dict[str, Any]: + """Create a copy of this dict. + + :return: a new dict object with the same keys and values of this dict. + """ + return deepcopy(self.__values) + + +EMPTY_DICT = _FrozenDict() diff --git a/patroni/config.py b/patroni/config.py index baca2a001..662047f87 100644 --- a/patroni/config.py +++ b/patroni/config.py @@ -12,7 +12,7 @@ from typing import Any, Callable, Collection, Dict, List, Optional, Union, TYPE_CHECKING from . 
import PATRONI_ENV_PREFIX -from .collections import CaseInsensitiveDict +from .collections import CaseInsensitiveDict, EMPTY_DICT from .dcs import ClusterConfig from .exceptions import ConfigParseError from .file_perm import pg_perm @@ -445,14 +445,14 @@ def _safe_copy_dynamic_configuration(self, dynamic_configuration: Dict[str, Any] for name, value in dynamic_configuration.items(): if name == 'postgresql': - for name, value in (value or {}).items(): + for name, value in (value or EMPTY_DICT).items(): if name == 'parameters': config['postgresql'][name].update(self._process_postgresql_parameters(value)) elif name not in ('connect_address', 'proxy_address', 'listen', 'config_dir', 'data_dir', 'pgpass', 'authentication'): config['postgresql'][name] = deepcopy(value) elif name == 'standby_cluster': - for name, value in (value or {}).items(): + for name, value in (value or EMPTY_DICT).items(): if name in self.__DEFAULT_CONFIG['standby_cluster']: config['standby_cluster'][name] = deepcopy(value) elif name in config: # only variables present in __DEFAULT_CONFIG allowed to be overridden from DCS diff --git a/patroni/config_generator.py b/patroni/config_generator.py index cf0b02a52..1ff4dce0b 100644 --- a/patroni/config_generator.py +++ b/patroni/config_generator.py @@ -15,6 +15,7 @@ from psycopg2 import cursor from . import psycopg +from .collections import EMPTY_DICT from .config import Config from .exceptions import PatroniException from .log import PatroniLogger @@ -244,7 +245,8 @@ def _get_int_major_version(self) -> int: See :func:`~patroni.postgresql.misc.postgres_major_version_to_int` and :func:`~patroni.utils.get_major_version`. """ - postgres_bin = ((self.config.get('postgresql') or {}).get('bin_name') or {}).get('postgres', 'postgres') + postgres_bin = ((self.config.get('postgresql') + or EMPTY_DICT).get('bin_name') or EMPTY_DICT).get('postgres', 'postgres') return postgres_major_version_to_int(get_major_version(self.config['postgresql'].get('bin_dir'), postgres_bin)) def generate(self) -> None: @@ -411,8 +413,10 @@ def _set_su_params(self) -> None: val = self.parsed_dsn.get(conn_param, os.getenv(env_var)) if val: su_params[conn_param] = val - patroni_env_su_username = ((self.config.get('authentication') or {}).get('superuser') or {}).get('username') - patroni_env_su_pwd = ((self.config.get('authentication') or {}).get('superuser') or {}).get('password') + patroni_env_su_username = ((self.config.get('authentication') + or EMPTY_DICT).get('superuser') or EMPTY_DICT).get('username') + patroni_env_su_pwd = ((self.config.get('authentication') + or EMPTY_DICT).get('superuser') or EMPTY_DICT).get('password') # because we use "username" in the config for some reason su_params['username'] = su_params.pop('user', patroni_env_su_username) or getuser() su_params['password'] = su_params.get('password', patroni_env_su_pwd) or \ diff --git a/patroni/dcs/__init__.py b/patroni/dcs/__init__.py index 21eba1bd6..971cc3c91 100644 --- a/patroni/dcs/__init__.py +++ b/patroni/dcs/__init__.py @@ -85,6 +85,8 @@ def dcs_modules() -> List[str]: :returns: list of known module names with absolute python module path namespace, e.g. ``patroni.dcs.etcd``. """ + if TYPE_CHECKING: # pragma: no cover + assert isinstance(__package__, str) return iter_modules(__package__) @@ -101,6 +103,8 @@ def iter_dcs_classes( :returns: an iterator of tuples, each containing the module ``name`` and the imported DCS class object. 
""" + if TYPE_CHECKING: # pragma: no cover + assert isinstance(__package__, str) return iter_classes(__package__, AbstractDCS, config) diff --git a/patroni/dcs/consul.py b/patroni/dcs/consul.py index c6747d060..deedfdb14 100644 --- a/patroni/dcs/consul.py +++ b/patroni/dcs/consul.py @@ -444,8 +444,9 @@ def _mpp_cluster_loader(self, path: str) -> Dict[int, Cluster]: :returns: all MPP groups as :class:`dict`, with group IDs as keys and :class:`Cluster` objects as values. """ + results: Optional[List[Dict[str, Any]]] _, results = self.retry(self._client.kv.get, path, recurse=True, consistency=self._consistency) - clusters: Dict[int, Dict[str, Cluster]] = defaultdict(dict) + clusters: Dict[int, Dict[str, Dict[str, Any]]] = defaultdict(dict) for node in results or []: key = node['Key'][len(path):].split('/', 1) if len(key) == 2 and self._mpp.group_re.match(key[0]): diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index 4b32130d4..9590bb98f 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -20,6 +20,7 @@ from typing import Any, Callable, Collection, Dict, List, Optional, Tuple, Type, Union, TYPE_CHECKING from . import AbstractDCS, Cluster, ClusterConfig, Failover, Leader, Member, Status, SyncState, TimelineHistory +from ..collections import EMPTY_DICT from ..exceptions import DCSError from ..postgresql.mpp import AbstractMPP from ..utils import deep_compare, iter_response_objects, keepalive_socket_options, \ @@ -470,7 +471,7 @@ def wrapper(*args: Any, **kwargs: Any) -> Union[urllib3.HTTPResponse, K8sObject] if len(args) == 3: # name, namespace, body body = args[2] elif action == 'create': # namespace, body - body = args[1] + body = args[1] # pyright: ignore [reportGeneralTypeIssues] elif action == 'delete': # name, namespace body = kwargs.pop('body', None) else: @@ -509,7 +510,7 @@ def __init__(self, orig: K8sClient.rest.ApiException) -> None: @property def sleeptime(self) -> Optional[int]: try: - return int((self.headers or {}).get('retry-after', '')) + return int((self.headers or EMPTY_DICT).get('retry-after', '')) except Exception: return None @@ -654,7 +655,7 @@ def _process_event(self, event: Dict[str, Union[Any, Dict[str, Union[Any, Dict[s obj = K8sObject(obj) success, old_value = self.set(name, obj) if success: - new_value = (obj.metadata.annotations or {}).get(self._annotations_map.get(name)) + new_value = (obj.metadata.annotations or EMPTY_DICT).get(self._annotations_map.get(name, '')) elif ev_type == 'DELETED': success, old_value = self.delete(name, obj['metadata']['resourceVersion']) else: @@ -662,7 +663,7 @@ def _process_event(self, event: Dict[str, Union[Any, Dict[str, Union[Any, Dict[s if success and obj.get('kind') != 'Pod': if old_value: - old_value = (old_value.metadata.annotations or {}).get(self._annotations_map.get(name)) + old_value = (old_value.metadata.annotations or EMPTY_DICT).get(self._annotations_map.get(name, '')) value_changed = old_value != new_value and \ (name != self._dcs.config_path or old_value is not None and new_value is not None) @@ -844,7 +845,7 @@ def reload_config(self, config: Union['Config', Dict[str, Any]]) -> None: @staticmethod def member(pod: K8sObject) -> Member: - annotations = pod.metadata.annotations or {} + annotations = pod.metadata.annotations or EMPTY_DICT member = Member.from_node(pod.metadata.resource_version, pod.metadata.name, None, annotations.get('status', '')) member.data['pod_labels'] = pod.metadata.labels return member @@ -925,7 +926,7 @@ def _cluster_from_nodes(self, group: str, nodes: 
Dict[str, K8sObject], pods: Col failover = nodes.get(path + self._FAILOVER) metadata = failover and failover.metadata failover = metadata and Failover.from_node(metadata.resource_version, - (metadata.annotations or {}).copy()) + (metadata.annotations or EMPTY_DICT).copy()) # get synchronization state sync = nodes.get(path + self._SYNC) @@ -1047,8 +1048,9 @@ def subsets_changed(last_observed_subsets: List[K8sObject], ip: str, ports: List def __target_ref(self, leader_ip: str, latest_subsets: List[K8sObject], pod: K8sObject) -> K8sObject: # we want to re-use existing target_ref if possible + empty_addresses: List[K8sObject] = [] for subset in latest_subsets: - for address in subset.addresses or []: + for address in subset.addresses or empty_addresses: if address.ip == leader_ip and address.target_ref and address.target_ref.name == self._name: return address.target_ref return k8s_client.V1ObjectReference(kind='Pod', uid=pod.metadata.uid, namespace=self._namespace, @@ -1056,7 +1058,8 @@ def __target_ref(self, leader_ip: str, latest_subsets: List[K8sObject], pod: K8s def _map_subsets(self, endpoints: Dict[str, Any], ips: List[str]) -> None: leader = self._kinds.get(self.leader_path) - latest_subsets = leader and leader.subsets or [] + empty_addresses: List[K8sObject] = [] + latest_subsets = leader and leader.subsets or empty_addresses if not ips: # We want to have subsets empty if latest_subsets: @@ -1212,7 +1215,7 @@ def _retry(*args: Any, **kwargs: Any) -> Any: if not retry.ensure_deadline(0.5): return False - kind_annotations = kind and kind.metadata.annotations or {} + kind_annotations = kind and kind.metadata.annotations or EMPTY_DICT kind_resource_version = kind and kind.metadata.resource_version # There is different leader or resource_version in cache didn't change @@ -1225,7 +1228,7 @@ def _retry(*args: Any, **kwargs: Any) -> Any: def update_leader(self, leader: Leader, last_lsn: Optional[int], slots: Optional[Dict[str, int]] = None, failsafe: Optional[Dict[str, str]] = None) -> bool: kind = self._kinds.get(self.leader_path) - kind_annotations = kind and kind.metadata.annotations or {} + kind_annotations = kind and kind.metadata.annotations or EMPTY_DICT if kind and kind_annotations.get(self._LEADER) != self._name: return False @@ -1346,7 +1349,7 @@ def _delete_leader(self, leader: Leader) -> bool: def delete_leader(self, leader: Optional[Leader], last_lsn: Optional[int] = None) -> bool: ret = False kind = self._kinds.get(self.leader_path) - if kind and (kind.metadata.annotations or {}).get(self._LEADER) == self._name: + if kind and (kind.metadata.annotations or EMPTY_DICT).get(self._LEADER) == self._name: annotations: Dict[str, Optional[str]] = {self._LEADER: None} if last_lsn: annotations[self._OPTIME] = str(last_lsn) diff --git a/patroni/global_config.py b/patroni/global_config.py index ded66f6fe..5423d7e58 100644 --- a/patroni/global_config.py +++ b/patroni/global_config.py @@ -10,6 +10,7 @@ from copy import deepcopy from typing import Any, Dict, List, Optional, Union, TYPE_CHECKING +from .collections import EMPTY_DICT from .utils import parse_bool, parse_int if TYPE_CHECKING: # pragma: no cover @@ -214,7 +215,7 @@ def max_timelines_history(self) -> int: @property def use_slots(self) -> bool: """``True`` if cluster is configured to use replication slots.""" - return bool(parse_bool((self.get('postgresql') or {}).get('use_slots', True))) + return bool(parse_bool((self.get('postgresql') or EMPTY_DICT).get('use_slots', True))) @property def permanent_slots(self) -> Dict[str, Any]: @@ 
-222,7 +223,7 @@ def permanent_slots(self) -> Dict[str, Any]: return deepcopy(self.get('permanent_replication_slots') or self.get('permanent_slots') or self.get('slots') - or {}) + or EMPTY_DICT.copy()) sys.modules[__name__] = GlobalConfig() diff --git a/patroni/log.py b/patroni/log.py index beaacf4e8..177520566 100644 --- a/patroni/log.py +++ b/patroni/log.py @@ -413,7 +413,8 @@ def reload_config(self, config: Dict[str, Any]) -> None: if not isinstance(handler, RotatingFileHandler): handler = RotatingFileHandler(os.path.join(config['dir'], __name__)) - handler.maxBytes = int(config.get('file_size', 25000000)) # pyright: ignore [reportGeneralTypeIssues] + max_file_size = int(config.get('file_size', 25000000)) + handler.maxBytes = max_file_size # pyright: ignore [reportAttributeAccessIssue] handler.backupCount = int(config.get('file_num', 4)) # we can't use `if not isinstance(handler, logging.StreamHandler)` below, # because RotatingFileHandler is a child of StreamHandler!!! diff --git a/patroni/postgresql/__init__.py b/patroni/postgresql/__init__.py index 059c8034f..f6deb97de 100644 --- a/patroni/postgresql/__init__.py +++ b/patroni/postgresql/__init__.py @@ -26,7 +26,7 @@ from .sync import SyncHandler from .. import global_config, psycopg from ..async_executor import CriticalTask -from ..collections import CaseInsensitiveSet, CaseInsensitiveDict +from ..collections import CaseInsensitiveSet, CaseInsensitiveDict, EMPTY_DICT from ..dcs import Cluster, Leader, Member, SLOT_ADVANCE_AVAILABLE_VERSION from ..exceptions import PostgresConnectionException from ..utils import Retry, RetryFailedError, polling_loop, data_directory_is_empty, parse_int @@ -272,7 +272,7 @@ def pgcommand(self, cmd: str) -> str: :returns: path to Postgres binary named *cmd*. """ - return os.path.join(self._bin_dir, (self.config.get('bin_name', {}) or {}).get(cmd, cmd)) + return os.path.join(self._bin_dir, (self.config.get('bin_name', {}) or EMPTY_DICT).get(cmd, cmd)) def pg_ctl(self, cmd: str, *args: str, **kwargs: Any) -> bool: """Builds and executes pg_ctl command @@ -414,7 +414,7 @@ def data_directory_empty(self) -> bool: return data_directory_is_empty(self._data_dir) def replica_method_options(self, method: str) -> Dict[str, Any]: - return deepcopy(self.config.get(method, {}) or {}) + return deepcopy(self.config.get(method, {}) or EMPTY_DICT.copy()) def replica_method_can_work_without_replication_connection(self, method: str) -> bool: return method != 'basebackup' and bool(self.replica_method_options(method).get('no_master') diff --git a/patroni/postgresql/available_parameters/__init__.py b/patroni/postgresql/available_parameters/__init__.py index 8af63cd97..8ab4a20fb 100644 --- a/patroni/postgresql/available_parameters/__init__.py +++ b/patroni/postgresql/available_parameters/__init__.py @@ -1,12 +1,13 @@ import logging import sys -from pathlib import Path from typing import Iterator logger = logging.getLogger(__name__) -if sys.version_info < (3, 9): +if sys.version_info < (3, 9): # pragma: no cover + from pathlib import Path + PathLikeObj = Path conf_dir = Path(__file__).parent else: diff --git a/patroni/postgresql/bootstrap.py b/patroni/postgresql/bootstrap.py index d112dbe61..779410c14 100644 --- a/patroni/postgresql/bootstrap.py +++ b/patroni/postgresql/bootstrap.py @@ -7,6 +7,7 @@ from typing import Any, Callable, Dict, List, Optional, Union, Tuple, TYPE_CHECKING from ..async_executor import CriticalTask +from ..collections import EMPTY_DICT from ..dcs import Leader, Member, RemoteMember from ..psycopg import 
quote_ident, quote_literal from ..utils import deep_compare, unquote @@ -146,7 +147,7 @@ def _post_restore(self) -> None: # make sure there is no trigger file or postgres will be automatically promoted trigger_file = self._postgresql.config.triggerfile_good_name - trigger_file = (self._postgresql.config.get('recovery_conf') or {}).get(trigger_file) or 'promote' + trigger_file = (self._postgresql.config.get('recovery_conf') or EMPTY_DICT).get(trigger_file) or 'promote' trigger_file = os.path.abspath(os.path.join(self._postgresql.data_dir, trigger_file)) if os.path.exists(trigger_file): os.unlink(trigger_file) @@ -441,7 +442,7 @@ def post_bootstrap(self, config: Dict[str, Any], task: CriticalTask) -> Optional if config.get('users'): logger.warning('User creation via "bootstrap.users" will be removed in v4.0.0') - for name, value in (config.get('users') or {}).items(): + for name, value in (config.get('users') or EMPTY_DICT).items(): if all(name != a.get('username') for a in (superuser, replication, rewind)): self.create_or_update_role(name, value.get('password'), value.get('options', [])) diff --git a/patroni/postgresql/cancellable.py b/patroni/postgresql/cancellable.py index 83423808c..e44729b07 100644 --- a/patroni/postgresql/cancellable.py +++ b/patroni/postgresql/cancellable.py @@ -100,7 +100,8 @@ def call(self, *args: Any, **kwargs: Union[Any, Dict[str, str]]) -> Optional[int if started and self._process is not None: if isinstance(communicate, dict): - communicate['stdout'], communicate['stderr'] = self._process.communicate(input_data) + communicate['stdout'], communicate['stderr'] = \ + self._process.communicate(input_data) # pyright: ignore [reportGeneralTypeIssues] return self._process.wait() finally: with self._lock: diff --git a/patroni/postgresql/config.py b/patroni/postgresql/config.py index ca8c8e75e..8b7d03286 100644 --- a/patroni/postgresql/config.py +++ b/patroni/postgresql/config.py @@ -13,7 +13,7 @@ from .validator import recovery_parameters, transform_postgresql_parameter_value, transform_recovery_parameter_value from .. import global_config -from ..collections import CaseInsensitiveDict, CaseInsensitiveSet +from ..collections import CaseInsensitiveDict, CaseInsensitiveSet, EMPTY_DICT from ..dcs import Leader, Member, RemoteMember, slot_name_from_member_name from ..exceptions import PatroniFatalException, PostgresConnectionException from ..file_perm import pg_perm @@ -619,7 +619,8 @@ def _write_recovery_params(self, fd: ConfigWriter, recovery_params: CaseInsensit fd.write_param(name, value) def build_recovery_params(self, member: Union[Leader, Member, None]) -> CaseInsensitiveDict: - recovery_params = CaseInsensitiveDict({p: v for p, v in (self.get('recovery_conf') or {}).items() + default: Dict[str, Any] = {} + recovery_params = CaseInsensitiveDict({p: v for p, v in (self.get('recovery_conf') or default).items() if not p.lower().startswith('recovery_target') and p.lower() not in ('primary_conninfo', 'primary_slot_name')}) recovery_params.update({'standby_mode': 'on', 'recovery_target_timeline': 'latest'}) @@ -845,7 +846,7 @@ def record_missmatch(mtype: bool) -> None: required['restart' if mtype else 'reload'] += 1 wanted_recovery_params = self.build_recovery_params(member) - for param, value in (self._current_recovery_params or {}).items(): + for param, value in (self._current_recovery_params or EMPTY_DICT).items(): # Skip certain parameters defined in the included postgres config files # if we know that they are not specified in the patroni configuration. 
if len(value) > 2 and value[2] not in (self._postgresql_conf, self._auto_conf) and \ @@ -1324,4 +1325,4 @@ def get(self, key: str, default: Optional[Any] = None) -> Optional[Any]: return self._config.get(key, default) def restore_command(self) -> Optional[str]: - return (self.get('recovery_conf') or {}).get('restore_command') + return (self.get('recovery_conf') or EMPTY_DICT).get('restore_command') diff --git a/patroni/postgresql/mpp/__init__.py b/patroni/postgresql/mpp/__init__.py index bb180d8a5..5120db35a 100644 --- a/patroni/postgresql/mpp/__init__.py +++ b/patroni/postgresql/mpp/__init__.py @@ -299,6 +299,8 @@ def iter_mpp_classes( :yields: tuples, each containing the module ``name`` and the imported MPP class object. """ + if TYPE_CHECKING: # pragma: no cover + assert isinstance(__package__, str) yield from iter_classes(__package__, AbstractMPP, config) diff --git a/patroni/postgresql/rewind.py b/patroni/postgresql/rewind.py index 0639866cc..c3461630d 100644 --- a/patroni/postgresql/rewind.py +++ b/patroni/postgresql/rewind.py @@ -13,6 +13,7 @@ from .connection import get_connection_cursor from .misc import format_lsn, fsync_dir, parse_history, parse_lsn from ..async_executor import CriticalTask +from ..collections import EMPTY_DICT from ..dcs import Leader, RemoteMember logger = logging.getLogger(__name__) @@ -418,7 +419,7 @@ def pg_rewind(self, r: Dict[str, Any]) -> bool: dsn = self._postgresql.config.format_dsn(r, True) logger.info('running pg_rewind from %s', dsn) - restore_command = (self._postgresql.config.get('recovery_conf') or {}).get('restore_command') \ + restore_command = (self._postgresql.config.get('recovery_conf') or EMPTY_DICT).get('restore_command') \ if self._postgresql.major_version < 120000 else self._postgresql.get_guc_value('restore_command') # Until v15 pg_rewind expected postgresql.conf to be inside $PGDATA, which is not the case on e.g. Debian diff --git a/patroni/psycopg.py b/patroni/psycopg.py index 4a92047ca..556244143 100644 --- a/patroni/psycopg.py +++ b/patroni/psycopg.py @@ -42,7 +42,8 @@ def quote_literal(value: Any, conn: Optional[Any] = None) -> str: value.prepare(conn) return value.getquoted().decode('utf-8') except ImportError: - from psycopg import connect as __connect, sql, Error, DatabaseError, OperationalError, ProgrammingError + from psycopg import connect as __connect # pyright: ignore [reportUnknownVariableType] + from psycopg import sql, Error, DatabaseError, OperationalError, ProgrammingError def _connect(dsn: Optional[str] = None, **kwargs: Any) -> 'Connection[Any]': """Call :func:`psycopg.connect` with *dsn* and ``**kwargs``. @@ -56,7 +57,7 @@ def _connect(dsn: Optional[str] = None, **kwargs: Any) -> 'Connection[Any]': :returns: a connection to the database. 
""" - ret = __connect(dsn or "", **kwargs) + ret: 'Connection[Any]' = __connect(dsn or "", **kwargs) setattr(ret, 'server_version', ret.pgconn.server_version) # compatibility with psycopg2 return ret diff --git a/patroni/validator.py b/patroni/validator.py index 204c81f0d..233b05b72 100644 --- a/patroni/validator.py +++ b/patroni/validator.py @@ -11,8 +11,7 @@ from typing import Any, Dict, Union, Iterator, List, Optional as OptionalType, Tuple, TYPE_CHECKING -from .collections import CaseInsensitiveSet - +from .collections import CaseInsensitiveSet, EMPTY_DICT from .dcs import dcs_modules from .exceptions import ConfigParseError from .utils import parse_int, split_host_port, data_directory_is_empty, get_major_version @@ -245,7 +244,7 @@ def get_bin_name(bin_name: str) -> str: """ if TYPE_CHECKING: # pragma: no cover assert isinstance(schema.data, dict) - return (schema.data.get('postgresql', {}).get('bin_name', {}) or {}).get(bin_name, bin_name) + return (schema.data.get('postgresql', {}).get('bin_name', {}) or EMPTY_DICT).get(bin_name, bin_name) def validate_data_dir(data_dir: str) -> bool: diff --git a/patroni/version.py b/patroni/version.py index 232dc4a90..46b5acc04 100644 --- a/patroni/version.py +++ b/patroni/version.py @@ -2,4 +2,4 @@ :var __version__: the current Patroni version. """ -__version__ = '3.2.2' +__version__ = '3.3.0' diff --git a/tests/test_config.py b/tests/test_config.py index 7808be6c4..2ca38cbab 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -160,12 +160,10 @@ def test_invalid_path(self): @patch('patroni.config.logger') def test__validate_failover_tags(self, mock_logger, mock_get): """Ensures that only one of `nofailover` or `failover_priority` can be provided""" - config = Config("postgres0.yml") - # Providing one of `nofailover` or `failover_priority` is fine for single_param in ({"nofailover": True}, {"failover_priority": 1}, {"failover_priority": 0}): mock_get.side_effect = [single_param] * 2 - self.assertIsNone(config._validate_failover_tags()) + self.assertIsNone(self.config._validate_failover_tags()) mock_logger.warning.assert_not_called() # Providing both `nofailover` and `failover_priority` is fine if consistent @@ -175,7 +173,7 @@ def test__validate_failover_tags(self, mock_logger, mock_get): {"nofailover": "False", "failover_priority": 0} ): mock_get.side_effect = [consistent_state] * 2 - self.assertIsNone(config._validate_failover_tags()) + self.assertIsNone(self.config._validate_failover_tags()) mock_logger.warning.assert_not_called() # Providing both inconsistently should log a warning @@ -186,7 +184,7 @@ def test__validate_failover_tags(self, mock_logger, mock_get): {"nofailover": "", "failover_priority": 0} ): mock_get.side_effect = [inconsistent_state] * 2 - self.assertIsNone(config._validate_failover_tags()) + self.assertIsNone(self.config._validate_failover_tags()) mock_logger.warning.assert_called_once_with( 'Conflicting configuration between nofailover: %s and failover_priority: %s.' 
+ ' Defaulting to nofailover: %s', diff --git a/tests/test_consul.py b/tests/test_consul.py index 87fb47045..8c7eae1d5 100644 --- a/tests/test_consul.py +++ b/tests/test_consul.py @@ -160,7 +160,7 @@ def test_take_leader(self): self.c.set_ttl(20) self.c._do_refresh_session = Mock() self.assertFalse(self.c.take_leader()) - with patch('time.time', Mock(side_effect=[0, 0, 0, 100, 100])): + with patch('time.time', Mock(side_effect=[0, 0, 0, 100, 100, 100])): self.assertFalse(self.c.take_leader()) @patch.object(consul.Consul.KV, 'put', Mock(return_value=True)) diff --git a/tests/test_etcd3.py b/tests/test_etcd3.py index 3e1bd9e83..ffcabd50c 100644 --- a/tests/test_etcd3.py +++ b/tests/test_etcd3.py @@ -273,7 +273,7 @@ def test_take_leader(self): def test_attempt_to_acquire_leader(self): self.assertFalse(self.etcd3.attempt_to_acquire_leader()) - with patch('time.time', Mock(side_effect=[0, 0, 0, 0, 0, 100, 200])): + with patch('time.time', Mock(side_effect=[0, 0, 0, 0, 0, 100, 200, 300])): self.assertFalse(self.etcd3.attempt_to_acquire_leader()) with patch('time.time', Mock(side_effect=[0, 100, 200, 300, 400])): self.assertRaises(Etcd3Error, self.etcd3.attempt_to_acquire_leader) diff --git a/tests/test_kubernetes.py b/tests/test_kubernetes.py index 235d24b20..59887e305 100644 --- a/tests/test_kubernetes.py +++ b/tests/test_kubernetes.py @@ -80,7 +80,7 @@ def mock_namespaced_kind(*args, **kwargs): def mock_load_k8s_config(self, *args, **kwargs): - self._server = '' + self._server = 'http://localhost' class TestK8sConfig(unittest.TestCase): @@ -242,6 +242,7 @@ def setUp(self, config=None): self.k.get_cluster() +@patch('urllib3.PoolManager.request', Mock()) @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_config_map', mock_namespaced_kind, create=True) class TestKubernetesConfigMaps(BaseTestKubernetes): @@ -374,6 +375,7 @@ def test_reload_config(self, mock_warning): mock_warning.assert_called_once() +@patch('urllib3.PoolManager.request', Mock()) class TestKubernetesEndpointsNoPodIP(BaseTestKubernetes): @patch.object(k8s_client.CoreV1Api, 'list_namespaced_endpoints', mock_list_namespaced_endpoints, create=True) def setUp(self, config=None): @@ -388,6 +390,7 @@ def test_update_leader(self, mock_patch_namespaced_endpoints): self.assertEqual(args[2].subsets[0].addresses[0].ip, '10.0.0.1') +@patch('urllib3.PoolManager.request', Mock()) class TestKubernetesEndpoints(BaseTestKubernetes): @patch.object(k8s_client.CoreV1Api, 'list_namespaced_endpoints', mock_list_namespaced_endpoints, create=True) @@ -478,6 +481,7 @@ def mock_watch(*args): return urllib3.HTTPResponse() +@patch('urllib3.PoolManager.request', Mock()) class TestCacheBuilder(BaseTestKubernetes): @patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map, create=True)
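
A note on the test changes above: the @patch('urllib3.PoolManager.request', Mock()) class decorators are what keep the Kubernetes unit tests from making real network calls (see the first bullet of the commit message). A minimal, self-contained sketch of that pattern (the class and test names below are illustrative, not part of this patch):

    import unittest
    from unittest.mock import Mock, patch

    import urllib3

    # Applying the patch at class level replaces PoolManager.request for every
    # test method in the class, so none of them can reach the network.
    @patch('urllib3.PoolManager.request', Mock())
    class NoExternalCallsTest(unittest.TestCase):

        def test_request_is_mocked(self):
            http = urllib3.PoolManager()
            # The patched attribute is a Mock, so "calling" it just returns
            # another Mock instead of performing an HTTP request.
            self.assertIsInstance(http.request('GET', 'http://localhost'), Mock)

    if __name__ == '__main__':
        unittest.main()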