From 3b9cbdbcbd8fc6f70499a956db61b4e2ad0b044f Mon Sep 17 00:00:00 2001
From: Ryan Bales
Date: Fri, 10 Jan 2025 11:56:33 -0500
Subject: [PATCH 1/7] ignoring unchanged updates in entity sync

---
 entity/sync.py                | 232 +++++++++++++++++++++++++---------
 entity/tests/sync_tests.py    |   7 +-
 entity/version.py             |   2 +-
 release_notes.md              |   2 +
 requirements/requirements.txt |   1 +
 5 files changed, 179 insertions(+), 65 deletions(-)

diff --git a/entity/sync.py b/entity/sync.py
index 6326977..8255bc0 100644
--- a/entity/sync.py
+++ b/entity/sync.py
@@ -2,25 +2,42 @@
 Provides functions for syncing entities and their relationships to the
 Entity and EntityRelationship tables.
 """
-import logging
-from time import sleep
+from __future__ import annotations
 
-import wrapt
+import logging
 from collections import defaultdict
+from time import sleep, time
+from typing import TYPE_CHECKING
 
+import pgbulk
+from uuid import uuid4
+import wrapt
 from activatable_model import model_activations_changed
 from django import db
 from django.contrib.contenttypes.models import ContentType
-import manager_utils
-from django.db import transaction, connection
-
+from django.db import connection, transaction
 from entity.config import entity_registry
-from entity.models import Entity, EntityRelationship, EntityKind
+from entity.models import Entity, EntityKind, EntityRelationship
 
+if TYPE_CHECKING:
+    from django.db.models import QuerySet
 
 LOG = logging.getLogger(__name__)
 
+from contextlib import contextmanager
+
+@contextmanager
+def time_spend(message):
+    start_time = time()
+    try:
+        yield
+    finally:
+        end_time = time()
+        time_spent = end_time - start_time
+        print(f'{message}: {time_spent}')
+
 
 def transaction_atomic_with_retry(num_retries=5, backoff=0.1):
     """
     This is a decorator that will wrap the decorated method in an atomic transaction and
@@ -249,7 +266,7 @@ def sync_entities_watching(instance):
         sync_entities(*model_objs)
 
 
-class EntitySyncer(object):
+class EntitySyncer:
     """
     A class that will handle the syncing of entities
     """
@@ -265,13 +282,12 @@ def __init__(self, *model_objs):
         # Are we syncing all
         self.sync_all = not model_objs
 
+    @time_spend('sync()')
     def sync(self):
         # Log what we are syncing
         LOG.debug('sync_entities')
         LOG.debug(self.model_objs)
 
-        # Determine if we are syncing all
-        sync_all = not self.model_objs
         model_objs_map = {
             (ContentType.objects.get_for_model(model_obj, for_concrete_model=False), model_obj.id): model_obj
             for model_obj in self.model_objs
@@ -301,12 +317,17 @@ def sync(self):
         # For each ctype, obtain super entities. This is a dict keyed on ctype. Each value
         # is a dict keyed on the ctype of the super entity with a list of tuples for
         # IDs of sub/super entity relationships
-        super_entities_by_ctype = _get_super_entities_by_ctype(model_objs_by_ctype, model_ids_to_sync, sync_all)
+        super_entities_by_ctype = _get_super_entities_by_ctype(model_objs_by_ctype, model_ids_to_sync, self.sync_all)
 
         # Now that we have all models we need to sync, fetch them so that we can extract
         # metadata and entity kinds. If we are syncing all entities, we've already fetched
         # everything and can fill in this data struct without doing another DB hit
-        model_objs_to_sync = _get_model_objs_to_sync(model_ids_to_sync, model_objs_map, model_objs_by_ctype, sync_all)
+        model_objs_to_sync = _get_model_objs_to_sync(
+            model_ids_to_sync,
+            model_objs_map,
+            model_objs_by_ctype,
+            self.sync_all,
+        )
 
         # Obtain all entity kind tuples associated with the models
         entity_kind_tuples_to_sync = set()
@@ -384,19 +405,13 @@ def sync(self):
             if (ctype.id, model_obj.id) in entities_map
         ]
 
-        if self.sync_all:
-            # If we're syncing everything, just sync against the entire entity relationship
-            # table instead of doing a complex __in query
-            sync_against = EntityRelationship.objects.all()
-        else:
-            sync_against = EntityRelationship.objects.filter(sub_entity_id__in=original_entity_ids)
-
         # Sync the relations
         self.upsert_entity_relationships(
-            queryset=sync_against,
+            original_entity_ids,
             entity_relationships=entity_relationships_to_sync
         )
 
+    @time_spend('upsert_entity_kinds()')
     @transaction_atomic_with_retry()
     def upsert_entity_kinds(self, entity_kinds):
         """
@@ -406,7 +421,6 @@ def upsert_entity_kinds(self, entity_kinds):
 
         :param entity_kinds: The list of entity kinds to sync
         """
-
         # Filter out unchanged entity kinds
         unchanged_entity_kinds = {}
         if entity_kinds:
@@ -437,19 +451,18 @@ def upsert_entity_kinds(self, entity_kinds):
             list(EntityKind.all_objects.all().order_by('id').select_for_update().values_list('id', flat=True))
 
             # Upsert the entity kinds
-            upserted_entity_kinds = manager_utils.bulk_upsert(
-                queryset=EntityKind.all_objects.filter(
-                    name__in=[entity_kind.name for entity_kind in changed_entity_kinds]
-                ),
-                model_objs=changed_entity_kinds,
-                unique_fields=['name'],
-                update_fields=['display_name'],
-                return_upserts=True
+            upserted_entity_kinds = pgbulk.upsert(
+                EntityKind,
+                changed_entity_kinds,
+                ['name'],
+                ['display_name'],
+                returning=True,
             )
 
         # Return all the entity kinds
         return upserted_entity_kinds + list(unchanged_entity_kinds.values())
 
+    @time_spend('upsert_entities()')
    @transaction_atomic_with_retry()
     def upsert_entities(self, entities, sync=False):
         """
@@ -457,7 +470,6 @@ def upsert_entities(self, entities, sync=False):
         :param entities: The entities to sync
         :param sync: Do a sync instead of an upsert
         """
-
         # Select the entities we are upserting for update to reduce deadlocks
         if entities:
             # Default select for update query when syncing all
@@ -504,25 +516,67 @@ def upsert_entities(self, entities, sync=False):
             for entity in initial_queryset.values_list('id', 'is_active')
         }
 
-        # Sync all the entities if the sync flag is passed
-        if sync:
-            upserted_entities = manager_utils.sync(
-                queryset=initial_queryset,
-                model_objs=entities,
-                unique_fields=['entity_type_id', 'entity_id'],
-                update_fields=['entity_kind_id', 'entity_meta', 'display_name', 'is_active'],
-                return_upserts=True
-            )
-        # Otherwise we want to upsert our entities
-        else:
-            upserted_entities = manager_utils.bulk_upsert(
+        with time_spend('upsert entities'):
+            # Upsert entities
+            pgbulk.upsert(
                 queryset=initial_queryset,
                 model_objs=entities,
                 unique_fields=['entity_type_id', 'entity_id'],
                 update_fields=['entity_kind_id', 'entity_meta', 'display_name', 'is_active'],
-                return_upserts=True
+                ignore_unchanged=True,
             )
 
+        upserted_entities = []
+        # Delete unreferenced entities if sync=True
+        if entities:
+            with connection.cursor() as cursor:
+                uuid = uuid4()
+                table_name = f'sync_entities_{uuid}'.replace('-', '_')
+                with time_spend('entities - create temp table'):
+                    cursor.execute(
+                        f'CREATE TEMPORARY TABLE {table_name} ('
+                        'entity_type_id INTEGER, '
+                        'entity_id INTEGER, '
+                        'PRIMARY KEY(entity_type_id, entity_id))'
+                    )
+                values = []
+                for entity in entities:
+                    values.extend([entity.entity_type_id, entity.entity_id])
+                values_escaped = ','.join([
+                    '(%s, %s)' for i in range(0, int(len(values) / 2))
+                ])
+                with time_spend('entities - insert into temp table'):
+                    cursor.execute(
+                        f'INSERT INTO {table_name} (entity_type_id, entity_id) VALUES {values_escaped}',
+                        values
+                    )
+
+                # Fetch upserted entities
+                with time_spend('fetching upserted entities'):
+                    cursor.execute(
+                        f'SELECT t1.id FROM entity_entity t1 '
+                        f'JOIN {table_name} t2 ON t1.entity_type_id=t2.entity_type_id AND t1.entity_id=t2.entity_id'
+                    )
+                    upserted_entities = Entity.all_objects.filter(
+                        id__in=[entity[0] for entity in cursor.fetchall()]
+                    )
+
+                if sync:
+                    sync_cleanup_query = (
+                        f'DELETE FROM entity_entity WHERE id IN ('
+                        f'SELECT id FROM ('
+                        f'WITH initial AS ({initial_queryset.query}), '
+                        f'syncd AS (SELECT entity_type_id, entity_id FROM {table_name}) '
+                        f'SELECT initial.id, initial.entity_id FROM initial '
+                        f'LEFT OUTER JOIN syncd ON initial.entity_type_id=syncd.entity_type_id AND '
+                        f'initial.entity_id=syncd.entity_id '
+                        f'WHERE syncd.entity_id IS NULL'
+                        f') as ids'
+                        f')'
+                    )
+                    with time_spend('entities - delete records'):
+                        cursor.execute(sync_cleanup_query)
+
         # Compute the current state of the entities
         current_entity_activation_state = {
             entity.id: entity.is_active
@@ -549,29 +603,87 @@ def upsert_entities(self, entities, sync=False):
         # Return the upserted entities
         return upserted_entities, changed_entity_activation_state
 
+    @time_spend('upsert_entity_relationships()')
     @transaction_atomic_with_retry()
-    def upsert_entity_relationships(self, queryset, entity_relationships):
+    def upsert_entity_relationships(self, original_entity_ids, entity_relationships):
         """
-        Upsert entity relationships to the database
-        :param queryset: The base queryset to use
+        Sync entity relationships to the database
+
+        :param original_entity_ids: The ids of the original entity objects that are being sync'd
         :param entity_relationships: The entity relationships to ensure exist in the database
         """
+        initial_queryset = self._get_entity_relationships_to_sync(original_entity_ids)
+
+        # Upsert the relationships
+        with time_spend('entity relationships - upsert'):
+            pgbulk.upsert(
+                initial_queryset,
+                entity_relationships,
+                ['sub_entity_id', 'super_entity_id'],
+                ignore_unchanged=True,
+            )
 
-        # Select the relationships for update
         if entity_relationships:
-            list(queryset.order_by('id').select_for_update().values_list(
-                'id',
-                flat=True
-            ))
-
-        # Sync the relationships
-        return manager_utils.sync(
-            queryset=queryset,
-            model_objs=entity_relationships,
-            unique_fields=['sub_entity_id', 'super_entity_id'],
-            update_fields=[],
-            return_upserts=True
-        )
+            # Get the new and updated relationships that were upserted
+            uuid = uuid4()
+            table_name = f'sync_entity_relationships_{uuid}'.replace('-', '_')
+            with connection.cursor() as cursor:
+                with time_spend('entity relationships - create temp table'):
+                    cursor.execute(
+                        f'CREATE TEMPORARY TABLE {table_name} ('
+                        'sub_entity_id INTEGER, '
+                        'super_entity_id INTEGER, '
+                        'PRIMARY KEY(sub_entity_id, super_entity_id))'
+                    )
+                values = []
+                for relationship in entity_relationships:
+                    values.extend([relationship.sub_entity_id, relationship.super_entity_id])
+                values_escaped = ','.join([
+                    '(%s, %s)' for i in range(0, int(len(values) / 2))
+                ])
+                with time_spend('entity relationships - insert into temp table'):
+                    cursor.execute(
+                        f'INSERT INTO {table_name} (sub_entity_id, super_entity_id) VALUES {values_escaped}',
+                        values
+                    )
+
+        # If we upserted relationships, we need to delete relationships from the initial set that weren't just upserted
+        # We'll use the temp table as a reference for what was upserted
+        if entity_relationships:
+            with connection.cursor() as cursor:
+                query = (
+                    f'DELETE FROM entity_entityrelationship WHERE id IN ('
+                    f'SELECT id FROM ('
+                    f'WITH initial AS ({initial_queryset.query}), '
+                    f'syncd AS (SELECT sub_entity_id, super_entity_id FROM {table_name}) '
+                    f'SELECT initial.id, initial.super_entity_id FROM initial '
+                    f'LEFT OUTER JOIN syncd ON initial.sub_entity_id=syncd.sub_entity_id AND '
+                    f'initial.super_entity_id=syncd.super_entity_id '
+                    f'WHERE syncd.super_entity_id IS NULL'
+                    f') as ids'
+                    f')'
+                )
+                with time_spend('entity relationships - delete records'):
+                    cursor.execute(query)
+        # Else, just delete everything from the initial queryset, since we didn't upsert anything
+        else:
+            initial_queryset.delete()
+
+    def _get_entity_relationships_to_sync(self, original_entity_ids) -> QuerySet:
+        """
+        Given the calling context and the ids of the entities this process was originally syncing,
+        return the entity relationship queryset that should be replaced/sync'd with an updated set of relationships
+
+        :param original_entity_ids: The list of the entities originally meant to be sync'd by this process
+        """
+        queryset = EntityRelationship.objects.filter(sub_entity_id__in=original_entity_ids)
+
+        if self.sync_all:
+            # If we're syncing everything, just sync against the entire entity relationship
+            # table instead of doing a complex __in query
+            queryset = EntityRelationship.objects.all()
+
+        return queryset
 
     def send_entity_activation_events(self, changed_entity_activation_state):
         """
diff --git a/entity/tests/sync_tests.py b/entity/tests/sync_tests.py
index 7435b7b..fbece84 100644
--- a/entity/tests/sync_tests.py
+++ b/entity/tests/sync_tests.py
@@ -381,7 +381,6 @@ def test_sync_all(self, mock_model_activations_changed):
         """
         Tests that when we sync all we fire the correct signals
         """
-
         # Create five test accounts
         turn_off_syncing()
         initial_accounts = []
@@ -957,7 +956,7 @@ def test_optimal_queries_registered_entity_with_no_qset(self):
         team_group = G(TeamGroup)
 
         ContentType.objects.clear_cache()
-        with self.assertNumQueries(15):
+        with self.assertNumQueries(18):
             team_group.save()
 
     def test_optimal_queries_registered_entity_w_qset(self):
@@ -967,7 +966,7 @@ def test_optimal_queries_registered_entity_w_qset(self):
         account = G(Account)
 
         ContentType.objects.clear_cache()
-        with self.assertNumQueries(18):
+        with self.assertNumQueries(21):
             account.save()
 
     def test_sync_all_optimal_queries(self):
@@ -996,7 +995,7 @@ def test_sync_all_optimal_queries(self):
         with patch('entity.sync.entity_registry') as mock_entity_registry:
             mock_entity_registry.entity_registry = new_registry.entity_registry
             ContentType.objects.clear_cache()
-            with self.assertNumQueries(20):
+            with self.assertNumQueries(26):
                 sync_entities()
 
         self.assertEqual(Entity.objects.filter(entity_type=ContentType.objects.get_for_model(Account)).count(), 5)
diff --git a/entity/version.py b/entity/version.py
index 3fec2bf..89d98bf 100644
--- a/entity/version.py
+++ b/entity/version.py
@@ -1 +1 @@
-__version__ = '6.2.3'
+__version__ = '6.3.0'
diff --git a/release_notes.md b/release_notes.md
index 400493c..eea05c9 100644
--- a/release_notes.md
+++ b/release_notes.md
@@ -1,5 +1,7 @@
 ## Release Notes
 
+- 6.3.0:
+  - Ignore no-op updates when syncing entities and entity relationships in order to avoid downstream cost
 - 6.2.3:
   - Update the `defer_entity_syncing` decorator to support an optional handler.
 - 6.2.2:
diff --git a/requirements/requirements.txt b/requirements/requirements.txt
index 3f16bd4..68d1547 100644
--- a/requirements/requirements.txt
+++ b/requirements/requirements.txt
@@ -1,5 +1,6 @@
 Django>=3.2
 django-activatable-model>=3.1.0
+django-pgbulk==3.2.0
 django-manager-utils>=3.1.0
 python3-utils>=0.3
 wrapt>=1.10.5
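[Note on the mechanism this first commit adopts. pgbulk.upsert and its ignore_unchanged/returning options are exactly the calls the patch itself makes; the model below is hypothetical, purely for illustration.]

    # Illustrative sketch only, not part of the patch. Assumes a model
    # with a unique `name` column, mirroring how entity/sync.py calls pgbulk.
    import pgbulk

    results = pgbulk.upsert(
        MyModel,                  # hypothetical model (or a queryset to upsert against)
        [MyModel(name='a', display_name='A')],
        ['name'],                 # unique fields that identify each row
        ['display_name'],         # fields updated on conflict
        returning=True,           # hand back the affected rows
        ignore_unchanged=True,    # skip rows whose UPDATE would be a no-op
    )

Rows whose values already match are never written, so no triggers fire and no row versions churn for them; that is the "downstream cost" the release notes refer to.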
From 13792bd59fd316bbf116c190f0364a4a63a24c06 Mon Sep 17 00:00:00 2001
From: Ryan Bales
Date: Fri, 10 Jan 2025 12:02:51 -0500
Subject: [PATCH 2/7] cleanup

---
 entity/sync.py | 116 +++++++++++++++++++------------------------------
 1 file changed, 45 insertions(+), 71 deletions(-)

diff --git a/entity/sync.py b/entity/sync.py
index 8255bc0..5ccdd9c 100644
--- a/entity/sync.py
+++ b/entity/sync.py
@@ -6,11 +6,11 @@
 
 import logging
 from collections import defaultdict
-from time import sleep, time
+from time import sleep
 from typing import TYPE_CHECKING
+from uuid import uuid4
 
 import pgbulk
-from uuid import uuid4
 import wrapt
 from activatable_model import model_activations_changed
 from django import db
@@ -25,19 +25,6 @@
 
 LOG = logging.getLogger(__name__)
 
-from contextlib import contextmanager
-
-@contextmanager
-def time_spend(message):
-    start_time = time()
-    try:
-        yield
-    finally:
-        end_time = time()
-        time_spent = end_time - start_time
-        print(f'{message}: {time_spent}')
-
 
 def transaction_atomic_with_retry(num_retries=5, backoff=0.1):
     """
     This is a decorator that will wrap the decorated method in an atomic transaction and
@@ -282,7 +269,6 @@ def __init__(self, *model_objs):
         # Are we syncing all
         self.sync_all = not model_objs
 
-    @time_spend('sync()')
     def sync(self):
         # Log what we are syncing
         LOG.debug('sync_entities')
@@ -411,7 +397,6 @@ def sync(self):
             entity_relationships=entity_relationships_to_sync
         )
 
-    @time_spend('upsert_entity_kinds()')
     @transaction_atomic_with_retry()
     def upsert_entity_kinds(self, entity_kinds):
         """
@@ -462,7 +447,6 @@ def upsert_entity_kinds(self, entity_kinds):
         # Return all the entity kinds
         return upserted_entity_kinds + list(unchanged_entity_kinds.values())
 
-    @time_spend('upsert_entities()')
     @transaction_atomic_with_retry()
     def upsert_entities(self, entities, sync=False):
         """
@@ -516,15 +500,14 @@ def upsert_entities(self, entities, sync=False):
             for entity in initial_queryset.values_list('id', 'is_active')
         }
 
-        with time_spend('upsert entities'):
-            # Upsert entities
-            pgbulk.upsert(
-                queryset=initial_queryset,
-                model_objs=entities,
-                unique_fields=['entity_type_id', 'entity_id'],
-                update_fields=['entity_kind_id', 'entity_meta', 'display_name', 'is_active'],
-                ignore_unchanged=True,
-            )
+        # Upsert entities
+        pgbulk.upsert(
+            queryset=initial_queryset,
+            model_objs=entities,
+            unique_fields=['entity_type_id', 'entity_id'],
+            update_fields=['entity_kind_id', 'entity_meta', 'display_name', 'is_active'],
+            ignore_unchanged=True,
+        )
 
         upserted_entities = []
         # Delete unreferenced entities if sync=True
@@ -532,34 +515,31 @@ def upsert_entities(self, entities, sync=False):
             with connection.cursor() as cursor:
                 uuid = uuid4()
                 table_name = f'sync_entities_{uuid}'.replace('-', '_')
-                with time_spend('entities - create temp table'):
-                    cursor.execute(
-                        f'CREATE TEMPORARY TABLE {table_name} ('
-                        'entity_type_id INTEGER, '
-                        'entity_id INTEGER, '
-                        'PRIMARY KEY(entity_type_id, entity_id))'
-                    )
+                cursor.execute(
+                    f'CREATE TEMPORARY TABLE {table_name} ('
+                    'entity_type_id INTEGER, '
+                    'entity_id INTEGER, '
+                    'PRIMARY KEY(entity_type_id, entity_id))'
+                )
                 values = []
                 for entity in entities:
                     values.extend([entity.entity_type_id, entity.entity_id])
                 values_escaped = ','.join([
                     '(%s, %s)' for i in range(0, int(len(values) / 2))
                 ])
-                with time_spend('entities - insert into temp table'):
-                    cursor.execute(
-                        f'INSERT INTO {table_name} (entity_type_id, entity_id) VALUES {values_escaped}',
-                        values
-                    )
+                cursor.execute(
+                    f'INSERT INTO {table_name} (entity_type_id, entity_id) VALUES {values_escaped}',
+                    values
+                )
 
                 # Fetch upserted entities
-                with time_spend('fetching upserted entities'):
-                    cursor.execute(
-                        f'SELECT t1.id FROM entity_entity t1 '
-                        f'JOIN {table_name} t2 ON t1.entity_type_id=t2.entity_type_id AND t1.entity_id=t2.entity_id'
-                    )
-                    upserted_entities = Entity.all_objects.filter(
-                        id__in=[entity[0] for entity in cursor.fetchall()]
-                    )
+                cursor.execute(
+                    f'SELECT t1.id FROM entity_entity t1 '
+                    f'JOIN {table_name} t2 ON t1.entity_type_id=t2.entity_type_id AND t1.entity_id=t2.entity_id'
+                )
+                upserted_entities = Entity.all_objects.filter(
+                    id__in=[entity[0] for entity in cursor.fetchall()]
+                )
 
                 if sync:
                     sync_cleanup_query = (
@@ -574,8 +554,7 @@ def upsert_entities(self, entities, sync=False):
                         f') as ids'
                        f')'
                     )
-                    with time_spend('entities - delete records'):
-                        cursor.execute(sync_cleanup_query)
+                    cursor.execute(sync_cleanup_query)
 
         # Compute the current state of the entities
         current_entity_activation_state = {
@@ -603,7 +582,6 @@ def upsert_entities(self, entities, sync=False):
         # Return the upserted entities
         return upserted_entities, changed_entity_activation_state
 
-    @time_spend('upsert_entity_relationships()')
     @transaction_atomic_with_retry()
     def upsert_entity_relationships(self, original_entity_ids, entity_relationships):
         """
@@ -615,37 +593,34 @@ def upsert_entity_relationships(self, original_entity_ids, entity_relationships)
         initial_queryset = self._get_entity_relationships_to_sync(original_entity_ids)
 
         # Upsert the relationships
-        with time_spend('entity relationships - upsert'):
-            pgbulk.upsert(
-                initial_queryset,
-                entity_relationships,
-                ['sub_entity_id', 'super_entity_id'],
-                ignore_unchanged=True,
-            )
+        pgbulk.upsert(
+            initial_queryset,
+            entity_relationships,
+            ['sub_entity_id', 'super_entity_id'],
+            ignore_unchanged=True,
+        )
 
         if entity_relationships:
             # Get the new and updated relationships that were upserted
             uuid = uuid4()
             table_name = f'sync_entity_relationships_{uuid}'.replace('-', '_')
             with connection.cursor() as cursor:
-                with time_spend('entity relationships - create temp table'):
-                    cursor.execute(
-                        f'CREATE TEMPORARY TABLE {table_name} ('
-                        'sub_entity_id INTEGER, '
-                        'super_entity_id INTEGER, '
-                        'PRIMARY KEY(sub_entity_id, super_entity_id))'
-                    )
+                cursor.execute(
+                    f'CREATE TEMPORARY TABLE {table_name} ('
+                    'sub_entity_id INTEGER, '
+                    'super_entity_id INTEGER, '
+                    'PRIMARY KEY(sub_entity_id, super_entity_id))'
+                )
                 values = []
                 for relationship in entity_relationships:
                     values.extend([relationship.sub_entity_id, relationship.super_entity_id])
                 values_escaped = ','.join([
                     '(%s, %s)' for i in range(0, int(len(values) / 2))
                 ])
-                with time_spend('entity relationships - insert into temp table'):
-                    cursor.execute(
-                        f'INSERT INTO {table_name} (sub_entity_id, super_entity_id) VALUES {values_escaped}',
-                        values
-                    )
+                cursor.execute(
+                    f'INSERT INTO {table_name} (sub_entity_id, super_entity_id) VALUES {values_escaped}',
+                    values
+                )
 
         # If we upserted relationships, we need to delete relationships from the initial set that weren't just upserted
         # We'll use the temp table as a reference for what was upserted
@@ -663,8 +638,7 @@ def upsert_entity_relationships(self, original_entity_ids, entity_relationships)
                         f') as ids'
                         f')'
                     )
-                with time_spend('entity relationships - delete records'):
-                    cursor.execute(query)
+                cursor.execute(query)
         # Else, just delete everything from the initial queryset, since we didn't upsert anything
         else:
             initial_queryset.delete()
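[Both DELETE statements built in this commit share one anti-join shape: stage the upserted keys in a side table, left-join the initial set against it, and delete whatever fails to match. Stripped of the f-string plumbing, and with a placeholder temp-table name standing in for the per-sync UUID name, the generated SQL is roughly:]

    DELETE FROM entity_entityrelationship WHERE id IN (
        SELECT id FROM (
            WITH initial AS (/* SQL of initial_queryset */),
                 syncd AS (SELECT sub_entity_id, super_entity_id
                           FROM sync_entity_relationships_xxx)
            SELECT initial.id, initial.super_entity_id
            FROM initial
            LEFT OUTER JOIN syncd
                ON initial.sub_entity_id = syncd.sub_entity_id
                AND initial.super_entity_id = syncd.super_entity_id
            WHERE syncd.super_entity_id IS NULL
        ) as ids
    )

A NULL on the syncd side of the outer join marks a relationship that was not re-asserted during this sync, so it is stale and gets removed.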
From 17c1b479035b544dc933252c3eac43fa38faa9a8 Mon Sep 17 00:00:00 2001
From: Wes Kendall
Date: Fri, 10 Jan 2025 11:54:10 -0600
Subject: [PATCH 3/7] Refactor temp table

---
 entity/sync.py | 299 +++++++++++++++++++++++++++----------------------
 1 file changed, 167 insertions(+), 132 deletions(-)

diff --git a/entity/sync.py b/entity/sync.py
index 5ccdd9c..b31050f 100644
--- a/entity/sync.py
+++ b/entity/sync.py
@@ -2,6 +2,7 @@
 Provides functions for syncing entities and their relationships to the
 Entity and EntityRelationship tables.
 """
+
 from __future__ import annotations
 
 import logging
@@ -142,9 +143,13 @@ def _get_super_entities_by_ctype(model_objs_by_ctype, model_ids_to_sync, sync_al
     super_entities_by_ctype = defaultdict(lambda: defaultdict(list))  # pragma: no cover
     for ctype, model_objs_for_ctype in model_objs_by_ctype.items():
         entity_config = entity_registry.entity_registry.get(ctype.model_class())
-        super_entities = entity_config.get_super_entities(model_objs_for_ctype, sync_all)
+        super_entities = entity_config.get_super_entities(
+            model_objs_for_ctype, sync_all
+        )
         super_entities_by_ctype[ctype] = {
-            ContentType.objects.get_for_model(model_class, for_concrete_model=False): relationships
+            ContentType.objects.get_for_model(
+                model_class, for_concrete_model=False
+            ): relationships
             for model_class, relationships in super_entities.items()
         }
 
@@ -157,21 +162,18 @@ def _get_super_entities_by_ctype(model_objs_by_ctype, model_ids_to_sync, sync_al
     return super_entities_by_ctype
 
 
-def _fetch_entity_models(model_ids_to_sync, model_objs_map, model_objs_by_ctype, sync_all):
+def _fetch_entity_models(
+    model_ids_to_sync, model_objs_map, model_objs_by_ctype, sync_all
+):
     """
     Fetch the entity models per content type. This will also handle the case where
     accounts are created before _get_super_entities_by_ctype and the model_ids_to_sync
     do not match the model_objs_map
     """
     for ctype, model_ids in model_ids_to_sync.items():
-        if sync_all:
-            # Build a set of ids of already fetched models
-            fetched_model_ids = {
-                model.id
-                for model in model_objs_by_ctype[ctype]
-            }
+        fetched_model_ids = {model.id for model in model_objs_by_ctype[ctype]}
 
         # Compute the set diff to see if any records are missing
         unfetched_model_ids = model_ids - fetched_model_ids
@@ -180,27 +182,31 @@ def _fetch_entity_models(model_ids_to_sync, model_objs_map, model_objs_by_ctype,
 
         # Check if new records
         if unfetched_model_ids:
-            # Fetch the records and add them to the model_objs_map
-            model_qset = entity_registry.entity_registry.get(ctype.model_class()).queryset
+            model_qset = entity_registry.entity_registry.get(
+                ctype.model_class()
+            ).queryset
             model_objs_to_sync = model_qset.filter(id__in=unfetched_model_ids)
             for model_obj in model_objs_to_sync:
                 model_objs_by_ctype[ctype].append(model_obj)
                 model_objs_map[(ctype, model_obj.id)] = model_obj
 
 
-def _get_model_objs_to_sync(model_ids_to_sync, model_objs_map, model_objs_by_ctype, sync_all):
+def _get_model_objs_to_sync(
+    model_ids_to_sync, model_objs_map, model_objs_by_ctype, sync_all
+):
     """
     Given the model IDs to sync, fetch all model objects to sync
     """
     model_objs_to_sync = {}
-    _fetch_entity_models(model_ids_to_sync, model_objs_map, model_objs_by_ctype, sync_all)
+    _fetch_entity_models(
+        model_ids_to_sync, model_objs_map, model_objs_by_ctype, sync_all
+    )
 
     for ctype, model_ids_to_sync_for_ctype in model_ids_to_sync.items():
         model_objs_to_sync[ctype] = [
-            model_objs_map[ctype, model_id]
-            for model_id in model_ids_to_sync_for_ctype
+            model_objs_map[ctype, model_id] for model_id in model_ids_to_sync_for_ctype
        ]
 
     return model_objs_to_sync
@@ -247,7 +253,9 @@ def sync_entities_watching(instance):
     """
     Syncs entities watching changes of a model instance.
     """
-    for entity_model, entity_model_getter in entity_registry.entity_watching[instance.__class__]:
+    for entity_model, entity_model_getter in entity_registry.entity_watching[
+        instance.__class__
+    ]:
         model_objs = list(entity_model_getter(instance))
         if model_objs:
             sync_entities(*model_objs)
@@ -271,11 +279,14 @@ def __init__(self, *model_objs):
 
     def sync(self):
         # Log what we are syncing
-        LOG.debug('sync_entities')
+        LOG.debug("sync_entities")
         LOG.debug(self.model_objs)
 
         model_objs_map = {
-            (ContentType.objects.get_for_model(model_obj, for_concrete_model=False), model_obj.id): model_obj
+            (
+                ContentType.objects.get_for_model(model_obj, for_concrete_model=False),
+                model_obj.id,
+            ): model_obj
             for model_obj in self.model_objs
         }
@@ -283,10 +294,17 @@ def sync(self):
         if self.sync_all:
             for model_class, entity_config in entity_registry.entity_registry.items():
                 model_qset = entity_config.queryset
-                model_objs_map.update({
-                    (ContentType.objects.get_for_model(model_class, for_concrete_model=False), model_obj.id): model_obj
-                    for model_obj in model_qset.all()
-                })
+                model_objs_map.update(
+                    {
+                        (
+                            ContentType.objects.get_for_model(
+                                model_class, for_concrete_model=False
+                            ),
+                            model_obj.id,
+                        ): model_obj
+                        for model_obj in model_qset.all()
+                    }
+                )
 
         # Organize by content type
         model_objs_by_ctype = defaultdict(list)
@@ -303,7 +321,9 @@ def sync(self):
         # For each ctype, obtain super entities. This is a dict keyed on ctype. Each value
         # is a dict keyed on the ctype of the super entity with a list of tuples for
         # IDs of sub/super entity relationships
-        super_entities_by_ctype = _get_super_entities_by_ctype(model_objs_by_ctype, model_ids_to_sync, self.sync_all)
+        super_entities_by_ctype = _get_super_entities_by_ctype(
+            model_objs_by_ctype, model_ids_to_sync, self.sync_all
+        )
 
         # Now that we have all models we need to sync, fetch them so that we can extract
         # metadata and entity kinds. If we are syncing all entities, we've already fetched
@@ -335,30 +355,32 @@ def sync(self):
 
         # Build a map of entity kind name to entity kind
         entity_kinds_map = {
-            entity_kind.name: entity_kind
-            for entity_kind in upserted_entity_kinds
+            entity_kind.name: entity_kind for entity_kind in upserted_entity_kinds
         }
 
         # Now that we have all entity kinds, build all entities that need to be synced
         entities_to_upsert = []
         for ctype, model_objs_to_sync_for_ctype in model_objs_to_sync.items():
             entity_config = entity_registry.entity_registry.get(ctype.model_class())
-            entities_to_upsert.extend([
-                Entity(
-                    entity_id=model_obj.id,
-                    entity_type_id=ctype.id,
-                    entity_kind_id=entity_kinds_map[entity_config.get_entity_kind(model_obj)[0]].id,
-                    entity_meta=entity_config.get_entity_meta(model_obj),
-                    display_name=entity_config.get_display_name(model_obj),
-                    is_active=entity_config.get_is_active(model_obj)
-                )
-                for model_obj in model_objs_to_sync_for_ctype
-            ])
+            entities_to_upsert.extend(
+                [
+                    Entity(
+                        entity_id=model_obj.id,
+                        entity_type_id=ctype.id,
+                        entity_kind_id=entity_kinds_map[
+                            entity_config.get_entity_kind(model_obj)[0]
+                        ].id,
+                        entity_meta=entity_config.get_entity_meta(model_obj),
+                        display_name=entity_config.get_display_name(model_obj),
+                        is_active=entity_config.get_is_active(model_obj),
+                    )
+                    for model_obj in model_objs_to_sync_for_ctype
+                ]
+            )
 
         # Upsert the entities and get the upserted entities and the changed state
         upserted_entities, changed_entity_activation_state = self.upsert_entities(
-            entities=entities_to_upsert,
-            sync=self.sync_all
+            entities=entities_to_upsert, sync=self.sync_all
         )
 
         # Call the model activations changed signal manually since we have done a bulk operation
@@ -379,7 +401,8 @@ def sync(self):
             for sub_ctype, super_entities_by_sub_ctype in super_entities_by_ctype.items()
             for super_ctype, relationships in super_entities_by_sub_ctype.items()
             for sub_entity_id, super_entity_id in relationships
-            if (sub_ctype.id, sub_entity_id) in entities_map and (super_ctype.id, super_entity_id) in entities_map
+            if (sub_ctype.id, sub_entity_id) in entities_map
+            and (super_ctype.id, super_entity_id) in entities_map
         ]
 
         # Find the entities of the original model objects we were syncing. These
@@ -393,8 +416,7 @@ def sync(self):
 
         # Sync the relations
         self.upsert_entity_relationships(
-            original_entity_ids,
-            entity_relationships=entity_relationships_to_sync
+            original_entity_ids, entity_relationships=entity_relationships_to_sync
         )
 
     @transaction_atomic_with_retry()
@@ -412,11 +434,13 @@ def upsert_entity_kinds(self, entity_kinds):
         unchanged_entity_kinds = {
             (entity_kind.name, entity_kind.display_name): entity_kind
             for entity_kind in EntityKind.all_objects.extra(
-                where=['(name, display_name) IN %s'],
-                params=[tuple(
-                    (entity_kind.name, entity_kind.display_name)
-                    for entity_kind in entity_kinds
-                )]
+                where=["(name, display_name) IN %s"],
+                params=[
+                    tuple(
+                        (entity_kind.name, entity_kind.display_name)
+                        for entity_kind in entity_kinds
+                    )
+                ],
             )
         }
 
@@ -424,7 +448,8 @@ def upsert_entity_kinds(self, entity_kinds):
         changed_entity_kinds = [
             entity_kind
             for entity_kind in entity_kinds
-            if (entity_kind.name, entity_kind.display_name) not in unchanged_entity_kinds
+            if (entity_kind.name, entity_kind.display_name)
+            not in unchanged_entity_kinds
         ]
 
         # If any of our kinds have changed upsert them
@@ -433,14 +458,19 @@ def upsert_entity_kinds(self, entity_kinds):
             # Select all our existing entity kinds for update so we can do proper locking
             # We have to select all here for some odd reason, if we only select the ones
             # we are syncing we still run into deadlock issues
-            list(EntityKind.all_objects.all().order_by('id').select_for_update().values_list('id', flat=True))
+            list(
+                EntityKind.all_objects.all()
+                .order_by("id")
+                .select_for_update()
+                .values_list("id", flat=True)
+            )
 
             # Upsert the entity kinds
             upserted_entity_kinds = pgbulk.upsert(
                 EntityKind,
                 changed_entity_kinds,
-                ['name'],
-                ['display_name'],
+                ["name"],
+                ["display_name"],
                 returning=True,
             )
 
         # Return all the entity kinds
         return upserted_entity_kinds + list(unchanged_entity_kinds.values())
@@ -458,26 +488,23 @@ def upsert_entities(self, entities, sync=False):
         if entities:
             # Default select for update query when syncing all
             select_for_update_query = (
-                'SELECT FROM {table_name} ORDER BY id ASC FOR NO KEY UPDATE'
-            ).format(
-                table_name=Entity._meta.db_table
-            )
+                "SELECT FROM {table_name} ORDER BY id ASC FOR NO KEY UPDATE"
+            ).format(table_name=Entity._meta.db_table)
             select_for_update_query_params = []
 
             # If we are not syncing all, only select those we are updating
             if not sync:
                 select_for_update_query = (
-                    'SELECT FROM {table_name} '
-                    'WHERE (entity_type_id, entity_id) IN %s '
-                    'ORDER BY id ASC '
-                    'FOR NO KEY UPDATE'
-                ).format(
-                    table_name=Entity._meta.db_table
-                )
-                select_for_update_query_params = [tuple(
-                    (entity.entity_type_id, entity.entity_id)
-                    for entity in entities
-                )]
+                    "SELECT FROM {table_name} "
+                    "WHERE (entity_type_id, entity_id) IN %s "
+                    "ORDER BY id ASC "
+                    "FOR NO KEY UPDATE"
+                ).format(table_name=Entity._meta.db_table)
+                select_for_update_query_params = [
+                    tuple(
+                        (entity.entity_type_id, entity.entity_id) for entity in entities
+                    )
+                ]
 
             # Select the items for update
             with connection.cursor() as cursor:
@@ -489,23 +516,29 @@ def upsert_entities(self, entities, sync=False):
         initial_queryset = Entity.all_objects.all()
         if not sync:
             initial_queryset = Entity.all_objects.extra(
-                where=['(entity_type_id, entity_id) IN %s'],
-                params=[tuple(
-                    (entity.entity_type_id, entity.entity_id)
-                    for entity in entities
-                )]
+                where=["(entity_type_id, entity_id) IN %s"],
+                params=[
+                    tuple(
+                        (entity.entity_type_id, entity.entity_id) for entity in entities
+                    )
+                ],
            )
 
         initial_entity_activation_state = {
             entity[0]: entity[1]
             for entity in initial_queryset.values_list("id", "is_active")
         }
 
         # Upsert entities
         pgbulk.upsert(
             queryset=initial_queryset,
             model_objs=entities,
-            unique_fields=['entity_type_id', 'entity_id'],
-            update_fields=['entity_kind_id', 'entity_meta', 'display_name', 'is_active'],
+            unique_fields=["entity_type_id", "entity_id"],
+            update_fields=[
+                "entity_kind_id",
+                "entity_meta",
+                "display_name",
+                "is_active",
+            ],
             ignore_unchanged=True,
         )
 
         upserted_entities = []
         # Delete unreferenced entities if sync=True
         if entities:
             with connection.cursor() as cursor:
-                uuid = uuid4()
-                table_name = f'sync_entities_{uuid}'.replace('-', '_')
-                cursor.execute(
-                    f'CREATE TEMPORARY TABLE {table_name} ('
-                    'entity_type_id INTEGER, '
-                    'entity_id INTEGER, '
-                    'PRIMARY KEY(entity_type_id, entity_id))'
-                )
+                # Create the values string for the VALUES list
                 values = []
+                values_list_items = []
                 for entity in entities:
                     values.extend([entity.entity_type_id, entity.entity_id])
-                values_escaped = ','.join([
-                    '(%s, %s)' for i in range(0, int(len(values) / 2))
-                ])
-                cursor.execute(
-                    f'INSERT INTO {table_name} (entity_type_id, entity_id) VALUES {values_escaped}',
-                    values
-                )
+                    values_list_items.append("(%s, %s)")
+                values_list = ",".join(values_list_items)
 
-                # Fetch upserted entities
+                # Fetch upserted entities using direct VALUES list
                 cursor.execute(
-                    f'SELECT t1.id FROM entity_entity t1 '
-                    f'JOIN {table_name} t2 ON t1.entity_type_id=t2.entity_type_id AND t1.entity_id=t2.entity_id'
+                    f"SELECT t1.id FROM entity_entity t1 "
+                    f"JOIN (VALUES {values_list}) AS t2(entity_type_id, entity_id) "
+                    f"ON t1.entity_type_id = t2.entity_type_id AND t1.entity_id = t2.entity_id",
+                    values,
                 )
                 upserted_entities = Entity.all_objects.filter(
                     id__in=[entity[0] for entity in cursor.fetchall()]
                 )
 
                 if sync:
+                    # Use the same VALUES list in the cleanup query
                     sync_cleanup_query = (
-                        f'DELETE FROM entity_entity WHERE id IN ('
-                        f'SELECT id FROM ('
-                        f'WITH initial AS ({initial_queryset.query}), '
-                        f'syncd AS (SELECT entity_type_id, entity_id FROM {table_name}) '
-                        f'SELECT initial.id, initial.entity_id FROM initial '
-                        f'LEFT OUTER JOIN syncd ON initial.entity_type_id=syncd.entity_type_id AND '
-                        f'initial.entity_id=syncd.entity_id '
-                        f'WHERE syncd.entity_id IS NULL'
-                        f') as ids'
-                        f')'
+                        f"DELETE FROM entity_entity WHERE id IN ("
+                        f"SELECT id FROM ("
+                        f"WITH initial AS ({initial_queryset.query}), "
+                        f"syncd AS (SELECT entity_type_id, entity_id FROM (VALUES {values_list}) "
+                        f"AS v(entity_type_id, entity_id)) "
+                        f"SELECT initial.id, initial.entity_id FROM initial "
+                        f"LEFT OUTER JOIN syncd ON initial.entity_type_id=syncd.entity_type_id AND "
+                        f"initial.entity_id=syncd.entity_id "
+                        f"WHERE syncd.entity_id IS NULL"
+                        f") as ids"
+                        f")"
                     )
-                    cursor.execute(sync_cleanup_query)
+                    cursor.execute(sync_cleanup_query, values)
 
         # Compute the current state of the entities
         current_entity_activation_state = {
-            entity.id: entity.is_active
-            for entity in upserted_entities
+            entity.id: entity.is_active for entity in upserted_entities
         }
 
         # Compute the changed activation state of the entities
         changed_entity_activation_state = {}
-        all_entity_ids = set(initial_entity_activation_state.keys()) | set(current_entity_activation_state.keys())
+        all_entity_ids = set(initial_entity_activation_state.keys()) | set(
+            current_entity_activation_state.keys()
+        )
 
         for entity_id in all_entity_ids:
             # Get the initial activation state of the entity
             # Default to false so we only detect when the model has actually changed
-            initial_activation_state = initial_entity_activation_state.get(entity_id, False)
+            initial_activation_state = initial_entity_activation_state.get(
+                entity_id, False
+            )
 
             # Get the current state of the entity
             # Default to false here since the upserts do not return is_active=False due
             # to the default object manager excluding these
-            current_activation_state = current_entity_activation_state.get(entity_id, False)
+            current_activation_state = current_entity_activation_state.get(
+                entity_id, False
+            )
 
             # Check if the state changed and add it to the changed entity state
             if initial_activation_state != current_activation_state:
@@ -596,30 +627,32 @@ def upsert_entity_relationships(self, original_entity_ids, entity_relationships)
         pgbulk.upsert(
             initial_queryset,
             entity_relationships,
-            ['sub_entity_id', 'super_entity_id'],
+            ["sub_entity_id", "super_entity_id"],
             ignore_unchanged=True,
         )
 
         if entity_relationships:
             # Get the new and updated relationships that were upserted
             uuid = uuid4()
-            table_name = f'sync_entity_relationships_{uuid}'.replace('-', '_')
+            table_name = f"sync_entity_relationships_{uuid}".replace("-", "_")
             with connection.cursor() as cursor:
                 cursor.execute(
-                    f'CREATE TEMPORARY TABLE {table_name} ('
-                    'sub_entity_id INTEGER, '
-                    'super_entity_id INTEGER, '
-                    'PRIMARY KEY(sub_entity_id, super_entity_id))'
+                    f"CREATE TEMPORARY TABLE {table_name} ("
+                    "sub_entity_id INTEGER, "
+                    "super_entity_id INTEGER, "
+                    "PRIMARY KEY(sub_entity_id, super_entity_id))"
                 )
                 values = []
                 for relationship in entity_relationships:
-                    values.extend([relationship.sub_entity_id, relationship.super_entity_id])
+                    values.extend(
+                        [relationship.sub_entity_id, relationship.super_entity_id]
+                    )
-                values_escaped = ','.join([
-                    '(%s, %s)' for i in range(0, int(len(values) / 2))
-                ])
+                values_escaped = ",".join(
+                    ["(%s, %s)" for i in range(0, int(len(values) / 2))]
+                )
                 cursor.execute(
-                    f'INSERT INTO {table_name} (sub_entity_id, super_entity_id) VALUES {values_escaped}',
-                    values
+                    f"INSERT INTO {table_name} (sub_entity_id, super_entity_id) VALUES {values_escaped}",
+                    values,
                 )
 
         # If we upserted relationships, we need to delete relationships from the initial set that weren't just upserted
         # We'll use the temp table as a reference for what was upserted
         if entity_relationships:
             with connection.cursor() as cursor:
                 query = (
-                    f'DELETE FROM entity_entityrelationship WHERE id IN ('
-                    f'SELECT id FROM ('
-                    f'WITH initial AS ({initial_queryset.query}), '
-                    f'syncd AS (SELECT sub_entity_id, super_entity_id FROM {table_name}) '
-                    f'SELECT initial.id, initial.super_entity_id FROM initial '
-                    f'LEFT OUTER JOIN syncd ON initial.sub_entity_id=syncd.sub_entity_id AND '
-                    f'initial.super_entity_id=syncd.super_entity_id '
-                    f'WHERE syncd.super_entity_id IS NULL'
-                    f') as ids'
-                    f')'
+                    f"DELETE FROM entity_entityrelationship WHERE id IN ("
+                    f"SELECT id FROM ("
+                    f"WITH initial AS ({initial_queryset.query}), "
+                    f"syncd AS (SELECT sub_entity_id, super_entity_id FROM {table_name}) "
+                    f"SELECT initial.id, initial.super_entity_id FROM initial "
+                    f"LEFT OUTER JOIN syncd ON initial.sub_entity_id=syncd.sub_entity_id AND "
+                    f"initial.super_entity_id=syncd.super_entity_id "
+                    f"WHERE syncd.super_entity_id IS NULL"
+                    f") as ids"
+                    f")"
                 )
                 cursor.execute(query)
         # Else, just delete everything from the initial queryset, since we didn't upsert anything
         else:
             initial_queryset.delete()
@@ -650,7 +683,9 @@ def _get_entity_relationships_to_sync(self, original_entity_ids) -> QuerySet:
 
         :param original_entity_ids: The list of the entities originally meant to be sync'd by this process
         """
-        queryset = EntityRelationship.objects.filter(sub_entity_id__in=original_entity_ids)
+        queryset = EntityRelationship.objects.filter(
+            sub_entity_id__in=original_entity_ids
+        )
 
         if self.sync_all:
             # If we're syncing everything, just sync against the entire entity relationship
             # table instead of doing a complex __in query
             queryset = EntityRelationship.objects.all()
 
         return queryset
@@ -679,7 +714,7 @@ def send_entity_activation_events(self, changed_entity_activation_state):
             model_activations_changed.send(
                 sender=Entity,
                 instance_ids=sorted(list(activated_entities)),
-                is_active=True
+                is_active=True,
             )
 
         # If any entities were deactivated call the activation change event with the active flag as false
@@ -687,5 +722,5 @@ def send_entity_activation_events(self, changed_entity_activation_state):
             model_activations_changed.send(
                 sender=Entity,
                 instance_ids=sorted(list(deactivated_entities)),
-                is_active=False
+                is_active=False,
             )
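[The refactor above trades the session-scoped temporary table for an inline VALUES list. Joining against VALUES is plain PostgreSQL; as a small sketch, with two made-up (entity_type_id, entity_id) pairs standing in for the bound %s placeholders:]

    SELECT t1.id
    FROM entity_entity t1
    JOIN (VALUES (12, 101), (12, 102)) AS t2(entity_type_id, entity_id)
        ON t1.entity_type_id = t2.entity_type_id
        AND t1.entity_id = t2.entity_id

At runtime the literal pairs come from the flattened `values` list passed to cursor.execute(), while `values_list` supplies the matching "(%s, %s)" placeholders, so the lookup needs no DDL and no extra INSERT round trip.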
From 96c6e0f9e52d5365f967acbb692ca328f8ab8191 Mon Sep 17 00:00:00 2001
From: Ryan Bales
Date: Fri, 10 Jan 2025 14:02:16 -0500
Subject: [PATCH 4/7] cleanup

---
 entity/sync.py | 27 +--------------------------
 1 file changed, 1 insertion(+), 26 deletions(-)

diff --git a/entity/sync.py b/entity/sync.py
index b31050f..e9f7542 100644
--- a/entity/sync.py
+++ b/entity/sync.py
@@ -484,32 +484,6 @@ def upsert_entities(self, entities, sync=False):
         :param entities: The entities to sync
         :param sync: Do a sync instead of an upsert
         """
-        # Select the entities we are upserting for update to reduce deadlocks
-        if entities:
-            # Default select for update query when syncing all
-            select_for_update_query = (
-                "SELECT FROM {table_name} ORDER BY id ASC FOR NO KEY UPDATE"
-            ).format(table_name=Entity._meta.db_table)
-            select_for_update_query_params = []
-
-            # If we are not syncing all, only select those we are updating
-            if not sync:
-                select_for_update_query = (
-                    "SELECT FROM {table_name} "
-                    "WHERE (entity_type_id, entity_id) IN %s "
-                    "ORDER BY id ASC "
-                    "FOR NO KEY UPDATE"
-                ).format(table_name=Entity._meta.db_table)
-                select_for_update_query_params = [
-                    tuple(
-                        (entity.entity_type_id, entity.entity_id) for entity in entities
-                    )
-                ]
-
-            # Select the items for update
-            with connection.cursor() as cursor:
-                cursor.execute(select_for_update_query, select_for_update_query_params)
-
         # Compute the initial queryset and the initial state of the entities we are syncing
         # We need the initial state so we can compare it to the new state to determine any
         # entities that were activated or deactivated
@@ -565,6 +539,7 @@ def upsert_entities(self, entities, sync=False):
                     id__in=[entity[0] for entity in cursor.fetchall()]
                 )
 
+                # Delete entities not created/upserted if sync is True
                 if sync:
                     # Use the same VALUES list in the cleanup query
                     sync_cleanup_query = (
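[With the explicit SELECT ... FOR NO KEY UPDATE gone, conflict handling rests on the transaction_atomic_with_retry decorator, whose body the hunks above truncate. A minimal sketch of the pattern it names, assuming retries are keyed on Django's OperationalError; this is illustrative, not the library's exact implementation:]

    import wrapt
    from time import sleep
    from django import db
    from django.db import transaction

    def transaction_atomic_with_retry(num_retries=5, backoff=0.1):
        @wrapt.decorator
        def wrapper(wrapped, instance, args, kwargs):
            num_tries = 0
            while True:
                try:
                    # Run the wrapped method inside one atomic transaction
                    with transaction.atomic():
                        return wrapped(*args, **kwargs)
                except db.utils.OperationalError:
                    # Deadlock or serialization failure: back off and retry
                    num_tries += 1
                    if num_tries > num_retries:
                        raise
                    sleep(backoff * num_tries)
        return wrapper

This is why dropping the up-front lock is tolerable: a conflicting concurrent sync surfaces as a retried transaction rather than a blocked one.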
From 409543fe9e18cb813356f26bf63a8cfe2747 Mon Sep 17 00:00:00 2001
From: Ryan Bales
Date: Fri, 10 Jan 2025 14:07:47 -0500
Subject: [PATCH 5/7] cleanup and used cte in entity relationships sync
 cleanup query

---
 entity/sync.py             | 41 ++++++++++++--------------------------
 entity/tests/sync_tests.py |  6 +++---
 2 files changed, 16 insertions(+), 31 deletions(-)

diff --git a/entity/sync.py b/entity/sync.py
index e9f7542..a12f218 100644
--- a/entity/sync.py
+++ b/entity/sync.py
@@ -9,7 +9,6 @@
 from collections import defaultdict
 from time import sleep
 from typing import TYPE_CHECKING
-from uuid import uuid4
 
 import pgbulk
 import wrapt
@@ -608,45 +607,31 @@ def upsert_entity_relationships(self, original_entity_ids, entity_relationships)
 
         if entity_relationships:
             # Get the new and updated relationships that were upserted
-            uuid = uuid4()
-            table_name = f"sync_entity_relationships_{uuid}".replace("-", "_")
             with connection.cursor() as cursor:
-                cursor.execute(
-                    f"CREATE TEMPORARY TABLE {table_name} ("
-                    "sub_entity_id INTEGER, "
-                    "super_entity_id INTEGER, "
-                    "PRIMARY KEY(sub_entity_id, super_entity_id))"
-                )
+                # Create the values string for the VALUES list
                 values = []
+                values_list_items = []
                 for relationship in entity_relationships:
-                    values.extend(
-                        [relationship.sub_entity_id, relationship.super_entity_id]
-                    )
-                values_escaped = ",".join(
-                    ["(%s, %s)" for i in range(0, int(len(values) / 2))]
-                )
-                cursor.execute(
-                    f"INSERT INTO {table_name} (sub_entity_id, super_entity_id) VALUES {values_escaped}",
-                    values,
-                )
+                    values.extend([relationship.super_entity_id, relationship.sub_entity_id])
+                    values_list_items.append("(%s, %s)")
+                values_list = ",".join(values_list_items)
 
-        # If we upserted relationships, we need to delete relationships from the initial set that weren't just upserted
-        # We'll use the temp table as a reference for what was upserted
-        if entity_relationships:
-            with connection.cursor() as cursor:
-                query = (
+                # If we upserted relationships, we need to delete relationships from the initial set that weren't just upserted
+                # We'll use the VALUES list as a reference for what was upserted
+                sync_cleanup_query = (
                     f"DELETE FROM entity_entityrelationship WHERE id IN ("
                     f"SELECT id FROM ("
                     f"WITH initial AS ({initial_queryset.query}), "
-                    f"syncd AS (SELECT sub_entity_id, super_entity_id FROM {table_name}) "
+                    f"syncd AS (SELECT super_entity_id, sub_entity_id FROM (VALUES {values_list}) "
+                    f"AS v(super_entity_id, sub_entity_id)) "
                     f"SELECT initial.id, initial.super_entity_id FROM initial "
-                    f"LEFT OUTER JOIN syncd ON initial.sub_entity_id=syncd.sub_entity_id AND "
-                    f"initial.super_entity_id=syncd.super_entity_id "
+                    f"LEFT OUTER JOIN syncd ON initial.super_entity_id=syncd.super_entity_id AND "
+                    f"initial.sub_entity_id=syncd.sub_entity_id "
                     f"WHERE syncd.super_entity_id IS NULL"
                     f") as ids"
                     f")"
                 )
-                cursor.execute(query)
+                cursor.execute(sync_cleanup_query, values)
         # Else, just delete everything from the initial queryset, since we didn't upsert anything
         else:
             initial_queryset.delete()
diff --git a/entity/tests/sync_tests.py b/entity/tests/sync_tests.py
index fbece84..b5c9332 100644
--- a/entity/tests/sync_tests.py
+++ b/entity/tests/sync_tests.py
@@ -956,7 +956,7 @@ def test_optimal_queries_registered_entity_with_no_qset(self):
         team_group = G(TeamGroup)
 
         ContentType.objects.clear_cache()
-        with self.assertNumQueries(18):
+        with self.assertNumQueries(15):
             team_group.save()
 
     def test_optimal_queries_registered_entity_w_qset(self):
@@ -966,7 +966,7 @@ def test_optimal_queries_registered_entity_w_qset(self):
         account = G(Account)
 
         ContentType.objects.clear_cache()
-        with self.assertNumQueries(21):
+        with self.assertNumQueries(18):
             account.save()
 
     def test_sync_all_optimal_queries(self):
@@ -995,7 +995,7 @@ def test_sync_all_optimal_queries(self):
         with patch('entity.sync.entity_registry') as mock_entity_registry:
             mock_entity_registry.entity_registry = new_registry.entity_registry
             ContentType.objects.clear_cache()
-            with self.assertNumQueries(26):
+            with self.assertNumQueries(21):
                 sync_entities()
 
         self.assertEqual(Entity.objects.filter(entity_type=ContentType.objects.get_for_model(Account)).count(), 5)
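[The test deltas above are the visible payoff: replacing each temporary table with a VALUES join removes the CREATE TEMPORARY TABLE and INSERT round trips from every sync, so the query budgets fall back close to their pre-series values. Django's assertNumQueries is what pins this behavior; a minimal illustration of the pattern, with a hypothetical fixture helper rather than the suite's own setup:]

    from django.test import TestCase

    class SyncQueryBudget(TestCase):
        def test_save_issues_expected_queries(self):
            obj = make_registered_entity()  # hypothetical fixture helper
            # Fails loudly if a refactor adds or removes database round trips
            with self.assertNumQueries(15):
                obj.save()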
From d278a5ceb5e23842ea3861a79071cd6388c2b2ff Mon Sep 17 00:00:00 2001
From: Ryan Bales
Date: Fri, 10 Jan 2025 14:14:07 -0500
Subject: [PATCH 6/7] build

---
 entity/sync.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/entity/sync.py b/entity/sync.py
index a12f218..2feaf89 100644
--- a/entity/sync.py
+++ b/entity/sync.py
@@ -400,8 +400,7 @@ def sync(self):
             for sub_ctype, super_entities_by_sub_ctype in super_entities_by_ctype.items()
             for super_ctype, relationships in super_entities_by_sub_ctype.items()
             for sub_entity_id, super_entity_id in relationships
-            if (sub_ctype.id, sub_entity_id) in entities_map
-            and (super_ctype.id, super_entity_id) in entities_map
+            if (sub_ctype.id, sub_entity_id) in entities_map and (super_ctype.id, super_entity_id) in entities_map
         ]
 
         # Find the entities of the original model objects we were syncing. These
@@ -616,7 +615,8 @@ def upsert_entity_relationships(self, original_entity_ids, entity_relationships)
                     values_list_items.append("(%s, %s)")
                 values_list = ",".join(values_list_items)
 
-                # If we upserted relationships, we need to delete relationships from the initial set that weren't just upserted
+                # If we upserted relationships,
+                # we need to delete relationships from the initial set that weren't just upserted
                 # We'll use the VALUES list as a reference for what was upserted
                 sync_cleanup_query = (

From afedac7a01283cdf776716494295584f2151491b Mon Sep 17 00:00:00 2001
From: Ryan Bales
Date: Fri, 10 Jan 2025 14:17:40 -0500
Subject: [PATCH 7/7] dropping support for python versions older than 3.9

---
 .github/workflows/tests.yml | 9 +--------
 setup.py                    | 2 --
 2 files changed, 1 insertion(+), 10 deletions(-)

diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index d2361b4..d3206c6 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -12,7 +12,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        python: ['3.7', '3.8', '3.9']
+        python: ['3.9']
         # Time to switch to pytest or nose2??
         # nosetests is broken on 3.10
         # AttributeError: module 'collections' has no attribute 'Callable'
@@ -31,13 +31,6 @@ jobs:
        #     # `continue-on-error: true`. Github Actions apparently does not
         #     # have this feature, similar to Travis' allow-failure, yet.
         #     # https://github.com/actions/toolkit/issues/399
-        exclude:
-          - python: '3.7'
-            django: 'Django~=4.0.0'
-          - python: '3.7'
-            django: 'Django~=4.1.0'
-          - python: '3.7'
-            django: 'Django~=4.2.0'
     services:
       postgres:
        image: postgres:latest
diff --git a/setup.py b/setup.py
index 5c6f0e5..1105afd 100644
--- a/setup.py
+++ b/setup.py
@@ -38,8 +38,6 @@ def get_lines(file_path):
     packages=find_packages(),
     classifiers=[
         'Programming Language :: Python',
-        'Programming Language :: Python :: 3.7',
-        'Programming Language :: Python :: 3.8',
         'Programming Language :: Python :: 3.9',
         'Intended Audience :: Developers',
         'License :: OSI Approved :: MIT License',
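[The classifier trim in the final commit is the only packaging change the diff shows. A common companion, not present in this patch series and offered here only as an assumption about intent, is letting pip enforce the same floor via python_requires:]

    # Hypothetical addition to setup.py, not included in PATCH 7/7
    setup(
        name='django-entity',
        packages=find_packages(),
        python_requires='>=3.9',
        classifiers=[
            'Programming Language :: Python',
            'Programming Language :: Python :: 3.9',
        ],
    )

Without it, the trove classifiers are informational only and older interpreters can still install the package.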