Skip to content

Commit

Permalink
Hash field expiration commands (#3218)
Browse files Browse the repository at this point in the history
Support hash field expiration commands that become available with
Redis 7.4.

Adapt some tests to match recent server-side changes. Update tests
related to memory stats. Make CLIENT KILL test not run with cluster.

---------

Co-authored-by: Gabriel Erzse <[email protected]>
Signed-off-by: Salvatore Mesoraca <[email protected]>
  • Loading branch information
aiven-sal and gerzse committed Jun 14, 2024
1 parent 06ab28b commit 2fbff83
Show file tree
Hide file tree
Showing 7 changed files with 1,045 additions and 7 deletions.
2 changes: 1 addition & 1 deletion tests/test_asyncio/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1430,7 +1430,7 @@ async def test_memory_stats(self, r: ValkeyCluster) -> None:
assert isinstance(stats, dict)
for key, value in stats.items():
if key.startswith("db."):
assert isinstance(value, dict)
assert not isinstance(value, list)

@skip_if_server_version_lt("4.0.0")
async def test_memory_help(self, r: ValkeyCluster) -> None:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_asyncio/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -1352,7 +1352,7 @@ async def test_hscan(self, r: valkey.Valkey):
_, dic = await r.hscan("a_notset", match="a")
assert dic == {}

@skip_if_server_version_lt("7.4.0")
@skip_if_server_version_lt("7.3.240")
async def test_hscan_novalues(self, r: valkey.Valkey):
await r.hset("a", mapping={"a": 1, "b": 2, "c": 3})
cursor, keys = await r.hscan("a", no_values=True)
Expand All @@ -1373,7 +1373,7 @@ async def test_hscan_iter(self, r: valkey.Valkey):
dic = {k: v async for k, v in r.hscan_iter("a_notset", match="a")}
assert dic == {}

@skip_if_server_version_lt("7.4.0")
@skip_if_server_version_lt("7.3.240")
async def test_hscan_iter_novalues(self, r: valkey.Valkey):
await r.hset("a", mapping={"a": 1, "b": 2, "c": 3})
keys = list([k async for k in r.hscan_iter("a", no_values=True)])
Expand Down
300 changes: 300 additions & 0 deletions tests/test_asyncio/test_hash.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
import asyncio
from datetime import datetime, timedelta

from tests.conftest import skip_if_server_version_lt


@skip_if_server_version_lt("7.3.240")
async def test_hexpire_basic(r):
await r.delete("test:hash")
await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"})
assert await r.hexpire("test:hash", 1, "field1") == [1]
await asyncio.sleep(1.1)
assert await r.hexists("test:hash", "field1") is False
assert await r.hexists("test:hash", "field2") is True


@skip_if_server_version_lt("7.3.240")
async def test_hexpire_with_timedelta(r):
await r.delete("test:hash")
await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"})
assert await r.hexpire("test:hash", timedelta(seconds=1), "field1") == [1]
await asyncio.sleep(1.1)
assert await r.hexists("test:hash", "field1") is False
assert await r.hexists("test:hash", "field2") is True


@skip_if_server_version_lt("7.3.240")
async def test_hexpire_conditions(r):
await r.delete("test:hash")
await r.hset("test:hash", mapping={"field1": "value1"})
assert await r.hexpire("test:hash", 2, "field1", xx=True) == [0]
assert await r.hexpire("test:hash", 2, "field1", nx=True) == [1]
assert await r.hexpire("test:hash", 1, "field1", xx=True) == [1]
assert await r.hexpire("test:hash", 2, "field1", nx=True) == [0]
await asyncio.sleep(1.1)
assert await r.hexists("test:hash", "field1") is False
await r.hset("test:hash", "field1", "value1")
await r.hexpire("test:hash", 2, "field1")
assert await r.hexpire("test:hash", 1, "field1", gt=True) == [0]
assert await r.hexpire("test:hash", 1, "field1", lt=True) == [1]
await asyncio.sleep(1.1)
assert await r.hexists("test:hash", "field1") is False


@skip_if_server_version_lt("7.3.240")
async def test_hexpire_nonexistent_key_or_field(r):
await r.delete("test:hash")
assert await r.hexpire("test:hash", 1, "field1") == []
await r.hset("test:hash", "field1", "value1")
assert await r.hexpire("test:hash", 1, "nonexistent_field") == [-2]


@skip_if_server_version_lt("7.3.240")
async def test_hexpire_multiple_fields(r):
await r.delete("test:hash")
await r.hset(
"test:hash",
mapping={"field1": "value1", "field2": "value2", "field3": "value3"},
)
assert await r.hexpire("test:hash", 1, "field1", "field2") == [1, 1]
await asyncio.sleep(1.1)
assert await r.hexists("test:hash", "field1") is False
assert await r.hexists("test:hash", "field2") is False
assert await r.hexists("test:hash", "field3") is True


@skip_if_server_version_lt("7.3.240")
async def test_hpexpire_basic(r):
await r.delete("test:hash")
await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"})
assert await r.hpexpire("test:hash", 500, "field1") == [1]
await asyncio.sleep(0.6)
assert await r.hexists("test:hash", "field1") is False
assert await r.hexists("test:hash", "field2") is True


@skip_if_server_version_lt("7.3.240")
async def test_hpexpire_with_timedelta(r):
await r.delete("test:hash")
await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"})
assert await r.hpexpire("test:hash", timedelta(milliseconds=500), "field1") == [1]
await asyncio.sleep(0.6)
assert await r.hexists("test:hash", "field1") is False
assert await r.hexists("test:hash", "field2") is True


@skip_if_server_version_lt("7.3.240")
async def test_hpexpire_conditions(r):
await r.delete("test:hash")
await r.hset("test:hash", mapping={"field1": "value1"})
assert await r.hpexpire("test:hash", 1500, "field1", xx=True) == [0]
assert await r.hpexpire("test:hash", 1500, "field1", nx=True) == [1]
assert await r.hpexpire("test:hash", 500, "field1", xx=True) == [1]
assert await r.hpexpire("test:hash", 1500, "field1", nx=True) == [0]
await asyncio.sleep(0.6)
assert await r.hexists("test:hash", "field1") is False
await r.hset("test:hash", "field1", "value1")
await r.hpexpire("test:hash", 1000, "field1")
assert await r.hpexpire("test:hash", 500, "field1", gt=True) == [0]
assert await r.hpexpire("test:hash", 500, "field1", lt=True) == [1]
await asyncio.sleep(0.6)
assert await r.hexists("test:hash", "field1") is False


@skip_if_server_version_lt("7.3.240")
async def test_hpexpire_nonexistent_key_or_field(r):
await r.delete("test:hash")
assert await r.hpexpire("test:hash", 500, "field1") == []
await r.hset("test:hash", "field1", "value1")
assert await r.hpexpire("test:hash", 500, "nonexistent_field") == [-2]


@skip_if_server_version_lt("7.3.240")
async def test_hpexpire_multiple_fields(r):
await r.delete("test:hash")
await r.hset(
"test:hash",
mapping={"field1": "value1", "field2": "value2", "field3": "value3"},
)
assert await r.hpexpire("test:hash", 500, "field1", "field2") == [1, 1]
await asyncio.sleep(0.6)
assert await r.hexists("test:hash", "field1") is False
assert await r.hexists("test:hash", "field2") is False
assert await r.hexists("test:hash", "field3") is True


@skip_if_server_version_lt("7.3.240")
async def test_hexpireat_basic(r):
await r.delete("test:hash")
await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"})
exp_time = int((datetime.now() + timedelta(seconds=1)).timestamp())
assert await r.hexpireat("test:hash", exp_time, "field1") == [1]
await asyncio.sleep(1.1)
assert await r.hexists("test:hash", "field1") is False
assert await r.hexists("test:hash", "field2") is True


@skip_if_server_version_lt("7.3.240")
async def test_hexpireat_with_datetime(r):
await r.delete("test:hash")
await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"})
exp_time = datetime.now() + timedelta(seconds=1)
assert await r.hexpireat("test:hash", exp_time, "field1") == [1]
await asyncio.sleep(1.1)
assert await r.hexists("test:hash", "field1") is False
assert await r.hexists("test:hash", "field2") is True


@skip_if_server_version_lt("7.3.240")
async def test_hexpireat_conditions(r):
await r.delete("test:hash")
await r.hset("test:hash", mapping={"field1": "value1"})
future_exp_time = int((datetime.now() + timedelta(seconds=2)).timestamp())
past_exp_time = int((datetime.now() - timedelta(seconds=1)).timestamp())
assert await r.hexpireat("test:hash", future_exp_time, "field1", xx=True) == [0]
assert await r.hexpireat("test:hash", future_exp_time, "field1", nx=True) == [1]
assert await r.hexpireat("test:hash", past_exp_time, "field1", gt=True) == [0]
assert await r.hexpireat("test:hash", past_exp_time, "field1", lt=True) == [2]
assert await r.hexists("test:hash", "field1") is False


@skip_if_server_version_lt("7.3.240")
async def test_hexpireat_nonexistent_key_or_field(r):
await r.delete("test:hash")
future_exp_time = int((datetime.now() + timedelta(seconds=1)).timestamp())
assert await r.hexpireat("test:hash", future_exp_time, "field1") == []
await r.hset("test:hash", "field1", "value1")
assert await r.hexpireat("test:hash", future_exp_time, "nonexistent_field") == [-2]


@skip_if_server_version_lt("7.3.240")
async def test_hexpireat_multiple_fields(r):
await r.delete("test:hash")
await r.hset(
"test:hash",
mapping={"field1": "value1", "field2": "value2", "field3": "value3"},
)
exp_time = int((datetime.now() + timedelta(seconds=1)).timestamp())
assert await r.hexpireat("test:hash", exp_time, "field1", "field2") == [1, 1]
await asyncio.sleep(1.1)
assert await r.hexists("test:hash", "field1") is False
assert await r.hexists("test:hash", "field2") is False
assert await r.hexists("test:hash", "field3") is True


@skip_if_server_version_lt("7.3.240")
async def test_hpexpireat_basic(r):
await r.delete("test:hash")
await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"})
exp_time = int((datetime.now() + timedelta(milliseconds=400)).timestamp() * 1000)
assert await r.hpexpireat("test:hash", exp_time, "field1") == [1]
await asyncio.sleep(0.5)
assert await r.hexists("test:hash", "field1") is False
assert await r.hexists("test:hash", "field2") is True


@skip_if_server_version_lt("7.3.240")
async def test_hpexpireat_with_datetime(r):
await r.delete("test:hash")
await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"})
exp_time = datetime.now() + timedelta(milliseconds=400)
assert await r.hpexpireat("test:hash", exp_time, "field1") == [1]
await asyncio.sleep(0.5)
assert await r.hexists("test:hash", "field1") is False
assert await r.hexists("test:hash", "field2") is True


@skip_if_server_version_lt("7.3.240")
async def test_hpexpireat_conditions(r):
await r.delete("test:hash")
await r.hset("test:hash", mapping={"field1": "value1"})
future_exp_time = int(
(datetime.now() + timedelta(milliseconds=500)).timestamp() * 1000
)
past_exp_time = int(
(datetime.now() - timedelta(milliseconds=500)).timestamp() * 1000
)
assert await r.hpexpireat("test:hash", future_exp_time, "field1", xx=True) == [0]
assert await r.hpexpireat("test:hash", future_exp_time, "field1", nx=True) == [1]
assert await r.hpexpireat("test:hash", past_exp_time, "field1", gt=True) == [0]
assert await r.hpexpireat("test:hash", past_exp_time, "field1", lt=True) == [2]
assert await r.hexists("test:hash", "field1") is False


@skip_if_server_version_lt("7.3.240")
async def test_hpexpireat_nonexistent_key_or_field(r):
await r.delete("test:hash")
future_exp_time = int(
(datetime.now() + timedelta(milliseconds=500)).timestamp() * 1000
)
assert await r.hpexpireat("test:hash", future_exp_time, "field1") == []
await r.hset("test:hash", "field1", "value1")
assert await r.hpexpireat("test:hash", future_exp_time, "nonexistent_field") == [-2]


@skip_if_server_version_lt("7.3.240")
async def test_hpexpireat_multiple_fields(r):
await r.delete("test:hash")
await r.hset(
"test:hash",
mapping={"field1": "value1", "field2": "value2", "field3": "value3"},
)
exp_time = int((datetime.now() + timedelta(milliseconds=400)).timestamp() * 1000)
assert await r.hpexpireat("test:hash", exp_time, "field1", "field2") == [1, 1]
await asyncio.sleep(0.5)
assert await r.hexists("test:hash", "field1") is False
assert await r.hexists("test:hash", "field2") is False
assert await r.hexists("test:hash", "field3") is True


@skip_if_server_version_lt("7.3.240")
async def test_hpersist_multiple_fields_mixed_conditions(r):
await r.delete("test:hash")
await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"})
await r.hexpire("test:hash", 5000, "field1")
assert await r.hpersist("test:hash", "field1", "field2", "field3") == [1, -1, -2]


@skip_if_server_version_lt("7.3.240")
async def test_hexpiretime_multiple_fields_mixed_conditions(r):
await r.delete("test:hash")
await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"})
future_time = int((datetime.now() + timedelta(minutes=30)).timestamp())
await r.hexpireat("test:hash", future_time, "field1")
result = await r.hexpiretime("test:hash", "field1", "field2", "field3")
assert future_time - 10 < result[0] <= future_time
assert result[1:] == [-1, -2]


@skip_if_server_version_lt("7.3.240")
async def test_hpexpiretime_multiple_fields_mixed_conditions(r):
await r.delete("test:hash")
await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"})
future_time = int((datetime.now() + timedelta(minutes=30)).timestamp())
await r.hexpireat("test:hash", future_time, "field1")
result = await r.hpexpiretime("test:hash", "field1", "field2", "field3")
assert future_time * 1000 - 10000 < result[0] <= future_time * 1000
assert result[1:] == [-1, -2]


@skip_if_server_version_lt("7.3.240")
async def test_ttl_multiple_fields_mixed_conditions(r):
await r.delete("test:hash")
await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"})
future_time = int((datetime.now() + timedelta(minutes=30)).timestamp())
await r.hexpireat("test:hash", future_time, "field1")
result = await r.httl("test:hash", "field1", "field2", "field3")
assert 30 * 60 - 10 < result[0] <= 30 * 60
assert result[1:] == [-1, -2]


@skip_if_server_version_lt("7.3.240")
async def test_pttl_multiple_fields_mixed_conditions(r):
await r.delete("test:hash")
await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"})
future_time = int((datetime.now() + timedelta(minutes=30)).timestamp())
await r.hexpireat("test:hash", future_time, "field1")
result = await r.hpttl("test:hash", "field1", "field2", "field3")
assert 30 * 60000 - 10000 < result[0] <= 30 * 60000
assert result[1:] == [-1, -2]
2 changes: 1 addition & 1 deletion tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1548,7 +1548,7 @@ def test_memory_stats(self, r):
assert isinstance(stats, dict)
for key, value in stats.items():
if key.startswith("db."):
assert isinstance(value, dict)
assert not isinstance(value, list)

@skip_if_server_version_lt("4.0.0")
def test_memory_help(self, r):
Expand Down
7 changes: 4 additions & 3 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,8 @@ def test_client_kill_filter_by_user(self, r, request):
assert c["user"] != killuser
r.acl_deluser(killuser)

@skip_if_server_version_lt("7.4.0")
@skip_if_server_version_lt("7.3.240")
@pytest.mark.onlynoncluster
def test_client_kill_filter_by_maxage(self, r, request):
_get_client(valkey.Valkey, request, flushdb=False)
time.sleep(4)
Expand Down Expand Up @@ -2133,7 +2134,7 @@ def test_hscan(self, r):
_, dic = r.hscan("a_notset")
assert dic == {}

@skip_if_server_version_lt("7.4.0")
@skip_if_server_version_lt("7.3.240")
def test_hscan_novalues(self, r):
r.hset("a", mapping={"a": 1, "b": 2, "c": 3})
cursor, keys = r.hscan("a", no_values=True)
Expand All @@ -2154,7 +2155,7 @@ def test_hscan_iter(self, r):
dic = dict(r.hscan_iter("a_notset"))
assert dic == {}

@skip_if_server_version_lt("7.4.0")
@skip_if_server_version_lt("7.3.240")
def test_hscan_iter_novalues(self, r):
r.hset("a", mapping={"a": 1, "b": 2, "c": 3})
keys = list(r.hscan_iter("a", no_values=True))
Expand Down
Loading

0 comments on commit 2fbff83

Please sign in to comment.