Disable WAL streaming on standby node via new boolean tag "nostream" (patroni#2842)

Add support for the ``nostream`` tag. If set to ``true``, the node will not use the replication protocol to stream WAL; instead it will rely on archive recovery (if ``restore_command`` is configured) and on polling of ``pg_wal``/``pg_xlog``. The tag also disables copying and synchronization of permanent logical replication slots on the node itself and on all of its cascading replicas. Setting this tag on the primary node has no effect.
gsmolk authored Mar 20, 2024
1 parent 014777b commit b09af64
Showing 14 changed files with 119 additions and 11 deletions.
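
Before the file-by-file changes, a minimal sketch of the behaviour described in the commit message: with ``nostream`` enabled a standby does not stream WAL over the replication protocol and falls back to archive recovery (when a ``restore_command`` is configured) plus ``pg_wal``/``pg_xlog`` polling. The helper and the tag dictionary below are invented for illustration and are not Patroni's API.

# Hypothetical member tags; names and helper are illustrative only.
tags = {'nostream': True}

def wal_sources(tags, restore_command_configured=True):
    """Return the WAL sources a standby would use, per the commit description."""
    if tags.get('nostream'):
        sources = ['polling of pg_wal/pg_xlog']
        if restore_command_configured:
            # archive recovery is tried when a restore_command exists
            sources.insert(0, 'archive recovery via restore_command')
        return sources
    return ['streaming replication']

print(wal_sources(tags))
# ['archive recovery via restore_command', 'polling of pg_wal/pg_xlog']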
4 changes: 4 additions & 0 deletions docs/dynamic_configuration.rst
@@ -108,3 +108,7 @@ Note: if cluster topology is static (fixed number of nodes that never change the
.. warning::
Permanent replication slots are synchronized only from the ``primary``/``standby_leader`` to replica nodes. That means, applications are supposed to be using them only from the leader node. Using them on replica nodes will cause indefinite growth of ``pg_wal`` on all other nodes in the cluster.
An exception to that rule are permanent physical slots that match the Patroni member names, if you happen to configure any. Those will be synchronized among all nodes as they are used for replication among them.


.. warning::
Setting the ``nostream`` tag on a standby disables copying and synchronization of permanent logical replication slots on the node itself and on all of its cascading replicas, if any.
2 changes: 1 addition & 1 deletion docs/replication_modes.rst
@@ -53,7 +53,7 @@ are available. As a downside, the primary is not available for writes
blocking all client write requests until at least one synchronous replica comes
up.

You can ensure that a standby never becomes the synchronous standby by setting ``nosync`` tag to true. This is recommended to set for standbys that are behind slow network connections and would cause performance degradation when becoming a synchronous standby.
You can ensure that a standby never becomes the synchronous standby by setting ``nosync`` tag to true. This is recommended to set for standbys that are behind slow network connections and would cause performance degradation when becoming a synchronous standby. Setting the ``nostream`` tag to ``true`` has the same effect.

Synchronous mode can be switched on and off via Patroni REST interface. See :ref:`dynamic configuration <dynamic_configuration>` for instructions.

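To make the exclusion rule above concrete, here is a small sketch over plain member dictionaries (names invented); it is not Patroni's synchronous-candidate selection code, only the combined effect of the ``nosync`` and ``nostream`` tags:

# Both nosync and nostream exclude a member from synchronous-standby candidacy.
members = [
    {'name': 'pg-node-1', 'tags': {}},
    {'name': 'pg-node-2', 'tags': {'nosync': True}},
    {'name': 'pg-node-3', 'tags': {'nostream': True}},
]

sync_candidates = [m['name'] for m in members
                   if not m['tags'].get('nosync') and not m['tags'].get('nostream')]
print(sync_candidates)  # ['pg-node-1']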
1 change: 1 addition & 0 deletions docs/yaml_configuration.rst
@@ -399,6 +399,7 @@ Tags
- **nosync**: ``true`` or ``false``. If set to ``true`` the node will never be selected as a synchronous replica.
- **nofailover**: ``true`` or ``false``, controls whether this node is allowed to participate in the leader race and become a leader. Defaults to ``false``, meaning this node _can_ participate in leader races.
- **failover_priority**: integer, controls the priority that this node should have during failover. Nodes with higher priority will be preferred over lower priority nodes if they received/replayed the same amount of WAL. However, nodes with higher values of receive/replay LSN are preferred regardless of their priority. If the ``failover_priority`` is 0 or negative - such node is not allowed to participate in the leader race and to become a leader (similar to ``nofailover: true``).
- **nostream**: ``true`` or ``false``. If set to ``true``, the node will not use the replication protocol to stream WAL; instead it will rely on archive recovery (if ``restore_command`` is configured) and on polling of ``pg_wal``/``pg_xlog``. It also disables copying and synchronization of permanent logical replication slots on the node itself and on all of its cascading replicas. Setting this tag on the primary node has no effect.

.. warning::
Provide only one of ``nofailover`` or ``failover_priority``. Providing ``nofailover: true`` is the same as ``failover_priority: 0``, and providing ``nofailover: false`` will give the node priority 1.
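
The relationship between ``nofailover`` and ``failover_priority`` spelled out in the warning can be reduced to a single effective priority. The sketch below is illustrative only (it assumes exactly one of the two tags is provided, as the warning requires) and is not Patroni's implementation:

def effective_failover_priority(tags):
    """Per the warning above: nofailover: true behaves like failover_priority: 0,
    nofailover: false like failover_priority: 1, and a priority <= 0 means the
    node may not participate in the leader race."""
    if 'failover_priority' in tags:
        return int(tags['failover_priority'])
    return 0 if tags.get('nofailover') else 1

print(effective_failover_priority({'nofailover': True}))      # 0 -> never a leader candidate
print(effective_failover_priority({'nofailover': False}))     # 1
print(effective_failover_priority({'failover_priority': 5}))  # 5 -> preferred over lower values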
1 change: 1 addition & 0 deletions patroni/config_generator.py
@@ -126,6 +126,7 @@ def get_template_config(cls) -> Dict[str, Any]:
'noloadbalance': False,
'clonefrom': True,
'nosync': False,
'nostream': False,
}
}

17 changes: 13 additions & 4 deletions patroni/dcs/__init__.py
@@ -1039,6 +1039,8 @@ def _get_permanent_slots(self, postgresql: 'Postgresql', tags: Tags, role: str)
.. note::
Permanent replication slots are only considered if ``use_slots`` configuration is enabled.
A node that is not supposed to become a leader (*nofailover*) will not have permanent replication slots.
A node with disabled WAL streaming (*nostream*) and its cascading followers must also not have permanent
logical slots, because the lack of feedback from such a node to the primary makes those slots unsafe to use.
In a standby cluster we only support physical replication slots.
@@ -1054,7 +1056,7 @@ def _get_permanent_slots(self, postgresql: 'Postgresql', tags: Tags, role: str)
if not global_config.use_slots or tags.nofailover:
return {}

if global_config.is_standby_cluster:
if global_config.is_standby_cluster or self.get_slot_name_on_primary(postgresql.name, tags) is None:
return self.__permanent_physical_slots \
if postgresql.major_version >= SLOT_ADVANCE_AVAILABLE_VERSION or role == 'standby_leader' else {}
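
A simplified, standalone sketch of the decision this hunk implements, using plain dictionaries instead of Patroni's objects. It ignores the ``use_slots`` and version checks, the slot definitions and tag values are invented, and a ``None`` slot name on the primary stands in for a ``nostream`` node or a replica cascading from one:

PERMANENT_SLOTS = {
    'foo': {'type': 'logical', 'database': 'a', 'plugin': 'b'},  # invented
    'bar': {'type': 'physical'},                                  # invented
}

def permanent_slots_for(tags, is_standby_cluster=False, slot_name_on_primary='node_1'):
    """Nofailover nodes get no permanent slots; standby clusters and nostream
    chains (no slot name on the primary) keep only the physical ones."""
    if tags.get('nofailover'):
        return {}
    if is_standby_cluster or slot_name_on_primary is None:
        return {name: spec for name, spec in PERMANENT_SLOTS.items() if spec['type'] == 'physical'}
    return dict(PERMANENT_SLOTS)

print(permanent_slots_for({'nofailover': True}))           # {}
print(permanent_slots_for({}, slot_name_on_primary=None))  # only 'bar' (physical)
print(permanent_slots_for({}))                             # both slots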

@@ -1069,6 +1071,10 @@ def _get_members_slots(self, name: str, role: str) -> Dict[str, Dict[str, str]]:
the ``replicatefrom`` destination member is currently not a member of the cluster (fallback to the
primary), or if ``replicatefrom`` destination member happens to be the current primary.
If the ``nostream`` tag is set on the member, we should not create a replication slot for it on
the current primary or any other member, even if ``replicatefrom`` is set, because ``nostream``
disables WAL streaming.
Will log an error if:
* Conflicting slot names between members are found
@@ -1083,8 +1089,9 @@ def _get_members_slots(self, name: str, role: str) -> Dict[str, Dict[str, str]]:
if not global_config.use_slots:
return {}

# we always want to exclude the member with our name from the list
members = filter(lambda m: m.name != name, self.members)
# we always want to exclude the member with our name from the list,
# also exclude members with disabled WAL streaming
members = filter(lambda m: m.name != name and not m.nostream, self.members)

if role in ('master', 'primary', 'standby_leader'):
members = [m for m in members if m.replicatefrom is None
@@ -1172,7 +1179,7 @@ def should_enforce_hot_standby_feedback(self, postgresql: 'Postgresql', member:
return any(self.should_enforce_hot_standby_feedback(postgresql, m) for m in members)
return False

def get_slot_name_on_primary(self, name: str, tags: Tags) -> str:
def get_slot_name_on_primary(self, name: str, tags: Tags) -> Optional[str]:
"""Get the name of physical replication slot for this node on the primary.
.. note::
@@ -1186,6 +1193,8 @@ def get_slot_name_on_primary(self, name: str, tags: Tags) -> str:
:returns: the slot name on the primary that is in use for physical replication on this node.
"""
if tags.nostream:
return None
replicatefrom = self.get_member(tags.replicatefrom, False) if tags.replicatefrom else None
return self.get_slot_name_on_primary(replicatefrom.name, replicatefrom) \
if isinstance(replicatefrom, Member) else slot_name_from_member_name(name)
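
The member-slot filtering added to ``_get_members_slots`` above can be sketched the same way; member names are invented, the ``replicatefrom`` handling is ignored, and ``slot_name_from_member_name`` is approximated by replacing dashes with underscores:

def member_slots_on(node_name, members):
    """Physical slots a node keeps for other members: skip itself and skip any
    member tagged nostream, mirroring the filter above."""
    return {
        m['name'].replace('-', '_'): {'type': 'physical'}
        for m in members
        if m['name'] != node_name and not m.get('tags', {}).get('nostream')
    }

members = [
    {'name': 'test-1', 'tags': {}},
    {'name': 'test-2', 'tags': {'nostream': True}},  # no slot is kept for this member
    {'name': 'test-3', 'tags': {}},
]
print(member_slots_on('test-1', members))  # {'test_3': {'type': 'physical'}}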
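Likewise, the recursive slot-name resolution in ``get_slot_name_on_primary``, including the new ``None`` result for ``nostream`` nodes, can be sketched independently of Patroni's classes; the member names and cascading topology below are invented:

# Invented topology: test-3 cascades from test-2, which has nostream set.
MEMBERS = {
    'test-2': {'tags': {'nostream': True}},
    'test-3': {'tags': {'replicatefrom': 'test-2'}},
    'test-4': {'tags': {}},
}

def slot_name_on_primary(name, members=MEMBERS):
    """Follow replicatefrom until the node that streams from the primary is
    found; return None as soon as a nostream node is encountered."""
    tags = members[name]['tags']
    if tags.get('nostream'):
        return None
    upstream = tags.get('replicatefrom')
    if upstream in members:
        return slot_name_on_primary(upstream, members)
    return name.replace('-', '_')  # rough stand-in for slot_name_from_member_name

print(slot_name_on_primary('test-3'))  # None, because its upstream is a nostream node
print(slot_name_on_primary('test-4'))  # 'test_4'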
5 changes: 4 additions & 1 deletion patroni/ha.py
@@ -607,9 +607,12 @@ def _get_node_to_follow(self, cluster: Cluster) -> Union[Leader, Member, None]:
:returns: the node which we should be replicating from.
"""
# if nostream is set, the node must not use WAL streaming
if self.patroni.nostream:
return None
# The standby leader or when there is no standby leader we want to follow
# the remote member, except when there is no standby leader in pause.
if self.is_standby_cluster() \
elif self.is_standby_cluster() \
and (cluster.leader and cluster.leader.name and cluster.leader.name == self.state_handler.name
or cluster.is_unlocked() and not self.is_paused()):
node_to_follow = self.get_remote_member()
14 changes: 11 additions & 3 deletions patroni/tags.py
@@ -3,13 +3,16 @@

from typing import Any, Dict, Optional

from patroni.utils import parse_int
from patroni.utils import parse_int, parse_bool


class Tags(abc.ABC):
"""An abstract class that encapsulates all the ``tags`` logic.
Child classes that want to use provided facilities must implement ``tags`` abstract property.
.. note::
For backward-compatibility reasons, old tags may have a less strict type conversion than new ones.
"""

@staticmethod
@@ -20,7 +23,7 @@ def _filter_tags(tags: Dict[str, Any]) -> Dict[str, Any]:
.. note::
A custom tag is any tag added to the configuration ``tags`` section that is not one of ``clonefrom``,
``nofailover``, ``noloadbalance`` or ``nosync``.
``nofailover``, ``noloadbalance``, ``nosync`` or ``nostream``.
For most of the Patroni predefined tags, the returned object will only contain them if they are enabled, as
they all are boolean values that default to disabled.
@@ -31,7 +34,7 @@ def _filter_tags(tags: Dict[str, Any]) -> Dict[str, Any]:
tag value.
"""
return {tag: value for tag, value in tags.items()
if any((tag not in ('clonefrom', 'nofailover', 'noloadbalance', 'nosync'),
if any((tag not in ('clonefrom', 'nofailover', 'noloadbalance', 'nosync', 'nostream'),
value,
tag == 'nofailover' and 'failover_priority' in tags))}

@@ -89,3 +92,8 @@ def nosync(self) -> bool:
def replicatefrom(self) -> Optional[str]:
"""Value of ``replicatefrom`` tag, if any."""
return self.tags.get('replicatefrom')

@property
def nostream(self) -> bool:
"""``True`` if ``nostream`` is ``True``, else ``False``."""
return parse_bool(self.tags.get('nostream')) or False
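
The property coerces the raw tag value with ``parse_bool`` and falls back to ``False``. Below is a rough standalone approximation (not the exact ``patroni.utils.parse_bool`` implementation) that reproduces the behaviour exercised by the tests added in this commit:

def parse_bool_approx(value):
    """Roughly mimic boolean tag parsing: accept common true/false spellings,
    return None for anything unrecognisable."""
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        lowered = value.strip().lower()
        if lowered in ('on', 'true', 'yes', '1'):
            return True
        if lowered in ('off', 'false', 'no', '0'):
            return False
    return None

def nostream(tags):
    """Mirror the property above: missing or unparseable values count as False."""
    return parse_bool_approx(tags.get('nostream')) or False

assert nostream({'nostream': 'True'}) is True
assert nostream({'nostream': 'None'}) is False
assert nostream({'nostream': 'foo'}) is False
assert nostream({'nostream': ''}) is False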
3 changes: 2 additions & 1 deletion patroni/validator.py
@@ -1172,6 +1172,7 @@ def validate_watchdog_mode(value: Any) -> None:
Optional("clonefrom"): bool,
Optional("noloadbalance"): bool,
Optional("replicatefrom"): str,
Optional("nosync"): bool
Optional("nosync"): bool,
Optional("nostream"): bool
}
})
1 change: 1 addition & 0 deletions postgres0.yml
@@ -136,3 +136,4 @@ tags:
noloadbalance: false
clonefrom: false
nosync: false
nostream: false
1 change: 1 addition & 0 deletions tests/test_config_generator.py
@@ -142,6 +142,7 @@ def _set_running_instance_config_vals(self):
'noloadbalance': False,
'clonefrom': True,
'nosync': False,
'nostream': False
}
}
patch_config(self.config, conf)
6 changes: 6 additions & 0 deletions tests/test_ha.py
@@ -151,6 +151,7 @@ def __init__(self, p, d):
self.api.connection_string = 'http://127.0.0.1:8008'
self.clonefrom = None
self.nosync = False
self.nostream = False
self.scheduled_restart = {'schedule': future_restart_time,
'postmaster_start_time': str(postmaster_start_time)}
self.watchdog = Watchdog(self.config)
@@ -474,6 +475,11 @@ def test_demote_because_update_lock_failed(self):
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(), 'not promoting because failed to update leader lock in DCS')

def test_get_node_to_follow_nostream(self):
self.ha.patroni.nostream = True
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha._get_node_to_follow(self.ha.cluster), None)

@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_follow(self):
self.p.is_primary = false
10 changes: 10 additions & 0 deletions tests/test_patroni.py
@@ -249,6 +249,16 @@ def test_nosync(self):
self.p.tags['nosync'] = None
self.assertFalse(self.p.nosync)

def test_nostream(self):
self.p.tags['nostream'] = 'True'
self.assertTrue(self.p.nostream)
self.p.tags['nostream'] = 'None'
self.assertFalse(self.p.nostream)
self.p.tags['nostream'] = 'foo'
self.assertFalse(self.p.nostream)
self.p.tags['nostream'] = ''
self.assertFalse(self.p.nostream)

@patch.object(Thread, 'join', Mock())
def test_shutdown(self):
self.p.api.shutdown = Mock(side_effect=Exception)
62 changes: 62 additions & 0 deletions tests/test_slots.py
@@ -124,6 +124,68 @@ def test_process_permanent_slots(self):
"confirmed_flush_lsn": 12345, "catalog_xmin": 105}])]
self.assertEqual(self.p.slots(), {})

def test_nostream_slot_processing(self):
config = ClusterConfig(
1, {'slots': {'foo': {'type': 'logical', 'database': 'a', 'plugin': 'b'}, 'bar': {'type': 'physical'}}}, 1)
nostream_node = Member(0, 'test-2', 28, {
'state': 'running', 'conn_url': 'postgres://replicator:[email protected]:5436/postgres',
'tags': {'nostream': 'True'}
})
cascade_node = Member(0, 'test-3', 28, {
'state': 'running', 'conn_url': 'postgres://replicator:[email protected]:5436/postgres',
'tags': {'replicatefrom': 'test-2'}
})
stream_node = Member(0, 'test-4', 28, {
'state': 'running', 'conn_url': 'postgres://replicator:[email protected]:5436/postgres'})
cluster = Cluster(
True, config, self.leader, Status.empty(),
[self.leadermem, nostream_node, cascade_node, stream_node], None, SyncState.empty(), None, None)
global_config.update(cluster)

# sanity for primary
self.p.name = self.leadermem.name
self.assertEqual(
cluster._get_permanent_slots(self.p, self.leadermem, 'primary'),
{'foo': {'type': 'logical', 'database': 'a', 'plugin': 'b'}, 'bar': {'type': 'physical'}})
self.assertEqual(
cluster._get_members_slots(self.p.name, 'primary'),
{'test_4': {'type': 'physical'}})

# nostream node must not have a slot on the primary
self.p.name = nostream_node.name
# permanent logical slots are not allowed on a nostream node
self.assertEqual(
cluster._get_permanent_slots(self.p, nostream_node, 'replica'),
{'bar': {'type': 'physical'}})
self.assertEqual(
cluster.get_slot_name_on_primary(self.p.name, nostream_node),
None)

# check cascade member-slot existence on nostream node
self.assertEqual(
cluster._get_members_slots(nostream_node.name, 'replica'),
{'test_3': {'type': 'physical'}})

# cascade member is also not entitled to have a logical slot on itself ...
self.p.name = cascade_node.name
self.assertEqual(
cluster._get_permanent_slots(self.p, cascade_node, 'replica'),
{'bar': {'type': 'physical'}})
# ... and member-slot on primary
self.assertEqual(
cluster.get_slot_name_on_primary(self.p.name, cascade_node),
None)

# simple replica must have every permanent slot ...
self.p.name = stream_node.name
self.assertEqual(
cluster._get_permanent_slots(self.p, stream_node, 'replica'),
{'foo': {'type': 'logical', 'database': 'a', 'plugin': 'b'}, 'bar': {'type': 'physical'}})
# ... and member-slot on primary
self.assertEqual(
cluster.get_slot_name_on_primary(self.p.name, stream_node),
'test_4')

@patch.object(Postgresql, 'is_primary', Mock(return_value=False))
def test__ensure_logical_slots_replica(self):
self.p.set_role('replica')
3 changes: 2 additions & 1 deletion tests/test_validator.py
@@ -103,7 +103,8 @@
"nofailover": False,
"clonefrom": False,
"noloadbalance": False,
"nosync": False
"nosync": False,
"nostream": False
}
}

