Skip to content

Commit

Permalink
simplify redis code, less redis calls
Browse files Browse the repository at this point in the history
  • Loading branch information
benedikt-bartscher committed Nov 29, 2024
1 parent 39cdce6 commit 15f1411
Showing 1 changed file with 10 additions and 22 deletions.
32 changes: 10 additions & 22 deletions reflex/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -3182,14 +3182,6 @@ class StateManagerRedis(StateManager):
"e" # For evicted events (i.e. maxmemory exceeded)
)

# These events indicate that a lock is no longer held
_redis_keyspace_lock_release_events: Set[bytes] = {
b"del",
b"expire",
b"expired",
b"evicted",
}

async def _get_parent_state(
self, token: str, state: BaseState | None = None
) -> BaseState | None:
Expand Down Expand Up @@ -3467,20 +3459,16 @@ async def _wait_lock(self, lock_key: bytes, lock_id: bytes) -> None:
raise
async with self.redis.pubsub() as pubsub:
await pubsub.psubscribe(lock_key_channel)
while not state_is_locked:
# wait for the lock to be released
while True:
if not await self.redis.exists(lock_key):
break # key was removed, try to get the lock again
message = await pubsub.get_message(
ignore_subscribe_messages=True,
timeout=self.lock_expiration / 1000.0,
)
if message is None:
continue
if message["data"] in self._redis_keyspace_lock_release_events:
break
state_is_locked = await self._try_get_lock(lock_key, lock_id)
# wait for the lock to be released
while True:
# fast path
if await self._try_get_lock(lock_key, lock_id):
return
# wait for lock events
_ = await pubsub.get_message(
ignore_subscribe_messages=True,
timeout=self.lock_expiration / 1000.0,
)

@contextlib.asynccontextmanager
async def _lock(self, token: str):
Expand Down

0 comments on commit 15f1411

Please sign in to comment.