Skip to content

Commit

Permalink
Merge branch 'main' into new_counts_for_node_and_status_pt2
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Nov 27, 2024
2 parents 3d83631 + c0891b9 commit 789aec6
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 28 deletions.
4 changes: 3 additions & 1 deletion temba/flows/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1443,7 +1443,9 @@ def get_squash_query(cls, distinct_set) -> tuple:
DELETE FROM %(table)s WHERE "flow_id" = %%s AND "scope" = %%s RETURNING "count"
)
INSERT INTO %(table)s("flow_id", "scope", "count", "is_squashed")
VALUES (%%s, %%s, GREATEST(0, (SELECT SUM("count") FROM removed)), TRUE);
SELECT %%s, %%s, s.total, TRUE FROM (
SELECT COALESCE(SUM("count"), 0) AS "total" FROM removed
) s WHERE s.total != 0;
""" % {
"table": cls._meta.db_table
}
Expand Down
58 changes: 47 additions & 11 deletions temba/flows/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from openpyxl import load_workbook

from django.core.files.storage import default_storage
from django.db import connection
from django.db.models.functions import TruncDate
from django.test.utils import override_settings
from django.urls import reverse
Expand Down Expand Up @@ -34,6 +35,7 @@
from .checks import mailroom_url
from .models import (
Flow,
FlowActivityCount,
FlowCategoryCount,
FlowLabel,
FlowNodeCount,
Expand Down Expand Up @@ -5665,21 +5667,55 @@ def handle(msg, flow):
self.assertEqual(3, flow2.counts.count())

def test_squashing(self):
flow = self.create_flow("Test")
flow.counts.create(scope="foo:1", count=1)
flow.counts.create(scope="foo:1", count=2)
flow.counts.create(scope="foo:2", count=4)
flow1 = self.create_flow("Test 1")
flow1.counts.create(scope="foo:1", count=1)
flow1.counts.create(scope="foo:1", count=2)
flow1.counts.create(scope="foo:2", count=4)
flow1.counts.create(scope="foo:3", count=-6)
flow1.counts.create(scope="foo:3", count=-1)

self.assertEqual(3, flow.counts.filter(scope="foo:1").sum())
self.assertEqual(4, flow.counts.filter(scope="foo:2").sum())
self.assertEqual(0, flow.counts.filter(scope="foo:3").sum())
flow2 = self.create_flow("Test 2")
flow2.counts.create(scope="foo:1", count=7)
flow2.counts.create(scope="foo:1", count=3)
flow2.counts.create(scope="foo:2", count=8) # unsquashed that sum to zero
flow2.counts.create(scope="foo:2", count=-8)
flow2.counts.create(scope="foo:3", count=5)

self.assertEqual(3, flow1.counts.filter(scope="foo:1").sum())
self.assertEqual(4, flow1.counts.filter(scope="foo:2").sum())
self.assertEqual(-7, flow1.counts.filter(scope="foo:3").sum()) # negative counts supported
self.assertEqual(0, flow1.counts.filter(scope="foo:4").sum()) # zero if no such scope exists
self.assertEqual(10, flow2.counts.filter(scope="foo:1").sum())
self.assertEqual(0, flow2.counts.filter(scope="foo:2").sum())
self.assertEqual(5, flow2.counts.filter(scope="foo:3").sum())

squash_activity_counts()

self.assertEqual(2, flow.counts.count())
self.assertEqual(3, flow.counts.filter(scope="foo:1").sum())
self.assertEqual(4, flow.counts.filter(scope="foo:2").sum())
self.assertEqual(0, flow.counts.filter(scope="foo:3").sum())
self.assertEqual({"foo:1", "foo:2", "foo:3"}, set(flow1.counts.values_list("scope", flat=True)))

# flow2/foo:2 should be gone because it squashed to zero
self.assertEqual({"foo:1", "foo:3"}, set(flow2.counts.values_list("scope", flat=True)))

self.assertEqual(3, flow1.counts.filter(scope="foo:1").sum())
self.assertEqual(4, flow1.counts.filter(scope="foo:2").sum())
self.assertEqual(-7, flow1.counts.filter(scope="foo:3").sum())
self.assertEqual(10, flow2.counts.filter(scope="foo:1").sum())
self.assertEqual(0, flow2.counts.filter(scope="foo:2").sum())
self.assertEqual(5, flow2.counts.filter(scope="foo:3").sum())

flow2.counts.create(scope="foo:3", count=-5) # unsquashed zero + squashed zero

squash_activity_counts()

# flow2/foo:3 should be gone because it squashed to zero
self.assertEqual({"foo:1"}, set(flow2.counts.values_list("scope", flat=True)))

# test that model being asked to squash a set that matches no rows doesn't insert anytihng
with connection.cursor() as cursor:
sql, params = FlowActivityCount.get_squash_query(FlowActivityCount(flow_id=flow1.id, scope="foo:9"))
cursor.execute(sql, params)

self.assertEqual({"foo:1", "foo:2", "foo:3"}, set(flow1.counts.values_list("scope", flat=True)))


class BackfillNewCountsTest(MigrationTest):
Expand Down
4 changes: 3 additions & 1 deletion temba/orgs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1892,7 +1892,9 @@ def get_squash_query(cls, distinct_set) -> tuple:
DELETE FROM %(table)s WHERE "org_id" = %%s AND "scope" = %%s RETURNING "count"
)
INSERT INTO %(table)s("org_id", "scope", "count", "is_squashed")
VALUES (%%s, %%s, GREATEST(0, (SELECT SUM("count") FROM removed)), TRUE);
SELECT %%s, %%s, s.total, TRUE FROM (
SELECT COALESCE(SUM("count"), 0) AS "total" FROM removed
) s WHERE s.total != 0;
""" % {
"table": cls._meta.db_table
}
Expand Down
8 changes: 1 addition & 7 deletions temba/tickets/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,16 +246,10 @@ def assert_counts(

squash_item_counts()

# check new count model raw values are consistent
# check count model raw values are consistent
self.assertEqual(
{
f"tickets:C:{general.id}:{self.admin.id}": 0,
f"tickets:C:{general.id}:{self.agent.id}": 0,
f"tickets:C:{cats.id}:0": 0,
f"tickets:C:{cats.id}:{self.admin.id}": 0,
f"tickets:O:{general.id}:0": 0,
f"tickets:O:{general.id}:{self.editor.id}": 1,
f"tickets:O:{general.id}:{self.agent.id}": 0,
f"tickets:O:{cats.id}:0": 1,
f"tickets:O:{cats.id}:{self.admin.id}": 1,
},
Expand Down
20 changes: 12 additions & 8 deletions temba/utils/models/squashable.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import logging
import time
from abc import abstractmethod

from django.db import connection, models
Expand All @@ -12,6 +10,7 @@ class SquashableModel(models.Model):
"""

squash_over = ()
squash_max_distinct = 5000

id = models.BigAutoField(auto_created=True, primary_key=True)
is_squashed = models.BooleanField(default=False)
Expand All @@ -21,21 +20,26 @@ def get_unsquashed(cls):
return cls.objects.filter(is_squashed=False)

@classmethod
def squash(cls):
start = time.time()
def squash(cls) -> int:
"""
Squashes all distinct sets of counts with unsquashed rows into a single row if they sum to non-zero or just
deletes them if they sum to zero. Returns the number of sets squashed.
"""

num_sets = 0
distinct_sets = (
cls.get_unsquashed().order_by(*cls.squash_over).distinct(*cls.squash_over)[: cls.squash_max_distinct]
)

for distinct_set in cls.get_unsquashed().order_by(*cls.squash_over).distinct(*cls.squash_over)[:5000]:
for distinct_set in distinct_sets:
with connection.cursor() as cursor:
sql, params = cls.get_squash_query(distinct_set)

cursor.execute(sql, params)

num_sets += 1

time_taken = time.time() - start

logging.debug("Squashed %d distinct sets of %s in %0.3fs" % (num_sets, cls.__name__, time_taken))
return num_sets

@classmethod
@abstractmethod
Expand Down

0 comments on commit 789aec6

Please sign in to comment.