From e7cf43d8be27fa765e1c97a4a108b719af683bb3 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Fri, 4 Oct 2024 09:28:23 -0700 Subject: [PATCH 1/2] harden redis --- .../danswer/background/celery/celeryconfig.py | 21 +++++++++++++++++++ backend/danswer/configs/app_configs.py | 16 ++++++++++++++ backend/danswer/configs/constants.py | 12 +++++++++++ backend/danswer/redis/redis_pool.py | 13 ++++++++++-- .../docker_compose/docker-compose.dev.yml | 1 + .../docker_compose/docker-compose.gpu-dev.yml | 3 ++- 6 files changed, 63 insertions(+), 3 deletions(-) diff --git a/backend/danswer/background/celery/celeryconfig.py b/backend/danswer/background/celery/celeryconfig.py index d0314adf865..1b1aa092d17 100644 --- a/backend/danswer/background/celery/celeryconfig.py +++ b/backend/danswer/background/celery/celeryconfig.py @@ -1,7 +1,9 @@ # docs: https://docs.celeryq.dev/en/stable/userguide/configuration.html +from danswer.configs.app_configs import CELERY_BROKER_POOL_LIMIT from danswer.configs.app_configs import CELERY_RESULT_EXPIRES from danswer.configs.app_configs import REDIS_DB_NUMBER_CELERY from danswer.configs.app_configs import REDIS_DB_NUMBER_CELERY_RESULT_BACKEND +from danswer.configs.app_configs import REDIS_HEALTH_CHECK_INTERVAL from danswer.configs.app_configs import REDIS_HOST from danswer.configs.app_configs import REDIS_PASSWORD from danswer.configs.app_configs import REDIS_PORT @@ -9,6 +11,7 @@ from danswer.configs.app_configs import REDIS_SSL_CA_CERTS from danswer.configs.app_configs import REDIS_SSL_CERT_REQS from danswer.configs.constants import DanswerCeleryPriority +from danswer.configs.constants import REDIS_SOCKET_KEEPALIVE_OPTIONS CELERY_SEPARATOR = ":" @@ -36,12 +39,30 @@ # can stall other tasks. worker_prefetch_multiplier = 4 +broker_connection_retry_on_startup = True +broker_pool_limit = CELERY_BROKER_POOL_LIMIT + +# redis broker settings +# https://docs.celeryq.dev/projects/kombu/en/stable/reference/kombu.transport.redis.html broker_transport_options = { "priority_steps": list(range(len(DanswerCeleryPriority))), "sep": CELERY_SEPARATOR, "queue_order_strategy": "priority", + "retry_on_timeout": True, + "health_check_interval": REDIS_HEALTH_CHECK_INTERVAL, + "socket_keepalive": True, + "socket_keepalive_options": REDIS_SOCKET_KEEPALIVE_OPTIONS, } +# redis backend settings +# https://docs.celeryq.dev/en/stable/userguide/configuration.html#redis-backend-settings + +# there doesn't appear to be a way to set socket_keepalive_options on the redis result backend +redis_socket_keepalive = True +redis_retry_on_timeout = True +redis_backend_health_check_interval = REDIS_HEALTH_CHECK_INTERVAL + + task_default_priority = DanswerCeleryPriority.MEDIUM task_acks_late = True diff --git a/backend/danswer/configs/app_configs.py b/backend/danswer/configs/app_configs.py index c943bc7f603..aa3cccc512f 100644 --- a/backend/danswer/configs/app_configs.py +++ b/backend/danswer/configs/app_configs.py @@ -164,6 +164,12 @@ ) REDIS_DB_NUMBER_CELERY = int(os.environ.get("REDIS_DB_NUMBER_CELERY", 15)) # broker +# will propagate to both our redis client as well as celery's redis client +REDIS_HEALTH_CHECK_INTERVAL = int(os.environ.get("REDIS_HEALTH_CHECK_INTERVAL", 60)) + +# our redis client only, not celery's +REDIS_POOL_MAX_CONNECTIONS = int(os.environ.get("REDIS_POOL_MAX_CONNECTIONS", 128)) + # https://docs.celeryq.dev/en/stable/userguide/configuration.html#redis-backend-settings # should be one of "required", "optional", or "none" REDIS_SSL_CERT_REQS = os.getenv("REDIS_SSL_CERT_REQS", "none") @@ -171,6 +177,16 @@ CELERY_RESULT_EXPIRES = int(os.environ.get("CELERY_RESULT_EXPIRES", 86400)) # seconds +# https://docs.celeryq.dev/en/stable/userguide/configuration.html#broker-pool-limit +# Setting to None may help when there is a proxy in the way closing idle connections +CELERY_BROKER_POOL_LIMIT_DEFAULT = 10 +try: + CELERY_BROKER_POOL_LIMIT = int( + os.environ.get("CELERY_BROKER_POOL_LIMIT", CELERY_BROKER_POOL_LIMIT_DEFAULT) + ) +except ValueError: + CELERY_BROKER_POOL_LIMIT = CELERY_BROKER_POOL_LIMIT_DEFAULT + ##### # Connector Configs ##### diff --git a/backend/danswer/configs/constants.py b/backend/danswer/configs/constants.py index 52314db920c..678b3a5499d 100644 --- a/backend/danswer/configs/constants.py +++ b/backend/danswer/configs/constants.py @@ -1,3 +1,5 @@ +import platform +import socket from enum import auto from enum import Enum @@ -203,3 +205,13 @@ class DanswerCeleryPriority(int, Enum): MEDIUM = auto() LOW = auto() LOWEST = auto() + + +REDIS_SOCKET_KEEPALIVE_OPTIONS = {} +REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPINTVL] = 15 +REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPCNT] = 3 + +if platform.system() == "Darwin": + REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPALIVE] = 60 # type: ignore +else: + REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPIDLE] = 60 # type: ignore diff --git a/backend/danswer/redis/redis_pool.py b/backend/danswer/redis/redis_pool.py index 45ab39d8a02..9668fc6ef1e 100644 --- a/backend/danswer/redis/redis_pool.py +++ b/backend/danswer/redis/redis_pool.py @@ -6,14 +6,15 @@ from redis.connection import ConnectionPool from danswer.configs.app_configs import REDIS_DB_NUMBER +from danswer.configs.app_configs import REDIS_HEALTH_CHECK_INTERVAL from danswer.configs.app_configs import REDIS_HOST from danswer.configs.app_configs import REDIS_PASSWORD +from danswer.configs.app_configs import REDIS_POOL_MAX_CONNECTIONS from danswer.configs.app_configs import REDIS_PORT from danswer.configs.app_configs import REDIS_SSL from danswer.configs.app_configs import REDIS_SSL_CA_CERTS from danswer.configs.app_configs import REDIS_SSL_CERT_REQS - -REDIS_POOL_MAX_CONNECTIONS = 10 +from danswer.configs.constants import REDIS_SOCKET_KEEPALIVE_OPTIONS class RedisPool: @@ -55,6 +56,10 @@ def create_pool( db=db, password=password, max_connections=max_connections, + timeout=None, + health_check_interval=REDIS_HEALTH_CHECK_INTERVAL, + socket_keepalive=True, + socket_keepalive_options=REDIS_SOCKET_KEEPALIVE_OPTIONS, connection_class=redis.SSLConnection, ssl_ca_certs=ssl_ca_certs, ssl_cert_reqs=ssl_cert_reqs, @@ -66,6 +71,10 @@ def create_pool( db=db, password=password, max_connections=max_connections, + timeout=None, + health_check_interval=REDIS_HEALTH_CHECK_INTERVAL, + socket_keepalive=True, + socket_keepalive_options=REDIS_SOCKET_KEEPALIVE_OPTIONS, ) diff --git a/deployment/docker_compose/docker-compose.dev.yml b/deployment/docker_compose/docker-compose.dev.yml index 5893fc2960f..885c9350049 100644 --- a/deployment/docker_compose/docker-compose.dev.yml +++ b/deployment/docker_compose/docker-compose.dev.yml @@ -87,6 +87,7 @@ services: - LOG_ENDPOINT_LATENCY=${LOG_ENDPOINT_LATENCY:-} - LOG_POSTGRES_LATENCY=${LOG_POSTGRES_LATENCY:-} - LOG_POSTGRES_CONN_COUNTS=${LOG_POSTGRES_CONN_COUNTS:-} + - CELERY_BROKER_POOL_LIMIT=${CELERY_BROKER_POOL_LIMIT:-} # Chat Configs - HARD_DELETE_CHATS=${HARD_DELETE_CHATS:-} diff --git a/deployment/docker_compose/docker-compose.gpu-dev.yml b/deployment/docker_compose/docker-compose.gpu-dev.yml index bc8bf10dffc..59ea69e5457 100644 --- a/deployment/docker_compose/docker-compose.gpu-dev.yml +++ b/deployment/docker_compose/docker-compose.gpu-dev.yml @@ -80,7 +80,8 @@ services: # If set to `true` will enable additional logs about Vespa query performance # (time spent on finding the right docs + time spent fetching summaries from disk) - LOG_VESPA_TIMING_INFORMATION=${LOG_VESPA_TIMING_INFORMATION:-} - + - CELERY_BROKER_POOL_LIMIT=${CELERY_BROKER_POOL_LIMIT:-} + # Chat Configs - HARD_DELETE_CHATS=${HARD_DELETE_CHATS:-} From a6b3d03b336c8840d90b0455c0e9c784dba08626 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Fri, 4 Oct 2024 09:31:30 -0700 Subject: [PATCH 2/2] use blockingconnectionpool --- backend/danswer/redis/redis_pool.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/backend/danswer/redis/redis_pool.py b/backend/danswer/redis/redis_pool.py index 9668fc6ef1e..54cb8d918e4 100644 --- a/backend/danswer/redis/redis_pool.py +++ b/backend/danswer/redis/redis_pool.py @@ -3,7 +3,6 @@ import redis from redis.client import Redis -from redis.connection import ConnectionPool from danswer.configs.app_configs import REDIS_DB_NUMBER from danswer.configs.app_configs import REDIS_HEALTH_CHECK_INTERVAL @@ -20,7 +19,7 @@ class RedisPool: _instance: Optional["RedisPool"] = None _lock: threading.Lock = threading.Lock() - _pool: ConnectionPool + _pool: redis.BlockingConnectionPool def __new__(cls) -> "RedisPool": if not cls._instance: @@ -46,11 +45,15 @@ def create_pool( ssl_ca_certs: str | None = REDIS_SSL_CA_CERTS, ssl_cert_reqs: str = REDIS_SSL_CERT_REQS, ssl: bool = False, - ) -> redis.ConnectionPool: + ) -> redis.BlockingConnectionPool: + """We use BlockingConnectionPool because it will block and wait for a connection + rather than error if max_connections is reached. This is far more deterministic + behavior and aligned with how we want to use Redis.""" + # Using ConnectionPool is not well documented. # Useful examples: https://github.com/redis/redis-py/issues/780 if ssl: - return redis.ConnectionPool( + return redis.BlockingConnectionPool( host=host, port=port, db=db, @@ -65,7 +68,7 @@ def create_pool( ssl_cert_reqs=ssl_cert_reqs, ) - return redis.ConnectionPool( + return redis.BlockingConnectionPool( host=host, port=port, db=db,