Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ignoring unchanged updates in entity sync #189

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 120 additions & 59 deletions entity/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,24 @@
Provides functions for syncing entities and their relationships to the
Entity and EntityRelationship tables.
"""
from __future__ import annotations

import logging
from collections import defaultdict
from time import sleep
from typing import TYPE_CHECKING

import pgbulk
import wrapt
from collections import defaultdict

from activatable_model import model_activations_changed
from django import db
from django.contrib.contenttypes.models import ContentType
import manager_utils
from django.db import transaction, connection

from django.db import connection, transaction
from entity.config import entity_registry
from entity.models import Entity, EntityRelationship, EntityKind
from entity.models import Entity, EntityKind, EntityRelationship

if TYPE_CHECKING:
from django.db.models import QuerySet

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -249,7 +252,7 @@ def sync_entities_watching(instance):
sync_entities(*model_objs)


class EntitySyncer(object):
class EntitySyncer:
"""
A class that will handle the syncing of entities
"""
Expand All @@ -270,8 +273,6 @@ def sync(self):
LOG.debug('sync_entities')
LOG.debug(self.model_objs)

# Determine if we are syncing all
sync_all = not self.model_objs
model_objs_map = {
(ContentType.objects.get_for_model(model_obj, for_concrete_model=False), model_obj.id): model_obj
for model_obj in self.model_objs
Expand Down Expand Up @@ -301,12 +302,17 @@ def sync(self):
# For each ctype, obtain super entities. This is a dict keyed on ctype. Each value
# is a dict keyed on the ctype of the super entity with a list of tuples for
# IDs of sub/super entity relationships
super_entities_by_ctype = _get_super_entities_by_ctype(model_objs_by_ctype, model_ids_to_sync, sync_all)
super_entities_by_ctype = _get_super_entities_by_ctype(model_objs_by_ctype, model_ids_to_sync, self.sync_all)

# Now that we have all models we need to sync, fetch them so that we can extract
# metadata and entity kinds. If we are syncing all entities, we've already fetched
# everything and can fill in this data struct without doing another DB hit
model_objs_to_sync = _get_model_objs_to_sync(model_ids_to_sync, model_objs_map, model_objs_by_ctype, sync_all)
model_objs_to_sync = _get_model_objs_to_sync(
model_ids_to_sync,
model_objs_map,
model_objs_by_ctype,
self.sync_all,
)

# Obtain all entity kind tuples associated with the models
entity_kind_tuples_to_sync = set()
Expand Down Expand Up @@ -384,16 +390,9 @@ def sync(self):
if (ctype.id, model_obj.id) in entities_map
]

if self.sync_all:
# If we're syncing everything, just sync against the entire entity relationship
# table instead of doing a complex __in query
sync_against = EntityRelationship.objects.all()
else:
sync_against = EntityRelationship.objects.filter(sub_entity_id__in=original_entity_ids)

# Sync the relations
self.upsert_entity_relationships(
queryset=sync_against,
original_entity_ids,
entity_relationships=entity_relationships_to_sync
)

Expand Down Expand Up @@ -437,14 +436,12 @@ def upsert_entity_kinds(self, entity_kinds):
list(EntityKind.all_objects.all().order_by('id').select_for_update().values_list('id', flat=True))

# Upsert the entity kinds
upserted_enitity_kinds = manager_utils.bulk_upsert(
queryset=EntityKind.all_objects.filter(
name__in=[entity_kind.name for entity_kind in changed_entity_kinds]
),
model_objs=changed_entity_kinds,
unique_fields=['name'],
update_fields=['display_name'],
return_upserts=True
upserted_enitity_kinds = pgbulk.upsert(
EntityKind,
changed_entity_kinds,
['name'],
['display_name'],
returning=True,
)

# Return all the entity kinds
Expand Down Expand Up @@ -504,24 +501,34 @@ def upsert_entities(self, entities, sync=False):
for entity in initial_queryset.values_list('id', 'is_active')
}

# Sync all the entities if the sync flag is passed
if sync:
upserted_entities = manager_utils.sync(
queryset=initial_queryset,
model_objs=entities,
unique_fields=['entity_type_id', 'entity_id'],
update_fields=['entity_kind_id', 'entity_meta', 'display_name', 'is_active'],
return_upserts=True
)
# Otherwise we want to upsert our entities
else:
upserted_entities = manager_utils.bulk_upsert(
queryset=initial_queryset,
model_objs=entities,
unique_fields=['entity_type_id', 'entity_id'],
update_fields=['entity_kind_id', 'entity_meta', 'display_name', 'is_active'],
return_upserts=True
)
# Upsert entities
pgbulk.upsert(
queryset=initial_queryset,
model_objs=entities,
unique_fields=['entity_type_id', 'entity_id'],
update_fields=['entity_kind_id', 'entity_meta', 'display_name', 'is_active'],
ignore_unchanged=True,
)

upserted_entities = []
if entities:
upserted_entities = list(Entity.all_objects.extra(
where=['(entity_type_id, entity_id) IN %s'],
params=[tuple(
(entity.entity_type_id, entity.entity_id)
for entity in entities
)]
))

# Delete unreferenced entities if sync=True
if sync:
models_to_delete = [
model
for model in initial_queryset
if model.id not in [entity.id for entity in upserted_entities]
]
if models_to_delete:
initial_queryset.exclude(id__in=[entity.id for entity in upserted_entities]).delete()

# Compute the current state of the entities
current_entity_activation_state = {
Expand Down Expand Up @@ -550,29 +557,83 @@ def upsert_entities(self, entities, sync=False):
return upserted_entities, changed_entity_activation_state

@transaction_atomic_with_retry()
def upsert_entity_relationships(self, queryset, entity_relationships):
def upsert_entity_relationships(self, original_entity_ids, entity_relationships):
"""
Upsert entity relationships to the database
:param queryset: The base queryset to use
Sync entity relationships to the database

