diff --git a/examples/py/snippets/guides.py b/examples/py/snippets/guides.py index 0847aee..605de4e 100644 --- a/examples/py/snippets/guides.py +++ b/examples/py/snippets/guides.py @@ -108,7 +108,9 @@ class AppendCallback(hstreamdb.BufferedProducer.AppendCallback): def on_success(self, stream_name, payloads, stream_keyid): self.count += 1 - print(f"Batch {self.count}: Append success with {len(payloads)} payloads.") + print( + f"Batch {self.count}: Append success with {len(payloads)} payloads." + ) def on_fail(self, stream_name, payloads, stream_keyid, e): print("Append failed!") @@ -179,6 +181,25 @@ async def read_reader(client): if __name__ == "__main__": + import functools + + # TODO: the client should be able to retry automatically if server is + # UNAVAILABLE (e.g. NOSEQUENCER) + def retry(async_function, max_retries=10, delay=1): + @functools.wraps(async_function) + async def wrapper(client): + for attempt in range(1, max_retries + 1): + try: + result = await async_function(client) + return result # If successful, return the result + except Exception as e: + print(f"Attempt {attempt} failed: {e}") + await asyncio.sleep(delay) # Wait before the next attempt + + raise ValueError(f"Function failed after {max_retries} attempts") + + return wrapper + def safe_run(fun, *args): try: fun(*args) @@ -190,8 +211,8 @@ def safe_run(fun, *args): main( create_stream, list_streams, - append_records, - buffered_append_records, + retry(append_records), + retry(buffered_append_records), create_subscription, list_subscriptions, subscribe_records,