Skip to content

Commit

Permalink
Merge pull request #70 from valkey-io/aiven-sal/rp_import
Browse files Browse the repository at this point in the history
Import some changes from redis-py:


* Fix socket_timeout init
* Add missing type hints for `retry.py`
* Add details to the asyncio connection error message
* Format connection errors in the same way everywhere
* Drop unused dev-dep urllib3
* Close Unix sockets if the connection attempt fails
* Cluster performance improvements
* Bump version deps
* Fix checking of module versions
* Avoid dangling sockets in case of SSL handshake failure
* Get rid of event_loop warnings
* Resolve some docs warnings
* Ensure safe defaults for TLS
  • Loading branch information
aiven-sal authored Aug 7, 2024
2 parents 92e3d34 + 30cc14c commit 844a635
Show file tree
Hide file tree
Showing 22 changed files with 278 additions and 173 deletions.
1 change: 1 addition & 0 deletions .github/wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,4 @@ valkeymodules
virtualenv
www
md
yaml
1 change: 0 additions & 1 deletion dev_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ pytest-asyncio
pytest-cov
pytest-timeout
ujson>=4.2.0
urllib3<2
uvloop
vulture>=2.3.0
wheel>=0.30.0
1 change: 0 additions & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@
# further. For a list of options available for each theme, see the
# documentation.
html_theme_options = {
"display_version": True,
"footer_icons": [
{
"name": "GitHub",
Expand Down
6 changes: 3 additions & 3 deletions docs/connections.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ ClusterNode
Async Client
************

See complete example: `here <examples/asyncio_examples.html>`_
See complete example: `here <examples/asyncio_examples.html>`__

This client is used for communicating with Valkey, asynchronously.

Expand Down Expand Up @@ -88,7 +88,7 @@ ClusterPipeline (Async)
Connection
**********

See complete example: `here <examples/connection_examples.html>`_
See complete example: `here <examples/connection_examples.html>`__

Connection
==========
Expand All @@ -104,7 +104,7 @@ Connection (Async)
Connection Pools
****************

See complete example: `here <examples/connection_examples.html>`_
See complete example: `here <examples/connection_examples.html>`__

ConnectionPool
==============
Expand Down
8 changes: 4 additions & 4 deletions docs/opentelemetry.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Integrating OpenTelemetry
What is OpenTelemetry?
----------------------

`OpenTelemetry <https://opentelemetry.io>`_ is an open-source observability framework for traces, metrics, and logs. It is a merger of OpenCensus and OpenTracing projects hosted by Cloud Native Computing Foundation.
`OpenTelemetry <https://opentelemetry.io>`__ is an open-source observability framework for traces, metrics, and logs. It is a merger of OpenCensus and OpenTracing projects hosted by Cloud Native Computing Foundation.

OpenTelemetry allows developers to collect and export telemetry data in a vendor agnostic way. With OpenTelemetry, you can instrument your application once and then add or change vendors without changing the instrumentation, for example, here is a list of `popular DataDog competitors <https://uptrace.dev/get/compare/datadog-competitors.html>`_ that support OpenTelemetry.

Expand Down Expand Up @@ -61,7 +61,7 @@ Once the code is patched, you can use valkey-py as usually:
OpenTelemetry API
-----------------

`OpenTelemetry <https://uptrace.dev/opentelemetry/>`_ API is a programming interface that you can use to instrument code and collect telemetry data such as traces, metrics, and logs.
`OpenTelemetry API <https://uptrace.dev/opentelemetry/>`__ is a programming interface that you can use to instrument code and collect telemetry data such as traces, metrics, and logs.

You can use OpenTelemetry API to measure important operations:

Expand Down Expand Up @@ -125,7 +125,7 @@ Alerting and notifications

Uptrace also allows you to monitor `OpenTelemetry metrics <https://uptrace.dev/opentelemetry/metrics.html>`_ using alerting rules. For example, the following monitor uses the group by node expression to create an alert whenever an individual Valkey shard is down:

.. code-block:: python
.. code-block:: yaml
monitors:
- name: Valkey shard is down
Expand All @@ -142,7 +142,7 @@ Uptrace also allows you to monitor `OpenTelemetry metrics <https://uptrace.dev/o
You can also create queries with more complex expressions. For example, the following rule creates an alert when the keyspace hit rate is lower than 75%:

.. code-block:: python
.. code-block:: yaml
monitors:
- name: Valkey read hit rate < 75%
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
async-timeout>=4.0.2
async-timeout>=4.0.3
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,6 @@
],
extras_require={
"libvalkey": ["libvalkey>=4.0.0b1"],
"ocsp": ["cryptography>=36.0.1", "pyopenssl==20.0.1", "requests>=2.26.0"],
"ocsp": ["cryptography>=36.0.1", "pyopenssl==23.2.1", "requests>=2.31.0"],
},
)
4 changes: 3 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,9 @@ def skip_ifmodversion_lt(min_version: str, module_name: str):
for j in modules:
if module_name == j.get("name"):
version = j.get("ver")
mv = int(min_version.replace(".", ""))
mv = int(
"".join(["%02d" % int(segment) for segment in min_version.split(".")])
)
check = version < mv
return pytest.mark.skipif(check, reason="Valkey module version")

Expand Down
56 changes: 55 additions & 1 deletion tests/test_asyncio/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
parse_url,
)
from valkey.asyncio import ConnectionPool, Valkey
from valkey.asyncio.connection import Connection, UnixDomainSocketConnection
from valkey.asyncio.connection import (
Connection,
SSLConnection,
UnixDomainSocketConnection,
)
from valkey.asyncio.retry import Retry
from valkey.backoff import NoBackoff
from valkey.exceptions import ConnectionError, InvalidResponse, TimeoutError
Expand Down Expand Up @@ -494,3 +498,53 @@ async def test_connection_garbage_collection(request):

await client.aclose()
await pool.aclose()


@pytest.mark.parametrize(
"conn, error, expected_message",
[
(SSLConnection(), OSError(), "Error connecting to localhost:6379."),
(SSLConnection(), OSError(12), "Error 12 connecting to localhost:6379."),
(
SSLConnection(),
OSError(12, "Some Error"),
"Error 12 connecting to localhost:6379. Some Error.",
),
(
UnixDomainSocketConnection(path="unix:///tmp/valkey.sock"),
OSError(),
"Error connecting to unix:///tmp/valkey.sock.",
),
(
UnixDomainSocketConnection(path="unix:///tmp/valkey.sock"),
OSError(12),
"Error 12 connecting to unix:///tmp/valkey.sock.",
),
(
UnixDomainSocketConnection(path="unix:///tmp/valkey.sock"),
OSError(12, "Some Error"),
"Error 12 connecting to unix:///tmp/valkey.sock. Some Error.",
),
],
)
async def test_format_error_message(conn, error, expected_message):
"""Test that the _error_message function formats errors correctly"""
error_message = conn._error_message(error)
assert error_message == expected_message


async def test_network_connection_failure():
with pytest.raises(ConnectionError) as e:
valkey = Valkey(host="127.0.0.1", port=9999)
await valkey.set("a", "b")
assert str(e.value).startswith("Error 111 connecting to 127.0.0.1:9999. Connect")


async def test_unix_socket_connection_failure():
with pytest.raises(ConnectionError) as e:
valkey = Valkey(unix_socket_path="unix:///tmp/a.sock")
await valkey.set("a", "b")
assert (
str(e.value)
== "Error 2 connecting to unix:///tmp/a.sock. No such file or directory."
)
6 changes: 3 additions & 3 deletions tests/test_asyncio/test_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,16 @@ async def test_blocking(self, r):
lock_2 = self.get_lock(r, "foo")
assert lock_2.blocking

async def test_blocking_timeout(self, r, event_loop):
async def test_blocking_timeout(self, r):
lock1 = self.get_lock(r, "foo")
assert await lock1.acquire(blocking=False)
bt = 0.2
sleep = 0.05
lock2 = self.get_lock(r, "foo", sleep=sleep, blocking_timeout=bt)
start = event_loop.time()
start = asyncio.get_running_loop().time()
assert not await lock2.acquire()
# The elapsed duration should be less than the total blocking_timeout
assert bt >= (event_loop.time() - start) > bt - sleep
assert bt >= (asyncio.get_running_loop().time() - start) > bt - sleep
await lock1.release()

async def test_context_manager(self, r):
Expand Down
17 changes: 17 additions & 0 deletions tests/test_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,23 @@ def test_tcp_ssl_tls12_custom_ciphers(tcp_address, ssl_ciphers):
)


"""
Addresses bug CAE-333 which uncovered that the init method of the base
class did override the initialization of the socket_timeout parameter.
"""


def test_unix_socket_with_timeout():
conn = UnixDomainSocketConnection(socket_timeout=1000)

# Check if the base class defaults were taken over.
assert conn.db == 0

# Verify if the timeout and the path is set correctly.
assert conn.socket_timeout == 1000
assert conn.path == ""


@pytest.mark.ssl
@pytest.mark.skipif(not ssl.HAS_TLSv1_3, reason="requires TLSv1.3")
def test_tcp_ssl_version_mismatch(tcp_address):
Expand Down
50 changes: 50 additions & 0 deletions tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,3 +291,53 @@ def mock_disconnect(_):

assert called == 1
pool.disconnect()


@pytest.mark.parametrize(
"conn, error, expected_message",
[
(SSLConnection(), OSError(), "Error connecting to localhost:6379."),
(SSLConnection(), OSError(12), "Error 12 connecting to localhost:6379."),
(
SSLConnection(),
OSError(12, "Some Error"),
"Error 12 connecting to localhost:6379. Some Error.",
),
(
UnixDomainSocketConnection(path="unix:///tmp/valkey.sock"),
OSError(),
"Error connecting to unix:///tmp/valkey.sock.",
),
(
UnixDomainSocketConnection(path="unix:///tmp/valkey.sock"),
OSError(12),
"Error 12 connecting to unix:///tmp/valkey.sock.",
),
(
UnixDomainSocketConnection(path="unix:///tmp/valkey.sock"),
OSError(12, "Some Error"),
"Error 12 connecting to unix:///tmp/valkey.sock. Some Error.",
),
],
)
def test_format_error_message(conn, error, expected_message):
"""Test that the _error_message function formats errors correctly"""
error_message = conn._error_message(error)
assert error_message == expected_message


def test_network_connection_failure():
with pytest.raises(ConnectionError) as e:
valkey = Valkey(port=9999)
valkey.set("a", "b")
assert str(e.value) == "Error 111 connecting to localhost:9999. Connection refused."


def test_unix_socket_connection_failure():
with pytest.raises(ConnectionError) as e:
valkey = Valkey(unix_socket_path="unix:///tmp/a.sock")
valkey.set("a", "b")
assert (
str(e.value)
== "Error 2 connecting to unix:///tmp/a.sock. No such file or directory."
)
4 changes: 2 additions & 2 deletions tests/test_retry.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from unittest.mock import patch

import pytest
from valkey.backoff import ExponentialBackoff, NoBackoff
from valkey.backoff import AbstractBackoff, ExponentialBackoff, NoBackoff
from valkey.client import Valkey
from valkey.connection import Connection, UnixDomainSocketConnection
from valkey.exceptions import (
Expand All @@ -15,7 +15,7 @@
from .conftest import _get_client


class BackoffMock:
class BackoffMock(AbstractBackoff):
def __init__(self):
self.reset_calls = 0
self.calls = 0
Expand Down
40 changes: 19 additions & 21 deletions valkey/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1315,37 +1315,35 @@ async def initialize(self) -> None:
port = int(primary_node[1])
host, port = self.remap_host_port(host, port)

nodes_for_slot = []

target_node = tmp_nodes_cache.get(get_node_name(host, port))
if not target_node:
target_node = ClusterNode(
host, port, PRIMARY, **self.connection_kwargs
)
# add this node to the nodes cache
tmp_nodes_cache[target_node.name] = target_node
nodes_for_slot.append(target_node)

replica_nodes = slot[3:]
for replica_node in replica_nodes:
host = replica_node[0]
port = replica_node[1]
host, port = self.remap_host_port(host, port)

target_replica_node = tmp_nodes_cache.get(get_node_name(host, port))
if not target_replica_node:
target_replica_node = ClusterNode(
host, port, REPLICA, **self.connection_kwargs
)
# add this node to the nodes cache
tmp_nodes_cache[target_replica_node.name] = target_replica_node
nodes_for_slot.append(target_replica_node)

for i in range(int(slot[0]), int(slot[1]) + 1):
if i not in tmp_slots:
tmp_slots[i] = []
tmp_slots[i].append(target_node)
replica_nodes = [slot[j] for j in range(3, len(slot))]

for replica_node in replica_nodes:
host = replica_node[0]
port = replica_node[1]
host, port = self.remap_host_port(host, port)

target_replica_node = tmp_nodes_cache.get(
get_node_name(host, port)
)
if not target_replica_node:
target_replica_node = ClusterNode(
host, port, REPLICA, **self.connection_kwargs
)
tmp_slots[i].append(target_replica_node)
# add this node to the nodes cache
tmp_nodes_cache[target_replica_node.name] = (
target_replica_node
)
tmp_slots[i] = nodes_for_slot
else:
# Validate that 2 nodes want to use the same slot cache
# setup
Expand Down
Loading

0 comments on commit 844a635

Please sign in to comment.