add event tracking #349
base: main
Changes from 2 commits
@@ -0,0 +1,6 @@

```python
from django.apps import AppConfig


class EventsAppConfig(AppConfig):
    default_auto_field = "django.db.models.BigAutoField"
    name = "commcare_connect.events"
```
@@ -0,0 +1,53 @@

```python
# Generated by Django 4.2.5 on 2024-07-13 14:45

from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):
    initial = True

    dependencies = [
        migrations.swappable_dependency(settings.AUTH_USER_MODEL),
        ("opportunity", "0044_opportunityverificationflags"),
    ]

    operations = [
        migrations.CreateModel(
            name="Event",
            fields=[
                ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
                ("date_created", models.DateTimeField(auto_now_add=True, db_index=True)),
                (
                    "event_type",
                    models.CharField(
                        choices=[
                            ("invite_sent", "Invite Sent"),
                            ("records_approved", "Records Approved"),
                            ("records_flagged", "Records Flagged"),
                            ("records_rejected", "Records Rejected"),
                            ("payment_approved", "Payment Approved"),
                            ("payment_accrued", "Payment Accrued"),
                            ("payment_transferred", "Payment Transferred"),
                            ("notifications_sent", "Notifications Sent"),
                            ("additional_budget_added", "Additional Budget Added"),
                        ],
                        max_length=40,
                    ),
                ),
                (
                    "opportunity",
                    models.ForeignKey(
                        null=True, on_delete=django.db.models.deletion.PROTECT, to="opportunity.opportunity"
                    ),
                ),
                (
                    "user",
                    models.ForeignKey(
                        null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL
                    ),
                ),
            ],
        ),
    ]
```
@@ -0,0 +1,144 @@

```python
from abc import ABCMeta, abstractproperty
from dataclasses import dataclass
from datetime import datetime

from django.db import models
from django.utils.translation import gettext as _

from commcare_connect.users.models import User

from . import types


class Event(models.Model):
    from commcare_connect.opportunity.models import Opportunity

    # this allows referring to event types in this style: Event.Type.INVITE_SENT
    Type = types

    date_created = models.DateTimeField(auto_now_add=True, db_index=True)
    event_type = models.CharField(max_length=40, choices=types.EVENT_TYPE_CHOICES)
```
> You mentioned you were going to rethink this in response to my last comment, but it appears to still be here? How has your thinking here evolved? Why did you decide to require a migration each time we have a new event?

> Yeah, I am going to make it a callable to avoid migrations getting created.

> So, it looks like this won't work since we are on Django 4 (callable choices only work in 5). We can do it using validators, but the trade-off is that we need some extra code to toggle between the slug and verbose_name (and back) while processing in some of the places, which is ugly and unnecessary. I could live with an extra migration until Django 5 instead of the unnecessary code. Do you feel strongly against migrations?

> I am not sure I totally understand why we need restricted choices here at all. That seems pretty nonstandard for an analytics system (if you think of GA or Kissmetrics, they allow arbitrary events), and it makes adding new events, which we expect to do incrementally, much trickier. Mobile will need to coordinate releases around web running these migrations, and simple communication errors could cause confusing problems. I like the idea on web of predefining our events and only using ones we have listed in something like a constants file, but I am not sure I am sold on enforcing them at the db level. As with other comments, I am open to being convinced.
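The validator idea discussed in this thread can be sketched as follows. A validator runs at model validation time rather than being baked into the column definition, so adding a new event type only touches the constants module and generates no migration. The wiring below is an assumption, not the PR's code; `EVENT_TYPES` mirrors a subset of `types.py`:

```python
# Sketch (assumption): enforce known event types with a validator instead of
# `choices=`, so a new event type does not require a migration.
EVENT_TYPES = {
    "invite_sent": "Invite Sent",
    "records_approved": "Records Approved",
    "records_rejected": "Records Rejected",
}


class ValidationError(Exception):
    """Stand-in for django.core.exceptions.ValidationError."""


def validate_event_type(value):
    # Reject any slug that isn't in the known mapping
    if value not in EVENT_TYPES:
        raise ValidationError(f"{value!r} is not a known event type")


# On the model this would look like (hypothetical):
#   event_type = models.CharField(max_length=40, validators=[validate_event_type])
```

One trade-off worth noting: unlike `choices=`, validators only run during `full_clean()` or serializer validation, not on a bare `save()`.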
```python
    user = models.ForeignKey(User, on_delete=models.CASCADE, null=True)
    opportunity = models.ForeignKey(Opportunity, on_delete=models.PROTECT, null=True)
```
> I will add again that I think a metadata field will be very beneficial. Even with the existing set of events we have, data like which record was approved, or how much the payment was, could be very useful, and I know that mobile was also hoping to include additional metadata.
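A minimal sketch of the suggested metadata field, assuming a `JSONField`; the field name and example keys are hypothetical, not from the PR:

```python
import json

# Hypothetical addition to the Event model:
#   metadata = models.JSONField(default=dict, blank=True)
# Callers would attach context such as which record was approved or the amount:
metadata = {"user_visit_id": 123, "payment_amount": "250.00"}

# A JSONField stores anything JSON-serializable; a quick round-trip check:
assert json.loads(json.dumps(metadata)) == metadata
```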
```python
    def track(self, use_async=True):
        """
        To track an event, instantiate the object and call this method,
        instead of calling save directly.

        If use_async is True, the event is queued in Redis and saved
        via celery, otherwise it's saved directly.
        """
        from commcare_connect.events.tasks import track_event

        track_event(self, use_async=use_async)
```
```python
@dataclass
class InferredEvent:
    from commcare_connect.opportunity.models import Opportunity

    user: User
    opportunity: Opportunity
    date_created: datetime
    event_type: str


class InferredEventSpec(metaclass=ABCMeta):
    """
    Use this to define an Event that can be inferred
    based on other models. See RecordsFlagged for an example.
    """

    @abstractproperty
    def model_cls(self):
        """
        The source model class to infer the event from,
        e.g. UserVisit
        """
        raise NotImplementedError

    @abstractproperty
    def event_type(self):
        """
        Should be a tuple to indicate the name,
        e.g. ("RECORDS_FLAGGED", gettext("Records Flagged"))
        """
        raise NotImplementedError

    @abstractproperty
    def event_filters(self):
        """
        Should be a dict of the queryset filters,
        e.g. {'flagged': True} for RecordsFlagged on UserVisit
        """
        raise NotImplementedError

    @abstractproperty
    def user(self):
        """
        The field corresponding to user on the source model.
        """
        raise NotImplementedError

    @abstractproperty
    def date_created(self):
        """
        The field corresponding to date_created on the source model.
        """
        raise NotImplementedError

    @abstractproperty
    def opportunity(self):
        """
        The field corresponding to opportunity on the source model, could be None.
        """
        raise NotImplementedError

    def get_events(self, user=None, from_date=None, to_date=None):
        filters = {}
        filters.update(self.event_filters)

        if user:
            filters.update({self.user: user})
        if from_date:
            filters.update({f"{self.date_created}__gte": from_date})
        if to_date:
            filters.update({f"{self.date_created}__lte": to_date})

        events = (
            self.model_cls.objects.filter(**filters).values(self.user, self.opportunity, self.date_created).iterator()
        )
        for event in events:
            yield InferredEvent(
                event_type=self.event_type[0],
                user=event[self.user],
                date_created=event[self.date_created],
                opportunity=event.get(self.opportunity, None),
            )


class RecordsFlagged(InferredEventSpec):
    event_type = (types.RECORDS_FLAGGED, _("Records Flagged"))
    model_cls = "UserVisit"
    event_filters = {"flagged": True}
    user = "user"
    date_created = "visit_date"
    opportunity = "opportunity"


INFERRED_EVENT_SPECS = [RecordsFlagged()]
```
```python
def get_events(user=None, from_date=None, to_date=None):
    filters = {
        "user": user,
        "date_created__gte": from_date,
        "date_created__lte": to_date,
    }
    filters = {k: v for k, v in filters.items() if v is not None}
    raw_events = Event.objects.filter(**filters).all()
    inferred_events = []
    for event_spec in INFERRED_EVENT_SPECS:
        inferred_events += list(event_spec.get_events(user=user, from_date=from_date, to_date=to_date))
    return list(raw_events) + inferred_events
```

> How is this used?

> This will be used in the Events timeline report.
@@ -0,0 +1,47 @@

```python
import pickle
from datetime import datetime

from django.db import transaction
from django_redis import get_redis_connection

from commcare_connect.events.models import Event
from config import celery_app

REDIS_EVENTS_QUEUE = "events_queue"


class EventQueue:
    def __init__(self):
        self.redis_conn = get_redis_connection("default")

    def push(self, event_obj):
        serialized_event = pickle.dumps(event_obj)
        self.redis_conn.rpush(REDIS_EVENTS_QUEUE, serialized_event)

    def pop(self):
        events = [pickle.loads(event) for event in self.redis_conn.lrange(REDIS_EVENTS_QUEUE, 0, -1)]
        self.redis_conn.ltrim(REDIS_EVENTS_QUEUE, len(events), -1)
        return events
```
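The queue's drain semantics (RPUSH appends to the tail; LRANGE reads everything queued, then LTRIM drops exactly what was read) can be illustrated with a plain list standing in for Redis. `FakeEventQueue` is an illustrative stand-in, not part of the PR:

```python
import pickle


class FakeEventQueue:
    """List-backed stand-in for the Redis EventQueue, same push/pop semantics."""

    def __init__(self):
        self._items = []

    def push(self, obj):
        # rpush: append the pickled object at the tail
        self._items.append(pickle.dumps(obj))

    def pop(self):
        # lrange(0, -1) reads the whole list; ltrim(len(events), -1) then
        # keeps only items pushed after the read
        events = [pickle.loads(e) for e in self._items]
        del self._items[: len(events)]
        return events
```

Pickled payloads round-trip through the fake queue the same way they would through Redis lists; a second `pop()` on a drained queue returns an empty list.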
```python
@celery_app.task
def process_events_batch():
    event_queue = EventQueue()
    events = event_queue.pop()
    if not events:
        return
    try:
        with transaction.atomic():
            Event.objects.bulk_create(events)
    except Exception as e:
        for event in events:
            event_queue.push(event)
        raise e


def track_event(event_obj, use_async=True):
    event_obj.date_created = datetime.utcnow()
    if use_async:
        EventQueue().push(event_obj)
    else:
        event_obj.save()
```

> I have gone this route over a few other options. We can run this task periodically, say every 30 seconds.

> Thanks for describing your thought process a bit. A few questions.

> Main reason being: wouldn't writing to Redis (in memory) be faster than writing to the Postgres DB (on disk)? This also allows fewer writes to Postgres (because they are committed in batches), which should make it that much less probable for the DB to run into performance issues.

> I would rather we start with a purely synchronous implementation for now if we want to be sure about event durability. We can always make these async later, but this adds a lot of complexity for something we aren't sure we need yet. If we do want to make it async, I think we should use celery unless we have a very strong reason not to, since we are already using that. That keeps us to only one async task mechanism in our code and makes it a bit easier to manage.

> I am pretty sure that it would impact performance in sync mode, at least in some cases if not all. (Though maybe this isn't a big issue if these requests are coming from Repeaters as opposed to directly from mobile submissions.) And if we do a separate task for each event, we are going to face a lot of constant celery troubleshooting. We have like 47 lines of code to do async and a single periodic task; I don't feel that qualifies as "a lot of complexity". I am happy to remove all the async code or make it multiple individual tasks, but ¯\\_(ツ)_/¯, you will have to handle it if/when things slow down 🐢 ⌛ 🐌 or if we end up doing constant celery firefighting 🧯 🔥 🚒

> My instinct was that nearly all of these come from background tasks (like approvals) or non-time-sensitive requests. It is certainly possible that future ones won't (the mobile endpoint should be performant, but we also need to be able to tell mobile whether the write succeeded, and we can't do that with an async task anyway), and we can deal with async then, but at this point it seems more important to have guarantees.
>
> I am also less convinced of the long-term sustainability of this implementation. My understanding of the code is that if any event has an issue, no events will ever be written, based on the error handling there. For example, if mobile sends a new event and we haven't yet migrated, or a web dev forgets to add the migration. That seems quite dangerous. Additionally, it's not paged at all; if the list is very long (maybe mobile sends a bunch at once), it could consume a lot of memory. None of these issues are unfixable, but the more we work to make this a production-ready queuing system, the more complex it becomes, and the more we benefit from a real one.
>
> However, given our current needs, unless there is a clear place where the async will benefit us, it is a bit hard for me to believe that one Postgres write will be enough worse than the one Redis write at this time and for our use cases. I am not going to block the PR over this, and if you think there is a specific place where that performance matters (the form processor would be reasonable if that were blocking a mobile call), I am certainly willing to consider it, but my strong presumption is that this is overkill for our current needs, while not being robust enough to be a long-term solution.
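The review concerns about this batch task (one bad event blocks the whole batch, and an unbounded drain can use a lot of memory) could be addressed with paging and per-event error handling. This is a sketch of that variant under names of my own choosing, with the queue modeled as a plain list rather than Redis:

```python
BATCH_SIZE = 500  # assumed page size


def drain_in_pages(queue, batch_size=BATCH_SIZE):
    """Yield at most batch_size queued items at a time until the queue is empty."""
    while queue:
        page = queue[:batch_size]
        del queue[:batch_size]
        yield page


def process_page(page, save_event, on_failure):
    """Save events one by one so a single bad event can't poison the batch."""
    saved = 0
    for event in page:
        try:
            save_event(event)
            saved += 1
        except Exception:
            on_failure(event)  # e.g. push to a dead-letter list for inspection
    return saved
```

Per-row saves give up the `bulk_create` efficiency, which is part of the trade-off the thread is weighing; a middle ground would be to bulk-create the page and fall back to per-row saves only when the bulk write fails.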
@@ -0,0 +1,60 @@

```python
from rest_framework.test import APIClient

from commcare_connect.opportunity.models import Opportunity
from commcare_connect.users.models import User

from .models import Event
from .tasks import EventQueue, process_events_batch


def test_post_events(mobile_user_with_connect_link: User, api_client: APIClient, opportunity: Opportunity):
    api_client.force_authenticate(mobile_user_with_connect_link)
    response = api_client.post(
        "/api/events/",
        data=[
            {
                "event_type": "invalid_event_name",
                "user": mobile_user_with_connect_link.pk,
                "opportunity": opportunity.pk,
            }
        ],
        format="json",
    )
    assert response.status_code == 400
    assert Event.objects.count() == 0
    response = api_client.post(
        "/api/events/",
        data=[
            {
                "event_type": Event.Type.INVITE_SENT,
                "user": mobile_user_with_connect_link.pk,
                "opportunity": opportunity.pk,
            },
            {
                "event_type": Event.Type.RECORDS_APPROVED,
                "user": mobile_user_with_connect_link.pk,
                "opportunity": opportunity.pk,
            },
        ],
        format="json",
    )
    assert response.status_code == 201
    assert Event.objects.count() == 2


def test_event_queue(mobile_user_with_connect_link: User, opportunity: Opportunity):
    event_queue = EventQueue()
    assert event_queue.pop() == []

    # queue the event
    event = Event(event_type=Event.Type.INVITE_SENT, user=mobile_user_with_connect_link, opportunity=opportunity)
    event.track()
    queued_events = event_queue.pop()
    process_events_batch()
    assert len(queued_events) == 1
    assert Event.objects.count() == 0
    # process the batch
    event.track()
    process_events_batch()
    assert Event.objects.count() == 1
    assert Event.objects.first().user == event.user
```
@@ -0,0 +1,28 @@

```python
from django.utils.translation import gettext as _

# Server/Web events
INVITE_SENT = "invite_sent"
RECORDS_APPROVED = "records_approved"
RECORDS_REJECTED = "records_rejected"
PAYMENT_APPROVED = "payment_approved"
PAYMENT_ACCRUED = "payment_accrued"
PAYMENT_TRANSFERRED = "payment_transferred"
NOTIFICATIONS_SENT = "notifications_sent"
ADDITIONAL_BUDGET_ADDED = "additional_budget_added"

EVENT_TYPES = {
    INVITE_SENT: _("Invite Sent"),
    RECORDS_APPROVED: _("Records Approved"),
    RECORDS_REJECTED: _("Records Rejected"),
    PAYMENT_APPROVED: _("Payment Approved"),
    PAYMENT_ACCRUED: _("Payment Accrued"),
    PAYMENT_TRANSFERRED: _("Payment Transferred"),
    NOTIFICATIONS_SENT: _("Notifications Sent"),
    ADDITIONAL_BUDGET_ADDED: _("Additional Budget Added"),
}

EVENT_TYPE_CHOICES = list(EVENT_TYPES.items())


# Inferred Events
RECORDS_FLAGGED = "records_flagged"
```
@@ -0,0 +1,34 @@

```python
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from rest_framework import serializers, status
from rest_framework.generics import ListCreateAPIView
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response

from .models import Event


class EventSerializer(serializers.ModelSerializer):
    class Meta:
        model = Event
        fields = ["date_created", "event_type", "user", "opportunity"]
```

> I think this will need some kind of event id from the phone, so that we can communicate success info for individual items in the list as described below.

> Agreed, I was thinking to use the timestamp as such an ID, but a UUID would be nice.
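The client-generated id discussed above could look roughly like this; the `client_id` field name and helper names are hypothetical, a sketch rather than the PR's implementation. Besides enabling per-item success reporting, a UUID would let the server deduplicate retried batches:

```python
import uuid


def new_client_event(event_type):
    """Hypothetical phone-side payload carrying a client-generated id."""
    return {"client_id": str(uuid.uuid4()), "event_type": event_type}


def drop_already_seen(events, seen_ids):
    """Keep only events whose client_id hasn't been accepted before."""
    fresh = [e for e in events if e["client_id"] not in seen_ids]
    seen_ids.update(e["client_id"] for e in fresh)
    return fresh
```

On a retry, the phone resends the same `client_id`s, so previously accepted rows are filtered out instead of being double-counted.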
```python
@method_decorator(csrf_exempt, name="dispatch")
class EventListCreateView(ListCreateAPIView):
    queryset = Event.objects.all()
    serializer_class = EventSerializer
    permission_classes = [IsAuthenticated]

    def create(self, request, *args, **kwargs):
        if not isinstance(request.data, list):
            return Response({"error": "Expected a list of items"}, status=status.HTTP_400_BAD_REQUEST)

        serializer = self.get_serializer(data=request.data, many=True)
        serializer.is_valid(raise_exception=True)

        event_objects = [Event(**item) for item in serializer.validated_data]
        Event.objects.bulk_create(event_objects)

        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
```

> What happens if one item passed by the phone is invalid? Does the whole request fail, or are the rest created? If the first, I think that could cause issues, because then one bad event will make all subsequent requests from the phone fail, as it will be included in each one. If the second, I think we need to let the phone know which succeeded and which failed, so it knows which records to retry and which it can delete.

> This is a great point; from your above comment, maintaining some sort of ID would address this. I will add that.

> It looks like this change is still pending?

> Instead of having an ID, I have gone with a simpler approach of just sending back the rows that fail.
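The "send back the rows that fail" approach the author settled on could be sketched like this; the helper name and error shape are mine, not the PR's code:

```python
def split_valid_invalid(items, validate):
    """Partition submitted events so one bad row doesn't reject the whole batch."""
    valid, invalid = [], []
    for item in items:
        try:
            validate(item)  # raises ValueError for a bad row
            valid.append(item)
        except ValueError as exc:
            invalid.append({"item": item, "error": str(exc)})
    return valid, invalid
```

The view would then `bulk_create` the valid rows and include the `invalid` list in the response, so the phone knows which rows to retry and which it can delete.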
> If these events are processed asynchronously, I don't think we can use `auto_now_add`, especially since there is data from the phone that could be hours or days delayed.

> Hmm, that's a good point. Though this field is getting overridden when the events are created, I agree it's better to remove the `auto_now_add` and let the callers worry about setting it.
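With `auto_now_add` removed, the caller decides the timestamp: keep the phone-supplied `date_created` (which may be hours or days old) when present, and stamp "now" otherwise. A minimal sketch under that assumption; the helper name is illustrative:

```python
from datetime import datetime, timezone


def stamp_date_created(supplied=None):
    """Keep a caller-supplied timestamp when present, else stamp 'now' in UTC."""
    return supplied or datetime.now(timezone.utc)
```

`track_event` would then call this instead of unconditionally overwriting `event_obj.date_created`, so delayed phone events keep their original time.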