Feature/background prune 3 #2608

Closed
wants to merge 126 commits into from

126 commits
10330fb
first cut at redis
rkuo-danswer Aug 23, 2024
7442957
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
rkuo-danswer Aug 23, 2024
09e95fe
some new helper functions for the db
rkuo-danswer Aug 23, 2024
7244bae
Merge from feature/redis
rkuo-danswer Aug 23, 2024
25f4b08
ignore kombu tables in alembic migrations (used by celery)
rkuo-danswer Aug 29, 2024
d4c06b3
multiline commands for readability, add vespa_metadata_sync queue to …
rkuo-danswer Aug 30, 2024
5100065
typo fix
rkuo-danswer Aug 30, 2024
34a8bde
fix returning tuple fields
rkuo-danswer Aug 30, 2024
d856297
add constants
rkuo-danswer Aug 30, 2024
93fb8ef
fix _get_access_for_document
rkuo-danswer Aug 30, 2024
8d09dcf
docstrings!
rkuo-danswer Aug 30, 2024
6d05e29
fix double function declaration and typing
rkuo-danswer Aug 30, 2024
024d4dd
fix type hinting
rkuo-danswer Aug 30, 2024
186d4f1
add a global redis pool
rkuo-danswer Aug 30, 2024
5f7ca15
Add get_document function
rkuo-danswer Aug 30, 2024
66fb6b1
use task_logger in various celery tasks
rkuo-danswer Aug 30, 2024
f06efb9
add celeryconfig.py to simplify configuration. Will be used in a subs…
rkuo-danswer Aug 30, 2024
e36fd3c
Add celery redis helper. used in a subsequent PR
rkuo-danswer Aug 30, 2024
ab9cf9a
kombu warning getting spammy since celery is not self managing its qu…
rkuo-danswer Aug 30, 2024
48963ed
add last_modified and last_synced to documents
rkuo-danswer Aug 30, 2024
b4b7801
fix task naming convention
rkuo-danswer Aug 30, 2024
1cfbfe1
use celeryconfig.py
rkuo-danswer Aug 30, 2024
2144617
the big one. adds queues and tasks, updates functions to use the queu…
rkuo-danswer Aug 30, 2024
eea178f
change vespa index log line to debug
rkuo-danswer Aug 30, 2024
27eae3d
mypy fixes
rkuo-danswer Aug 30, 2024
db252a6
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
rkuo-danswer Aug 30, 2024
c174d00
update alembic migration
rkuo-danswer Aug 30, 2024
c0a7c7c
fix fence ordering, rename to "monitor", fix fetch_versioned_implemen…
rkuo-danswer Aug 31, 2024
d17f49f
mypy
rkuo-danswer Sep 1, 2024
838a7fb
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
rkuo-danswer Sep 2, 2024
0652698
switch to monotonic time
rkuo-danswer Sep 2, 2024
9dbd29d
fix startup dependencies on redis
LostVector Sep 2, 2024
575db62
Merge branch 'feature/redis' of https://github.com/danswer-ai/danswer
LostVector Sep 2, 2024
015fab9
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
LostVector Sep 2, 2024
bdcd553
rebase alembic migration
LostVector Sep 2, 2024
e731a33
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
LostVector Sep 2, 2024
c388a81
kombu cleanup - fail silently
LostVector Sep 2, 2024
a9aca68
Merge branch 'feature/redis' of https://github.com/danswer-ai/danswer…
LostVector Sep 2, 2024
f0bdb39
mypy
LostVector Sep 2, 2024
e53ff25
Merge branch 'feature/redis' of https://github.com/danswer-ai/danswer…
LostVector Sep 2, 2024
076ed18
add redis_host environment override
LostVector Sep 3, 2024
aa1eec8
update REDIS_HOST env var in docker-compose.dev.yml
LostVector Sep 3, 2024
f358acf
update the rest of the docker files
LostVector Sep 3, 2024
1d4b8cf
in flight
LostVector Sep 3, 2024
6724a13
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
rkuo-danswer Sep 4, 2024
69d762e
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
rkuo-danswer Sep 4, 2024
a56e3d7
Merge branch 'feature/redis' of https://github.com/danswer-ai/danswer…
rkuo-danswer Sep 4, 2024
d6d3d24
harden indexing-status endpoint against db changes happening in the b…
rkuo-danswer Sep 4, 2024
6335f5b
allow no task syncs to run because we create certain objects with no …
rkuo-danswer Sep 4, 2024
4c5c4a2
add back writing to vespa on indexing
rkuo-danswer Sep 4, 2024
957bd55
Merge branch 'feature/background_processing' of https://github.com/da…
rkuo-danswer Sep 4, 2024
7133450
actually working connector deletion
rkuo-danswer Sep 5, 2024
590bd0f
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
rkuo-danswer Sep 5, 2024
98ec0b7
update contributing guide
rkuo-danswer Sep 5, 2024
ab097ce
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
rkuo-danswer Sep 5, 2024
f363c6b
Merge branch 'feature/redis' of https://github.com/danswer-ai/danswer…
rkuo-danswer Sep 5, 2024
daf46b3
backporting fixes from background_deletion
rkuo-danswer Sep 5, 2024
dafbe13
renaming cache to cache_volume
rkuo-danswer Sep 5, 2024
bd07908
add redis password to various deployments
rkuo-danswer Sep 5, 2024
0bb2e57
try setting up pr testing for helm
rkuo-danswer Sep 5, 2024
0b293dd
fix indent
rkuo-danswer Sep 5, 2024
82bd7f4
hopefully this release version actually exists
rkuo-danswer Sep 5, 2024
0988947
fix command line option to --chart-dirs
rkuo-danswer Sep 5, 2024
c20d921
fetch-depth 0
rkuo-danswer Sep 5, 2024
d6c3bd3
edit values.yaml
rkuo-danswer Sep 5, 2024
b8d9205
try setting ct working directory
rkuo-danswer Sep 5, 2024
f90cfa7
bypass testing only on change for now
rkuo-danswer Sep 5, 2024
af34d19
move files and lint them
rkuo-danswer Sep 6, 2024
5d6fe92
update helm testing
rkuo-danswer Sep 6, 2024
8cf4098
some issues suggest using --config works
rkuo-danswer Sep 6, 2024
bc5cb2d
add vespa repo
rkuo-danswer Sep 6, 2024
ea35879
add postgresql repo
rkuo-danswer Sep 6, 2024
b427439
increase timeout
rkuo-danswer Sep 6, 2024
97d3671
try amd64 runner
rkuo-danswer Sep 6, 2024
3574c23
fix redis password reference
rkuo-danswer Sep 6, 2024
f02a231
add comment to helm chart testing workflow
rkuo-danswer Sep 6, 2024
3323d4c
rename helm testing workflow to disable it
rkuo-danswer Sep 6, 2024
3afa07a
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
rkuo-danswer Sep 6, 2024
2af4926
Merge branch 'feature/redis' of https://github.com/danswer-ai/danswer…
rkuo-danswer Sep 6, 2024
7512d68
Merge branch 'feature/background_processing' of https://github.com/da…
rkuo-danswer Sep 6, 2024
29860f8
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
rkuo-danswer Sep 6, 2024
f5cbb6d
Merge branch 'feature/background_processing' of https://github.com/da…
rkuo-danswer Sep 6, 2024
89619ce
adding clarifying comments
rkuo-danswer Sep 6, 2024
69983ab
address code review
rkuo-danswer Sep 7, 2024
138866a
missed a file
rkuo-danswer Sep 7, 2024
84ec5f4
remove commented warning ... just not needed
LostVector Sep 7, 2024
ab77692
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
rkuo-danswer Sep 9, 2024
ca6858b
Merge branch 'feature/background_processing' of https://github.com/da…
rkuo-danswer Sep 9, 2024
fe9bb06
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
rkuo-danswer Sep 9, 2024
ba58e7d
Merge branch 'feature/background_processing' of https://github.com/da…
rkuo-danswer Sep 9, 2024
8aef77a
fix imports
rkuo-danswer Sep 9, 2024
30f6463
refactor to use update_single
rkuo-danswer Sep 9, 2024
092e190
mypy fixes
rkuo-danswer Sep 9, 2024
be61371
add vespa test
rkuo-danswer Sep 10, 2024
026bca4
merge
rkuo-danswer Sep 10, 2024
5f85632
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
rkuo-danswer Sep 13, 2024
20ba19b
multiple celery workers
rkuo-danswer Sep 13, 2024
9b99c14
update logs as well and set prefetch multipliers appropriate to the w…
rkuo-danswer Sep 13, 2024
b5528e3
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
LostVector Sep 14, 2024
aca5204
add db refresh to connector deletion
LostVector Sep 14, 2024
fc3af78
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
LostVector Sep 14, 2024
a4d36b9
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
LostVector Sep 15, 2024
9ac8a10
add some preliminary locking
rkuo-danswer Sep 16, 2024
76b0996
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
rkuo-danswer Sep 16, 2024
90cc753
organize tasks into separate files
rkuo-danswer Sep 16, 2024
faaccc7
celery auto associates tasks created inside another task, which bloat…
rkuo-danswer Sep 17, 2024
c032861
code review fixes
LostVector Sep 17, 2024
e2f3363
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
rkuo-danswer Sep 17, 2024
905f104
move monitor_usergroup_taskset to ee, improve logging
rkuo-danswer Sep 17, 2024
6a8e4ef
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
rkuo-danswer Sep 17, 2024
166b405
Merge branch 'feature/background_deletion' of https://github.com/dans…
rkuo-danswer Sep 18, 2024
3a76733
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
rkuo-danswer Sep 18, 2024
bffdf94
add multi workers to dev_run_background_jobs.py
rkuo-danswer Sep 18, 2024
2f6b5f4
update supervisord with some recommended settings for celery
rkuo-danswer Sep 18, 2024
bb4ee6b
name celery workers and shorten dev script prefixing
rkuo-danswer Sep 19, 2024
4f7e5cb
add configurable sql alchemy engine settings on startup (needed for v…
rkuo-danswer Sep 19, 2024
6ebc623
fix comments
rkuo-danswer Sep 19, 2024
6433d47
merge main
rkuo-danswer Sep 20, 2024
12219b2
autoscale sqlalchemy pool size to celery concurrency (allow override …
rkuo-danswer Sep 20, 2024
e0829a9
supervisord needs the percent symbols escaped
rkuo-danswer Sep 20, 2024
84e3fbd
use name as primary check, some minor refactoring and type hinting too.
rkuo-danswer Sep 20, 2024
236b1e0
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
rkuo-danswer Sep 20, 2024
ece7d64
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
rkuo-danswer Sep 24, 2024
1a41cf8
stash merge (may not function yet)
rkuo-danswer Sep 25, 2024
427c86e
Merge branch 'main' of https://github.com/danswer-ai/danswer into fea…
rkuo-danswer Sep 27, 2024
2c61f91
cherry pick
LostVector Sep 30, 2024
@@ -0,0 +1,27 @@
"""add last_pruned to the connector_credential_pair table

Revision ID: ac5eaac849f9
Revises: 46b7a812670f
Create Date: 2024-09-10 15:04:26.437118

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "ac5eaac849f9"
down_revision = "46b7a812670f"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # last_pruned represents the last time the connector was pruned
    op.add_column(
        "connector_credential_pair",
        sa.Column("last_pruned", sa.DateTime(timezone=True), nullable=True),
    )


def downgrade() -> None:
    op.drop_column("connector_credential_pair", "last_pruned")
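
This migration only adds the column; the due-check that consumes it lives elsewhere in the PR. A minimal sketch of such a check, assuming connector.prune_freq is an interval in seconds and mirroring the never-pruned fallback from the old should_prune_cc_pair code further down this page — the helper name and the cc_pair.last_pruned ORM attribute are assumptions, not code from this diff:

from datetime import datetime
from datetime import timezone

from danswer.db.models import ConnectorCredentialPair


def is_pruning_due(cc_pair: ConnectorCredentialPair) -> bool:
    # a connector with no prune_freq configured is never pruned
    if not cc_pair.connector.prune_freq:
        return False

    # never pruned: compare against connector creation, as the old logic did
    if cc_pair.last_pruned is None:
        elapsed = datetime.now(timezone.utc) - cc_pair.connector.time_created
        return elapsed.total_seconds() >= cc_pair.connector.prune_freq

    elapsed = datetime.now(timezone.utc) - cc_pair.last_pruned
    return elapsed.total_seconds() >= cc_pair.connector.prune_freq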
36 changes: 30 additions & 6 deletions backend/danswer/background/celery/celery_app.py
@@ -19,6 +19,7 @@

from danswer.background.celery.celery_redis import RedisConnectorCredentialPair
from danswer.background.celery.celery_redis import RedisConnectorDeletion
+from danswer.background.celery.celery_redis import RedisConnectorPruning
from danswer.background.celery.celery_redis import RedisDocumentSet
from danswer.background.celery.celery_redis import RedisUserGroup
from danswer.background.celery.celery_utils import celery_is_worker_primary
@@ -108,6 +109,14 @@ def celery_task_postrun(
            r.srem(rcd.taskset_key, task_id)
        return

+    if task_id.startswith(RedisConnectorPruning.SUBTASK_PREFIX):
+        r = redis_pool.get_client()
+        cc_pair_id = RedisConnectorPruning.get_id_from_task_id(task_id)
+        if cc_pair_id is not None:
+            rcp = RedisConnectorPruning(cc_pair_id)
+            r.srem(rcp.taskset_key, task_id)
+        return
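
RedisConnectorPruning.get_id_from_task_id is not shown in this diff; a plausible sketch (written as a standalone function here), assuming the "{prefix}_{id}_{uuid}" custom task id convention documented in celery_redis.py below — uuid4 strings contain no underscores, so a plain split is safe:

def get_id_from_task_id(task_id: str) -> int | None:
    # "connectorpruning+sub_1_dd32ded3-..." -> ["connectorpruning+sub", "1", "dd32ded3-..."]
    parts = task_id.split("_")
    if len(parts) != 3:
        return None
    try:
        return int(parts[1])
    except ValueError:
        return None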


@beat_init.connect
def on_beat_init(sender: Any, **kwargs: Any) -> None:
@@ -240,6 +249,18 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
    for key in r.scan_iter(RedisConnectorDeletion.FENCE_PREFIX + "*"):
        r.delete(key)

+    for key in r.scan_iter(RedisConnectorPruning.TASKSET_PREFIX + "*"):
+        r.delete(key)
+
+    for key in r.scan_iter(RedisConnectorPruning.GENERATOR_COMPLETE_PREFIX + "*"):
+        r.delete(key)
+
+    for key in r.scan_iter(RedisConnectorPruning.GENERATOR_PROGRESS_PREFIX + "*"):
+        r.delete(key)
+
+    for key in r.scan_iter(RedisConnectorPruning.FENCE_PREFIX + "*"):
+        r.delete(key)


@worker_ready.connect
def on_worker_ready(sender: Any, **kwargs: Any) -> None:
@@ -334,7 +355,11 @@ def on_setup_logging(

class HubPeriodicTask(bootsteps.StartStopStep):
    """Regularly reacquires the primary worker lock outside of the task queue.
-    Use the task_logger in this class to avoid double logging."""
+    Use the task_logger in this class to avoid double logging.
+
+    This cannot be done inside a regular beat task because it must run on schedule and
+    a queue of existing work would starve the task from running.
+    """

    # it's unclear to me whether using the hub's timer or the bootstep timer is better
    requires = {"celery.worker.components:Hub"}
@@ -368,8 +393,6 @@ def run_periodic_task(self, worker: Any) -> None:

        lock: redis.lock.Lock = worker.primary_worker_lock

-        task_logger.info("Reacquiring primary worker lock.")
-
        if lock.owned():
            task_logger.debug("Reacquiring primary worker lock.")
            lock.reacquire()
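
For context, redis-py locks expire after their timeout unless refreshed, and reacquire() raises if the lock is no longer owned — hence the owned() check before refreshing. A standalone sketch of the pattern (the key name and timeout are invented for illustration; the real lock is created at worker startup):

import redis

r = redis.Redis()
lock = r.lock("da_lock:primary_worker", timeout=120)  # hypothetical key/timeout

if lock.acquire(blocking=False):
    pass  # we are the primary worker

# on each periodic tick:
if lock.owned():
    lock.reacquire()  # push the expiry out so ownership persists
else:
    # the lock expired or was taken by another worker; try to become primary again
    lock.acquire(blocking=False)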
@@ -411,6 +434,7 @@ def stop(self, worker: Any) -> None:
        "danswer.background.celery.tasks.connector_deletion",
        "danswer.background.celery.tasks.periodic",
        "danswer.background.celery.tasks.pruning",
+        "danswer.background.celery.tasks.shared",
        "danswer.background.celery.tasks.vespa",
    ]
)
@@ -431,16 +455,16 @@ def stop(self, worker: Any) -> None:
            "task": "check_for_connector_deletion_task",
            # don't need to check too often, since we kick off a deletion initially
            # during the API call that actually marks the CC pair for deletion
-            "schedule": timedelta(minutes=1),
+            "schedule": timedelta(seconds=60),
            "options": {"priority": DanswerCeleryPriority.HIGH},
        },
    }
)
celery_app.conf.beat_schedule.update(
    {
        "check-for-prune": {
-            "task": "check_for_prune_task",
-            "schedule": timedelta(seconds=5),
+            "task": "check_for_prune_task_2",
+            "schedule": timedelta(seconds=60),
            "options": {"priority": DanswerCeleryPriority.HIGH},
        },
    }
104 changes: 104 additions & 0 deletions backend/danswer/background/celery/celery_redis.py
@@ -343,6 +343,110 @@ def generate_tasks(
        return len(async_results)


class RedisConnectorPruning(RedisObjectHelper):
    """Celery will kick off a long running generator task to crawl the connector and
    find any missing docs, which will each then get a new cleanup task. The progress of
    those tasks will then be monitored to completion.

    Example rough happy path order:
    Check connectorpruning_fence_1
    Send generator task with id connectorpruning+generator_1_{uuid}

    generator runs connector with callbacks that increment connectorpruning_generator_progress_1
    generator creates many subtasks with id connectorpruning+sub_1_{uuid}
        in taskset connectorpruning_taskset_1
    on completion, generator sets connectorpruning_generator_complete_1

    celery postrun removes subtasks from taskset
    monitor beat task cleans up when taskset reaches 0 items
    """

    PREFIX = "connectorpruning"
    FENCE_PREFIX = PREFIX + "_fence"  # a fence for the entire pruning process
    GENERATOR_TASK_PREFIX = PREFIX + "+generator"

    TASKSET_PREFIX = PREFIX + "_taskset"  # stores a list of prune task ids
    SUBTASK_PREFIX = PREFIX + "+sub"

    GENERATOR_PROGRESS_PREFIX = (
        PREFIX + "_generator_progress"
    )  # a signal that contains generator progress
    GENERATOR_COMPLETE_PREFIX = (
        PREFIX + "_generator_complete"
    )  # a signal that the generator has finished

    def __init__(self, id: int) -> None:
        super().__init__(id)
        self.documents_to_prune: set[str] = set()

    @property
    def generator_task_id_prefix(self) -> str:
        return f"{self.GENERATOR_TASK_PREFIX}_{self._id}"

    @property
    def generator_progress_key(self) -> str:
        # example: connectorpruning_generator_progress_1
        return f"{self.GENERATOR_PROGRESS_PREFIX}_{self._id}"

    @property
    def generator_complete_key(self) -> str:
        # example: connectorpruning_generator_complete_1
        return f"{self.GENERATOR_COMPLETE_PREFIX}_{self._id}"

    @property
    def subtask_id_prefix(self) -> str:
        return f"{self.SUBTASK_PREFIX}_{self._id}"

    def generate_tasks(
        self,
        celery_app: Celery,
        db_session: Session,
        redis_client: Redis,
        lock: redis.lock.Lock | None,
    ) -> int | None:
        last_lock_time = time.monotonic()

        async_results = []
        cc_pair = get_connector_credential_pair_from_id(self._id, db_session)
        if not cc_pair:
            return None

        for doc_id in self.documents_to_prune:
            current_time = time.monotonic()
            if lock and current_time - last_lock_time >= (
                CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4
            ):
                lock.reacquire()
                last_lock_time = current_time

            # celery's default task id format is "dd32ded3-00aa-4884-8b21-42f8332e7fac"
            # the actual redis key is "celery-task-meta-dd32ded3-00aa-4884-8b21-42f8332e7fac"
            # we prefix the task id so it's easier to keep track of who created the task
            # aka "connectorpruning+sub_1_dd32ded3-00aa-4884-8b21-42f8332e7fac"
            custom_task_id = f"{self.subtask_id_prefix}_{uuid4()}"

            # add to the tracking taskset in redis BEFORE creating the celery task.
            # note that for the moment we are using a single taskset key, not differentiated by cc_pair id
            redis_client.sadd(self.taskset_key, custom_task_id)

            # Priority on syncs triggered by new indexing should be medium
            result = celery_app.send_task(
                "document_by_cc_pair_cleanup_task",
                kwargs=dict(
                    document_id=doc_id,
                    connector_id=cc_pair.connector_id,
                    credential_id=cc_pair.credential_id,
                ),
                queue=DanswerCeleryQueues.CONNECTOR_DELETION,
                task_id=custom_task_id,
                priority=DanswerCeleryPriority.MEDIUM,
            )

            async_results.append(result)

        return len(async_results)
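
The docstring's final step ("monitor beat task cleans up when taskset reaches 0 items") happens outside this class. A hedged sketch of that monitor, using only the keys defined above — the function name and the exact set of keys cleared are assumptions:

from redis import Redis


def monitor_ccpair_pruning_taskset(cc_pair_id: int, r: Redis) -> None:
    rcp = RedisConnectorPruning(cc_pair_id)

    if not r.exists(rcp.fence_key):
        return  # no pruning in progress for this cc_pair

    if not r.exists(rcp.generator_complete_key):
        return  # the generator is still enumerating docs to prune

    if r.scard(rcp.taskset_key) > 0:
        return  # cleanup subtasks still in flight

    # everything finished: clear the signals so a new prune can start
    r.delete(rcp.taskset_key)
    r.delete(rcp.generator_progress_key)
    r.delete(rcp.generator_complete_key)
    r.delete(rcp.fence_key)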


def celery_get_queue_length(queue: str, r: Redis) -> int:
"""This is a redis specific way to get the length of a celery queue.
It is priority aware and knows how to count across the multiple redis lists
Expand Down
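
For context on "multiple redis lists": celery's redis transport (kombu) implements priorities by splitting a queue into several lists named "{queue}{sep}{priority}", where the separator is kombu's "\x06\x16" and the unsuffixed list holds the lowest priority. A hedged sketch of the counting approach — the range of priority levels probed is an assumption:

from redis import Redis


def celery_get_queue_length_sketch(queue: str, r: Redis) -> int:
    total = 0
    for priority in range(10):
        key = queue if priority == 0 else f"{queue}\x06\x16{priority}"
        total += r.llen(key)  # llen returns 0 for missing keys
    return total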
105 changes: 34 additions & 71 deletions backend/danswer/background/celery/celery_utils.py
@@ -1,12 +1,12 @@
+from collections.abc import Callable
from datetime import datetime
from datetime import timezone
from typing import Any

from sqlalchemy.orm import Session

from danswer.background.celery.celery_redis import RedisConnectorDeletion
-from danswer.background.task_utils import name_cc_prune_task
-from danswer.configs.app_configs import ALLOW_SIMULTANEOUS_PRUNING
+from danswer.background.celery.celery_redis import RedisConnectorPruning
from danswer.configs.app_configs import MAX_PRUNING_DOCUMENT_RETRIEVAL_PER_MINUTE
from danswer.connectors.cross_connector_utils.rate_limit_wrapper import (
    rate_limit_builder,
@@ -17,14 +17,9 @@
from danswer.connectors.interfaces import PollConnector
from danswer.connectors.models import Document
from danswer.db.connector_credential_pair import get_connector_credential_pair
-from danswer.db.engine import get_db_current_time
+from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id
from danswer.db.enums import TaskStatus
-from danswer.db.models import Connector
-from danswer.db.models import Credential
from danswer.db.models import TaskQueueState
-from danswer.db.tasks import check_task_is_live_and_not_timed_out
-from danswer.db.tasks import get_latest_task
-from danswer.db.tasks import get_latest_task_by_type
+from danswer.redis.redis_pool import RedisPool
from danswer.server.documents.models import DeletionAttemptSnapshot
from danswer.utils.logger import setup_logger
@@ -33,6 +28,24 @@
+redis_pool = RedisPool()


+# TODO: make this a member of RedisConnectorPruning
+def cc_pair_is_pruning(cc_pair_id: int, db_session: Session) -> bool:
+    cc_pair = get_connector_credential_pair_from_id(
+        cc_pair_id=cc_pair_id, db_session=db_session
+    )
+    if not cc_pair:
+        raise ValueError(f"cc_pair_id {cc_pair_id} does not exist.")
+
+    rcp = RedisConnectorPruning(cc_pair.id)
+
+    r = redis_pool.get_client()
+    if r.exists(rcp.fence_key):
+        return True
+
+    return False
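
A hedged usage sketch for cc_pair_is_pruning: raising the redis fence before dispatching the pruning generator, so overlapping prunes of the same cc_pair are rejected. The generator task name, queue, and import paths are assumptions — the real kickoff lives in tasks/pruning.py, which this page does not render:

from celery import Celery

from danswer.background.celery.celery_redis import RedisConnectorPruning
from danswer.configs.constants import DanswerCeleryPriority  # assumed import path


def try_kickoff_prune(
    cc_pair_id: int, db_session: Session, celery_app: Celery
) -> bool:
    if cc_pair_is_pruning(cc_pair_id, db_session):
        return False  # a prune is already fenced off in redis

    rcp = RedisConnectorPruning(cc_pair_id)
    r = redis_pool.get_client()
    r.set(rcp.fence_key, 1)  # raise the fence before dispatching

    celery_app.send_task(
        "connector_pruning_generator_task",  # hypothetical task name
        kwargs=dict(cc_pair_id=cc_pair_id),
        queue="connector_pruning",  # hypothetical queue
        priority=DanswerCeleryPriority.MEDIUM,
    )
    return True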


def _get_deletion_status(
connector_id: int, credential_id: int, db_session: Session
) -> TaskQueueState | None:
@@ -70,72 +83,19 @@ def get_deletion_attempt_snapshot(
    )


-def skip_cc_pair_pruning_by_task(
-    pruning_task: TaskQueueState | None, db_session: Session
-) -> bool:
-    """task should be the latest prune task for this cc_pair"""
-    if not ALLOW_SIMULTANEOUS_PRUNING:
-        # if only one prune is allowed at any time, then check to see if any prune
-        # is active
-        pruning_type_task_name = name_cc_prune_task()
-        last_pruning_type_task = get_latest_task_by_type(
-            pruning_type_task_name, db_session
-        )
-
-        if last_pruning_type_task and check_task_is_live_and_not_timed_out(
-            last_pruning_type_task, db_session
-        ):
-            return True
-
-    if pruning_task and check_task_is_live_and_not_timed_out(pruning_task, db_session):
-        # if the last task is live right now, we shouldn't start a new one
-        return True
-
-    return False
-
-
-def should_prune_cc_pair(
-    connector: Connector, credential: Credential, db_session: Session
-) -> bool:
-    if not connector.prune_freq:
-        return False
-
-    pruning_task_name = name_cc_prune_task(
-        connector_id=connector.id, credential_id=credential.id
-    )
-    last_pruning_task = get_latest_task(pruning_task_name, db_session)
-
-    if skip_cc_pair_pruning_by_task(last_pruning_task, db_session):
-        return False
-
-    current_db_time = get_db_current_time(db_session)
-
-    if not last_pruning_task:
-        # If the connector has never been pruned, then compare vs when the connector
-        # was created
-        time_since_initialization = current_db_time - connector.time_created
-        if time_since_initialization.total_seconds() >= connector.prune_freq:
-            return True
-        return False
-
-    if not last_pruning_task.start_time:
-        # if the last prune task hasn't started, we shouldn't start a new one
-        return False
-
-    # if the last prune task has a start time, then compare against it to determine
-    # if we should start
-    time_since_last_pruning = current_db_time - last_pruning_task.start_time
-    return time_since_last_pruning.total_seconds() >= connector.prune_freq


def document_batch_to_ids(doc_batch: list[Document]) -> set[str]:
    return {doc.id for doc in doc_batch}


-def extract_ids_from_runnable_connector(runnable_connector: BaseConnector) -> set[str]:
+def extract_ids_from_runnable_connector(
+    runnable_connector: BaseConnector,
+    progress_callback: Callable[[int], None] | None = None,
+) -> set[str]:
    """
    If the PruneConnector hasn't been implemented for the given connector, just pull
-    all docs using the load_from_state and grab out the IDs
+    all docs using the load_from_state and grab out the IDs.
+
+    Optionally, a callback can be passed to handle the length of each document batch.
    """
    all_connector_doc_ids: set[str] = set()

@@ -158,6 +118,8 @@ def extract_ids_from_runnable_connector(
        max_calls=MAX_PRUNING_DOCUMENT_RETRIEVAL_PER_MINUTE, period=60
    )(document_batch_to_ids)
    for doc_batch in doc_batch_generator:
+        if progress_callback:
+            progress_callback(len(doc_batch))
        all_connector_doc_ids.update(doc_batch_processing_func(doc_batch))

    return all_connector_doc_ids
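
A usage sketch tying the new progress_callback to the redis progress signal described in RedisConnectorPruning's docstring ("callbacks that increment connectorpruning_generator_progress_1"); the surrounding generator-task wiring is an assumption:

from danswer.background.celery.celery_redis import RedisConnectorPruning


def prune_with_progress(cc_pair_id: int, runnable_connector: BaseConnector) -> set[str]:
    rcp = RedisConnectorPruning(cc_pair_id)
    r = redis_pool.get_client()

    def redis_increment_callback(amount: int) -> None:
        # mirror each batch's size into the generator progress signal
        r.incrby(rcp.generator_progress_key, amount)

    return extract_ids_from_runnable_connector(
        runnable_connector, progress_callback=redis_increment_callback
    )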
@@ -177,9 +139,10 @@ def celery_is_listening_to_queue(worker: Any, name: str) -> bool:


def celery_is_worker_primary(worker: Any) -> bool:
-    """There are multiple approaches that could be taken, but the way we do it is to
-    check the hostname set for the celery worker, either in celeryconfig.py or on the
-    command line."""
+    """There are multiple approaches that could be taken to determine if a celery worker
+    is 'primary', as defined by us. But the way we do it is to check the hostname set
+    for the celery worker, which can be done either in celeryconfig.py or on the
+    command line with '--hostname'."""
    hostname = worker.hostname
    if hostname.startswith("light"):
        return False