
add event tracking #349

Open
wants to merge 18 commits into main
Empty file.
6 changes: 6 additions & 0 deletions commcare_connect/events/apps.py
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class EventsAppConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "commcare_connect.events"
53 changes: 53 additions & 0 deletions commcare_connect/events/migrations/0001_initial.py
@@ -0,0 +1,53 @@
# Generated by Django 4.2.5 on 2024-07-13 14:45

from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):
initial = True

dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
("opportunity", "0044_opportunityverificationflags"),
]

operations = [
migrations.CreateModel(
name="Event",
fields=[
("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
("date_created", models.DateTimeField(auto_now_add=True, db_index=True)),
(
"event_type",
models.CharField(
choices=[
("invite_sent", "Invite Sent"),
("records_approved", "Records Approved"),
("records_flagged", "Records Flagged"),
("records_rejected", "Records Rejected"),
("payment_approved", "Payment Approved"),
("payment_accrued", "Payment Accrued"),
("payment_transferred", "Payment Transferred"),
("notifications_sent", "Notifications Sent"),
("additional_budget_added", "Additional Budget Added"),
],
max_length=40,
),
),
(
"opportunity",
models.ForeignKey(
null=True, on_delete=django.db.models.deletion.PROTECT, to="opportunity.opportunity"
),
),
(
"user",
models.ForeignKey(
null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL
),
),
],
),
]
Empty file.
144 changes: 144 additions & 0 deletions commcare_connect/events/models.py
@@ -0,0 +1,144 @@
from abc import ABCMeta, abstractproperty
from dataclasses import dataclass
from datetime import datetime

from django.db import models
from django.utils.translation import gettext as _

from commcare_connect.users.models import User

from . import types


class Event(models.Model):
from commcare_connect.opportunity.models import Opportunity

# this allows referring to event types in this style: Event.Type.INVITE_SENT
Type = types

date_created = models.DateTimeField(auto_now_add=True, db_index=True)
Collaborator:

If these events are processed asynchronously, I don't think we can use auto_now_add, especially since there is data from the phone that could be hours or days delayed.

Member Author:

Hmm, that's a good point. This field does get overridden when the events are created, but I agree it's better to remove auto_now_add and let the callers worry about setting it.
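A minimal sketch of the change agreed on here, assuming the model simply drops auto_now_add and callers stamp the timestamp themselves (stamp_event and FakeEvent are hypothetical stand-ins, not the PR's code):

```python
# Sketch: without auto_now_add, the caller supplies date_created, so
# delayed phone data can carry its original occurrence time.
from datetime import datetime, timezone


def stamp_event(event_obj, occurred_at=None):
    """Use the caller-supplied timestamp, falling back to 'now'."""
    event_obj.date_created = occurred_at or datetime.now(timezone.utc)
    return event_obj


class FakeEvent:  # stand-in for the Django Event model
    date_created = None


# A delayed event keeps its original occurrence time; a fresh one gets 'now'.
delayed = stamp_event(FakeEvent(), occurred_at=datetime(2024, 7, 1, tzinfo=timezone.utc))
fresh = stamp_event(FakeEvent())
```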

event_type = models.CharField(max_length=40, choices=types.EVENT_TYPE_CHOICES)
Collaborator:

You mentioned you were going to rethink this in response to my last comment, but it appears to still be here? How has your thinking evolved? Why did you decide to require a migration each time we add a new event?

Member Author:
The reason will be displayed to describe this comment to others. Learn more.

Yeah, I am going to make it a callable to avoid migrations getting created.

Member Author:

So it looks like this won't work since we are on Django 4 (callable choices only work in 5); we could do it using validators instead. But the trade-off is that we need some extra code to toggle between the slug and the verbose_name (and back) while processing in some places, which is ugly and unnecessary. I could live with an extra migration until Django 5 instead of the unnecessary code. Do you feel strongly against migrations?

Collaborator:

Do you feel strongly against migrations?

I am not sure I totally understand why we need restricted choices here at all. That seems pretty nonstandard for an analytics system (if you think of GA or Kissmetrics, they allow arbitrary events), and makes adding new events, which we expect to do incrementally, much trickier. Mobile will need to coordinate releases around web running these migrations, and simple communication errors could cause confusing problems. I like the idea on web of predefining our events, and only using ones we have listed in something like a constants file, but I am not sure I am sold on enforcing them at the db level.

As with other comments, I am open to being convinced
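The validator route discussed here could look roughly like this on Django 4, where callable choices are unavailable: validate event_type against the dict in types.py instead of baking choices into the field, so new event types need no migration. This is a sketch, not the PR's code; EVENT_TYPES is abbreviated and ValidationError is stubbed so the snippet runs without Django.

```python
# Sketch of a validator-based event_type check (no migration per new type).
try:
    from django.core.exceptions import ValidationError
except ImportError:  # stub so the sketch runs without Django installed
    class ValidationError(Exception):
        pass

EVENT_TYPES = {
    "invite_sent": "Invite Sent",
    "records_approved": "Records Approved",
    # ... remaining types from commcare_connect/events/types.py
}


def validate_event_type(value):
    if value not in EVENT_TYPES:
        raise ValidationError(f"Unknown event type: {value!r}")


# On the model this would be wired as:
#   event_type = models.CharField(max_length=40, validators=[validate_event_type])
```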

user = models.ForeignKey(User, on_delete=models.CASCADE, null=True)
opportunity = models.ForeignKey(Opportunity, on_delete=models.PROTECT, null=True)
Collaborator:

I will add again that I think a metadata field would be very beneficial. Even with the existing set of events, data like which record was approved or how much the payment was could be very useful, and I know that mobile was also hoping to include additional metadata.
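A hypothetical sketch of the suggested metadata field; on the model it might be `metadata = models.JSONField(default=dict)`. Shown here as a plain JSON round-trip so it runs without Django; the payload keys are illustrative, not from the PR.

```python
# Sketch: a free-form metadata payload lets an event carry details like
# which record was approved or the payment amount, without schema changes.
import json

event_payload = {
    "event_type": "payment_approved",
    "metadata": {"payment_id": 123, "amount": "50.00"},
}

# Metadata round-trips through JSON unchanged, as it would via a JSONField.
restored = json.loads(json.dumps(event_payload))
```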


def track(self, use_async=True):
"""
To track an event, instantiate the object and call this method,
instead of calling save directly.

If use_async is True, the event is queued in Redis and saved
via celery, otherwise it's saved directly.
"""
from commcare_connect.events.tasks import track_event

track_event(self, use_async=use_async)


@dataclass
class InferredEvent:
from commcare_connect.opportunity.models import Opportunity

user: User
opportunity: Opportunity
date_created: datetime
event_type: str


class InferredEventSpec(metaclass=ABCMeta):
"""
Use this to define an Event that can be inferred
based on other models. See RecordsFlagged for example
"""

@abstractproperty
def model_cls(self):
"""
The source model class to infer the event from
for e.g. UserVisit
"""
raise NotImplementedError

@abstractproperty
def event_type(self):
"""
Should be a tuple to indicate the name
for e.g. "RECORDS_FLAGGED", gettext("Records Flagged")
"""
raise NotImplementedError

@abstractproperty
def event_filters(self):
"""
Should be a dict of the queryset filters
for e.g. {'flagged': True} for RecordsFlagged for UserVisit
"""
raise NotImplementedError

@abstractproperty
def user(self):
"""
The field corresponding to user on source model.
"""
raise NotImplementedError

@abstractproperty
def date_created(self):
"""
The field corresponding to user date_created.
"""
raise NotImplementedError

@abstractproperty
def opportunity(self):
"""
The field corresponding to user opportunity, could be None.
"""
raise NotImplementedError

def get_events(self, user=None, from_date=None, to_date=None):
filters = {}
filters.update(self.event_filters)

if user:
filters.update({self.user: user})
if from_date:
filters.update({f"{self.date_created}__gte": from_date})
if to_date:
filters.update({f"{self.date_created}__lte": to_date})

events = (
self.model_cls.objects.filter(**filters).values(self.user, self.opportunity, self.date_created).iterator()
)
for event in events:
yield InferredEvent(
event_type=self.event_type[0],
user=event[self.user],
date_created=event[self.date_created],
opportunity=event.get(self.opportunity, None),
)


class RecordsFlagged(InferredEventSpec):
event_type = (types.RECORDS_FLAGGED, _("Records Flagged"))
model_cls = "UserVisit"
event_filters = {"flagged": True}
user = "user"
date_created = "visit_date"
opportunity = "opportunity"


INFERRED_EVENT_SPECS = [RecordsFlagged()]


def get_events(user=None, from_date=None, to_date=None):
Collaborator:

How is this used?

Member Author:

This will be used in the Events timeline report.

filters = {
"user": user,
"date_created__gte": from_date,
"date_created__lte": to_date,
}
filters = {k: v for k, v in filters.items() if v is not None}
raw_events = Event.objects.filter(**filters).all()
inferred_events = []
for event_spec in INFERRED_EVENT_SPECS:
inferred_events += list(event_spec.get_events(user=user, from_date=from_date, to_date=to_date))
return list(raw_events) + inferred_events
47 changes: 47 additions & 0 deletions commcare_connect/events/tasks.py
@@ -0,0 +1,47 @@
import pickle
from datetime import datetime

from django.db import transaction
from django_redis import get_redis_connection

from commcare_connect.events.models import Event
from config import celery_app

REDIS_EVENTS_QUEUE = "events_queue"


class EventQueue:
def __init__(self):
self.redis_conn = get_redis_connection("default")

def push(self, event_obj):
serialized_event = pickle.dumps(event_obj)
self.redis_conn.rpush(REDIS_EVENTS_QUEUE, serialized_event)

def pop(self):
events = [pickle.loads(event) for event in self.redis_conn.lrange(REDIS_EVENTS_QUEUE, 0, -1)]
self.redis_conn.ltrim(REDIS_EVENTS_QUEUE, len(events), -1)
return events


@celery_app.task
def process_events_batch():
Member Author:

I have gone this route over a few other options:

  • Simpler option: a single celery task saving each event would be simpler, but it would add too much celery overhead and wouldn't take advantage of bulk saves, which should perform better.
  • Complicated option: Kafka would enable doing this in a much more scalable way, but it would probably be overkill for now.

We can run this task periodically, say every 30 seconds.

Collaborator:

Thanks for describing your thought process a bit. A few questions:

  1. Why redis instead of postgres for the queue?
  2. Why does this need to be async at all? I can imagine it's slightly faster to write to redis than postgres, but if we are doing a network write for every event anyway, I am surprised this is much more efficient than just writing directly to postgres.

Member Author (@sravfeyn, Jul 11, 2024):

The main reason being: wouldn't writing to Redis (in memory) be faster than writing to the Postgres DB (on disk)? This also allows fewer writes to Postgres (because they are committed in batches), which should make it that much less likely for the DB to run into performance issues.

Collaborator:

I would rather we start with a purely synchronous implementation for now if we want to be sure about event durability. We can always make these async later, but this adds a lot of complexity for something we aren't sure we need yet. If we do want to make it async, I think we should use celery unless we have a very strong reason not to, since we are already using that. That keeps us to only one async task mechanism in our code and makes it a bit easier to manage.

Member Author (@sravfeyn, Jul 26, 2024):

I am pretty sure that it would impact performance in sync mode, at least in some cases if not all. For example, in process_learn_form we need to track multiple events (Finishes a Module, Finishes all Modules, Failed Assessment, Succeeded Assessment). Without async, these will add up enough to slow down the request.

(Though maybe this isn't a big issue if these requests are coming from Repeaters rather than directly from mobile submissions.)

And if we do a separate task for each event, we are going to face a lot of constant celery troubleshooting. We have about 47 lines of code to do async plus a single periodic task; I don't feel that qualifies as 'a lot of complexity'.

I am happy to remove all the async code or make it multiple individual tasks, but ¯\_(ツ)_/¯, you will have to handle it if/when things slow down 🐢 ⌛ 🐌 or if we end up doing constant celery firefighting 🧯 🔥 🚒

Collaborator:

My instinct was that nearly all of these come from background tasks (like approvals) or non-time-sensitive requests. It is certainly possible that future ones won't (the mobile endpoint should be performant, but we also need to be able to tell mobile whether the write succeeded, and we can't do that with an async task anyway), and we can deal with async then, but at this point it seems more important to have guarantees. I am also less convinced of the long-term sustainability of this implementation. My understanding of the code is that if any event has an issue, no events will ever be written, based on the error handling there; for example, if mobile sends a new event and we haven't yet migrated, or a web dev forgets to add the migration. That seems quite dangerous. Additionally, it's not paged at all; if the list is very long (maybe mobile sends a bunch at once), it could consume a lot of memory.

None of these issues are unfixable, but the more we work to make this a production-ready queuing system, the more complex it becomes, and the more we would benefit from a real one. However, given our current needs, unless there is a clear place where async will benefit us, it is hard for me to believe that one postgres write will be enough worse than one redis write at this time and for our use cases.

I am not going to block the PR over this, and if you think there is a specific place where that performance matters (the form processor would be reasonable if that were blocking a mobile call), I am certainly willing to consider it, but my strong presumption is that this is overkill for our current needs, while not being robust enough to be a long term solution.
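One way to address the paging and atomicity concerns raised above, sketched under the assumption that Redis >= 6.2 (and redis-py 4+) is available: LPOP with a count removes and returns up to a fixed batch in a single atomic call, avoiding the lrange+ltrim window and bounding memory. FakeRedis is a stand-in for the real connection so the sketch is runnable.

```python
# Sketch: paged, atomic batch pop for the Redis-backed event queue.
BATCH_SIZE = 500


def pop_batch(redis_conn, queue_key, batch_size=BATCH_SIZE):
    """Atomically remove and return up to batch_size serialized events."""
    return redis_conn.lpop(queue_key, count=batch_size) or []


class FakeRedis:  # minimal in-memory stand-in for a redis-py connection
    def __init__(self, items):
        self.items = list(items)

    def lpop(self, key, count=None):
        n = count if count is not None else 1
        popped, self.items = self.items[:n], self.items[n:]
        return popped or None


conn = FakeRedis([b"e1", b"e2", b"e3"])
first = pop_batch(conn, "events_queue", batch_size=2)
rest = pop_batch(conn, "events_queue", batch_size=2)
```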

event_queue = EventQueue()
events = event_queue.pop()
if not events:
return
try:
with transaction.atomic():
Event.objects.bulk_create(events)
except Exception as e:
for event in events:
event_queue.push(event)
raise e


def track_event(event_obj, use_async=True):
event_obj.date_created = datetime.utcnow()
if use_async:
EventQueue().push(event_obj)
else:
event_obj.save()
60 changes: 60 additions & 0 deletions commcare_connect/events/tests.py
@@ -0,0 +1,60 @@
from rest_framework.test import APIClient

from commcare_connect.opportunity.models import Opportunity
from commcare_connect.users.models import User

from .models import Event
from .tasks import EventQueue, process_events_batch


def test_post_events(mobile_user_with_connect_link: User, api_client: APIClient, opportunity: Opportunity):
api_client.force_authenticate(mobile_user_with_connect_link)
response = api_client.post(
"/api/events/",
data=[
{
"event_type": "invalid_event_name",
"user": mobile_user_with_connect_link.pk,
"opportunity": opportunity.pk,
}
],
format="json",
)
assert response.status_code == 400
assert Event.objects.count() == 0
response = api_client.post(
"/api/events/",
data=[
{
"event_type": Event.Type.INVITE_SENT,
"user": mobile_user_with_connect_link.pk,
"opportunity": opportunity.pk,
},
{
"event_type": Event.Type.RECORDS_APPROVED,
"user": mobile_user_with_connect_link.pk,
"opportunity": opportunity.pk,
},
],
format="json",
)
assert response.status_code == 201
assert Event.objects.count() == 2


def test_event_queue(mobile_user_with_connect_link: User, opportunity: Opportunity):
event_queue = EventQueue()
assert event_queue.pop() == []

# queue the event
event = Event(event_type=Event.Type.INVITE_SENT, user=mobile_user_with_connect_link, opportunity=opportunity)
event.track()
queued_events = event_queue.pop()
process_events_batch()
assert len(queued_events) == 1
assert Event.objects.count() == 0
# process the batch
event.track()
process_events_batch()
assert Event.objects.count() == 1
assert Event.objects.first().user == event.user
28 changes: 28 additions & 0 deletions commcare_connect/events/types.py
@@ -0,0 +1,28 @@
from django.utils.translation import gettext as _

# Server/Web events
INVITE_SENT = "invite_sent"
RECORDS_APPROVED = "records_approved"
RECORDS_REJECTED = "records_rejected"
PAYMENT_APPROVED = "payment_approved"
PAYMENT_ACCRUED = "payment_accrued"
PAYMENT_TRANSFERRED = "payment_transferred"
NOTIFICATIONS_SENT = "notifications_sent"
ADDITIONAL_BUDGET_ADDED = "additional_budget_added"

EVENT_TYPES = {
INVITE_SENT: _("Invite Sent"),
RECORDS_APPROVED: _("Records Approved"),
RECORDS_REJECTED: _("Records Rejected"),
PAYMENT_APPROVED: _("Payment Approved"),
PAYMENT_ACCRUED: _("Payment Accrued"),
PAYMENT_TRANSFERRED: _("Payment Transferred"),
NOTIFICATIONS_SENT: _("Notifications Sent"),
ADDITIONAL_BUDGET_ADDED: _("Additional Budget Added"),
}

EVENT_TYPE_CHOICES = list(EVENT_TYPES.items())


# Inferred Events
RECORDS_FLAGGED = "records_flagged"
34 changes: 34 additions & 0 deletions commcare_connect/events/views.py
@@ -0,0 +1,34 @@
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from rest_framework import serializers, status
from rest_framework.generics import ListCreateAPIView
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response

from .models import Event


class EventSerializer(serializers.ModelSerializer):
class Meta:
model = Event
fields = ["date_created", "event_type", "user", "opportunity"]
Collaborator:

I think this will need some kind of event ID from the phone, so that we can communicate success info for individual items in the list, as described below.

Member Author:

Agreed, I was thinking of using the timestamp as that ID, but a UUID would be nice.


@method_decorator(csrf_exempt, name="dispatch")
class EventListCreateView(ListCreateAPIView):
queryset = Event.objects.all()
serializer_class = EventSerializer
permission_classes = [IsAuthenticated]

def create(self, request, *args, **kwargs):
Collaborator:

What happens if one item passed by the phone is invalid? Does the whole request fail, or are the rest created? If the first, I think that could cause issues, because one bad event will then make all subsequent requests from the phone fail, as it will be included in each one. If the second, I think we need to let the phone know which succeeded and which failed, so it knows which records to retry and which it can delete.

Member Author:

This is a great point; as your comment above notes, maintaining some sort of ID would address this. I will add that.

Collaborator:

It looks like this change is still pending?

Member Author:

Instead of having an ID, I have gone with a simpler approach of just sending back the rows that fail.
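The "send back the rows that fail" approach could be sketched like this (hypothetical helper names, not the PR's code): each posted item is validated independently, so one bad event does not block the rest, and the failures are returned to the phone for retry.

```python
# Sketch: per-item validation for the bulk events endpoint.
VALID_TYPES = {"invite_sent", "records_approved"}  # abbreviated for the sketch


def validate_item(item):
    """Stand-in for per-row serializer validation."""
    if item.get("event_type") not in VALID_TYPES:
        raise ValueError(f"unknown event_type: {item.get('event_type')!r}")


def split_valid_invalid(items):
    """Partition posted items into saveable rows and failures to echo back."""
    valid, failed = [], []
    for item in items:
        try:
            validate_item(item)
            valid.append(item)
        except ValueError as exc:
            failed.append({"item": item, "error": str(exc)})
    return valid, failed


valid, failed = split_valid_invalid(
    [{"event_type": "invite_sent"}, {"event_type": "bogus"}]
)
```

The view would then bulk_create the valid rows and include `failed` in the response body so the phone knows which records to retry.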

if not isinstance(request.data, list):
return Response({"error": "Expected a list of items"}, status=status.HTTP_400_BAD_REQUEST)

serializer = self.get_serializer(data=request.data, many=True)
serializer.is_valid(raise_exception=True)

event_objects = [Event(**item) for item in serializer.validated_data]
Event.objects.bulk_create(event_objects)

headers = self.get_success_headers(serializer.data)
return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
6 changes: 6 additions & 0 deletions commcare_connect/opportunity/tasks.py
@@ -101,6 +101,12 @@ def invite_user(user_id, opportunity_access_id):
),
)
send_message(message)
from commcare_connect.events.models import Event

Event(event_type=Event.Type.INVITE_SENT, user=user, opportunity=opportunity_access.opportunity).track(
        # this is already in an async worker, so use_async is False
use_async=False
)


@celery_app.task()