:param original_entity_ids: The ids of the original entity objects that are being sync'd
:param entity_relationships: The entity relationships to ensure exist in the database
"""
initial_queryset = self._get_entity_relationships_to_sync(original_entity_ids)

initial_queryset_ids = None
# Select the relationships for update
if entity_relationships:
list(queryset.order_by('id').select_for_update().values_list(
'id',
flat=True
))
initial_queryset_ids = list(
initial_queryset.order_by('id').select_for_update().values_list(
'id',
flat=True
)
)

# Sync the relationships
return manager_utils.sync(
queryset=queryset,
model_objs=entity_relationships,
unique_fields=['sub_entity_id', 'super_entity_id'],
update_fields=[],
return_upserts=True
# Upsert the relationships
pgbulk.upsert(
initial_queryset,
entity_relationships,
['sub_entity_id', 'super_entity_id'],
ignore_unchanged=True,
)

syncd_relationships = []
syncd_relationship_ids = []
if entity_relationships:
# Get the new and updated relationships that were upserted
syncd_relationships = list(
EntityRelationship.objects.extra(
where=['(sub_entity_id, super_entity_id) IN %s'],
params=[tuple(
(relationship.sub_entity_id, relationship.super_entity_id)
for relationship in entity_relationships
)]
)
)
syncd_relationship_ids = [relationship.id for relationship in syncd_relationships]

# Determine whether there are relationships that should be deleted
# ----------------------------------------------------------------
# If there were relationships explicitly provided, then we selected the initial set for update
# and have a reference to those ids. We can simply compare the updated/sync'd set of ids to
# determine if there are old relationships that have to be deleted
if entity_relationships:
relationships_to_delete = [
relationship_id for relationship_id in initial_queryset_ids
if relationship_id not in syncd_relationship_ids
]
EntityRelationship.objects.filter(id__in=relationships_to_delete).delete()
# Else, just exclude the sync'd ids from the initial queryset to delete
else:
self._get_entity_relationships_to_sync(original_entity_ids).exclude(
id__in=syncd_relationship_ids
).delete()

return syncd_relationships

def _get_entity_relationships_to_sync(self, original_entity_ids) -> QuerySet:
    """
    Build the queryset of entity relationships that this sync run is responsible for replacing.

    :param original_entity_ids: The ids of the entities this process was originally asked to sync
    :return: Every EntityRelationship when ``self.sync_all`` is truthy; otherwise only the
        relationships whose ``sub_entity_id`` is one of ``original_entity_ids``
    """
    if self.sync_all:
        # A full sync covers the whole relationship table, so skip the
        # (potentially very large) __in filter entirely
        return EntityRelationship.objects.all()

    return EntityRelationship.objects.filter(sub_entity_id__in=original_entity_ids)

def send_entity_activation_events(self, changed_entity_activation_state):
"""
Given a changed entity state dict, fire the appropriate signals
Expand Down
2 changes: 1 addition & 1 deletion entity/tests/sync_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,7 @@ def test_sync_all_optimal_queries(self):
with patch('entity.sync.entity_registry') as mock_entity_registry:
mock_entity_registry.entity_registry = new_registry.entity_registry
ContentType.objects.clear_cache()
with self.assertNumQueries(20):
with self.assertNumQueries(22):
sync_entities()

self.assertEqual(Entity.objects.filter(entity_type=ContentType.objects.get_for_model(Account)).count(), 5)
Expand Down
2 changes: 1 addition & 1 deletion entity/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '6.2.3'
__version__ = '6.3.0'
2 changes: 2 additions & 0 deletions release_notes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## Release Notes

- 6.3.0:
- Ignore no-op updates when syncing entities and entity relationships in order to avoid downstream cost
- 6.2.3:
- Update the `defer_entity_syncing` decorator to support an optional handler.
- 6.2.2:
Expand Down
1 change: 1 addition & 0 deletions requirements/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
Django>=3.2
django-activatable-model>=3.1.0
django-pgbulk==3.2.0
django-manager-utils>=3.1.0
python3-utils>=0.3
wrapt>=1.10.5
Loading