Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add event tracking #349

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Empty file.
6 changes: 6 additions & 0 deletions commcare_connect/events/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class EventsAppConfig(AppConfig):
    """Django app configuration for the event-tracking app."""

    # Implicit primary keys on models in this app use BigAutoField.
    default_auto_field = "django.db.models.BigAutoField"
    name = "commcare_connect.events"
59 changes: 59 additions & 0 deletions commcare_connect/events/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Generated by Django 4.2.5 on 2024-07-13 14:45

from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):
    # First migration for the events app: creates the Event table.
    initial = True

    dependencies = [
        migrations.swappable_dependency(settings.AUTH_USER_MODEL),
        ("opportunity", "0044_opportunityverificationflags"),
    ]

    operations = [
        migrations.CreateModel(
            name="Event",
            fields=[
                ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
                # Set explicitly by track_event(), not via auto_now_add.
                ("date_created", models.DateTimeField(db_index=True)),
                (
                    "event_type",
                    models.CharField(
                        # NOTE(review): "records_flagged" appears here but is
                        # absent from types.EVENT_TYPES, so the next
                        # makemigrations run will emit a diff — confirm which
                        # list is authoritative.
                        choices=[
                            ("invite_sent", "Invite Sent"),
                            ("records_approved", "Records Approved"),
                            ("records_flagged", "Records Flagged"),
                            ("records_rejected", "Records Rejected"),
                            ("payment_approved", "Payment Approved"),
                            ("payment_accrued", "Payment Accrued"),
                            ("payment_transferred", "Payment Transferred"),
                            ("notifications_sent", "Notifications Sent"),
                            ("additional_budget_added", "Additional Budget Added"),
                        ],
                        max_length=40,
                    ),
                ),
                (
                    "opportunity",
                    models.ForeignKey(
                        null=True, on_delete=django.db.models.deletion.PROTECT, to="opportunity.opportunity"
                    ),
                ),
                (
                    "user",
                    models.ForeignKey(
                        null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL
                    ),
                ),
                (
                    "organization",
                    # models.CASCADE here vs django.db.models.deletion.CASCADE
                    # above — equivalent objects, just inconsistent spelling.
                    models.ForeignKey(
                        on_delete=models.CASCADE, related_name="events", related_query_name="event", to="organization.organization"
                    ),
                )
            ],
        ),
    ]
Empty file.
157 changes: 157 additions & 0 deletions commcare_connect/events/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
from abc import ABCMeta, abstractmethod, abstractproperty
from dataclasses import dataclass
from datetime import datetime

from django.db import models
from django.utils.translation import gettext as _

from commcare_connect.organization.models import Organization
from commcare_connect.users.models import User

from . import types


def get_event_type_choices():
    """Return the choices for ``Event.event_type``.

    Exposed as a callable (rather than the list itself) so that editing
    ``types.EVENT_TYPE_CHOICES`` does not generate a new migration each time.
    """
    return types.EVENT_TYPE_CHOICES


class Event(models.Model):
    """A tracked event: something that happened to a user in an opportunity.

    Rows are written either synchronously or in batches via Redis/celery —
    use :meth:`track` instead of calling ``save`` directly.
    """

    # Allows referring to event types as Event.Type.INVITE_SENT etc.
    Type = types

    # Stamped by track_event() at track time, not auto_now_add.
    date_created = models.DateTimeField(db_index=True)
    event_type = models.CharField(max_length=40, choices=get_event_type_choices())
    user = models.ForeignKey(User, on_delete=models.CASCADE, null=True)
    # String model reference avoids importing Opportunity here (the original
    # class-body import existed to dodge a circular import, but it also bound
    # a stray ``Event.Opportunity`` class attribute).
    opportunity = models.ForeignKey("opportunity.Opportunity", on_delete=models.PROTECT, null=True)
    organization = models.ForeignKey(
        Organization,
        on_delete=models.CASCADE,
        related_name="events",
        related_query_name="event",
    )

    def track(self, use_async=True):
        """
        To track an event, instantiate the object and call this method,
        instead of calling save directly.

        If use_async is True, the event is queued in Redis and saved
        via celery, otherwise it's saved directly.
        """
        # Local import avoids a circular import with the tasks module.
        from commcare_connect.events.tasks import track_event

        track_event(self, use_async=use_async)


@dataclass
class InferredEvent:
    """A non-persisted event derived from another model's rows.

    Produced by ``InferredEventSpec.get_events`` as a lightweight stand-in
    for ``Event`` in timeline-style listings.
    """

    # NOTE(review): this class-body import binds Opportunity as a class
    # attribute; presumably here to dodge a circular import — consider a
    # string annotation instead.
    from commcare_connect.opportunity.models import Opportunity

    # NOTE(review): InferredEventSpec.get_events builds these from a
    # .values() queryset, so user/opportunity receive primary keys rather
    # than model instances — confirm consumers expect that.
    user: User
    opportunity: Opportunity
    date_created: datetime
    event_type: str


class InferredEventSpec(metaclass=ABCMeta):
    """
    Use this to define an Event that can be inferred
    based on other models. See RecordsFlagged for example.

    Subclasses typically override the abstract properties below with plain
    class attributes (which satisfies the abstractness check).
    """

    # ``abstractproperty`` is deprecated since Python 3.3; the supported
    # spelling is ``@property`` stacked over ``@abstractmethod``.
    @property
    @abstractmethod
    def model_cls(self):
        """
        The source model class to infer the event from
        for e.g. UserVisit
        """
        raise NotImplementedError

    @property
    @abstractmethod
    def event_type(self):
        """
        Should be a tuple to indicate the name
        for e.g. "RECORDS_FLAGGED", gettext("Records Flagged")
        """
        raise NotImplementedError

    @property
    @abstractmethod
    def event_filters(self):
        """
        Should be a dict of the queryset filters
        for e.g. {'flagged': True} for RecordsFlagged for UserVisit
        """
        raise NotImplementedError

    @property
    @abstractmethod
    def user(self):
        """
        The name of the field corresponding to user on the source model.
        """
        raise NotImplementedError

    @property
    @abstractmethod
    def date_created(self):
        """
        The name of the field corresponding to date_created on the source model.
        """
        raise NotImplementedError

    @property
    @abstractmethod
    def opportunity(self):
        """
        The name of the field corresponding to opportunity on the source
        model; the value may be absent/None.
        """
        raise NotImplementedError

    def get_events(self, user=None, from_date=None, to_date=None):
        """Yield an InferredEvent for each matching source row.

        NOTE(review): ``.values(...)`` returns primary keys for FK fields,
        so ``user``/``opportunity`` on the yielded events are ids, not model
        instances — confirm that is what consumers expect.
        """
        filters = dict(self.event_filters)

        if user:
            filters[self.user] = user
        if from_date:
            filters[f"{self.date_created}__gte"] = from_date
        if to_date:
            filters[f"{self.date_created}__lte"] = to_date

        rows = (
            self.model_cls.objects.filter(**filters).values(self.user, self.opportunity, self.date_created).iterator()
        )
        for row in rows:
            yield InferredEvent(
                event_type=self.event_type[0],
                user=row[self.user],
                date_created=row[self.date_created],
                opportunity=row.get(self.opportunity, None),
            )


class RecordsFlagged(InferredEventSpec):
    """Inferred event: a UserVisit row with flagged=True."""

    event_type = (types.RECORDS_FLAGGED, _("Records Flagged"))
    # NOTE(review): this is a string, but get_events calls
    # ``model_cls.objects`` — nothing visible here resolves the string to the
    # UserVisit model class; confirm this works or import the model.
    model_cls = "UserVisit"
    event_filters = {"flagged": True}
    user = "user"
    date_created = "visit_date"
    opportunity = "opportunity"


# Registry of all inferred-event specs consulted by get_events().
INFERRED_EVENT_SPECS = [RecordsFlagged()]


def get_events(user=None, from_date=None, to_date=None):
    """Return stored Event rows plus inferred events matching the filters.

    All arguments are optional; omitted ones don't constrain the result.
    Used by the events timeline report.
    """
    query = {}
    if user is not None:
        query["user"] = user
    if from_date is not None:
        query["date_created__gte"] = from_date
    if to_date is not None:
        query["date_created__lte"] = to_date

    results = list(Event.objects.filter(**query))
    for spec in INFERRED_EVENT_SPECS:
        results.extend(spec.get_events(user=user, from_date=from_date, to_date=to_date))
    return results
47 changes: 47 additions & 0 deletions commcare_connect/events/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import pickle
from datetime import datetime

from django.db import transaction
from django_redis import get_redis_connection

from commcare_connect.events.models import Event
from config import celery_app

# Name of the Redis list buffering pickled events awaiting bulk insert.
REDIS_EVENTS_QUEUE = "events_queue"


class EventQueue:
    """FIFO buffer of pickled, unsaved Event instances in a Redis list."""

    def __init__(self):
        self.redis_conn = get_redis_connection("default")

    def push(self, event_obj):
        # Append the pickled event to the tail of the list (RPUSH).
        serialized_event = pickle.dumps(event_obj)
        self.redis_conn.rpush(REDIS_EVENTS_QUEUE, serialized_event)

    def pop(self):
        # Drain and return everything currently queued: snapshot the whole
        # list, then trim off exactly the entries we read so anything pushed
        # after the LRANGE is preserved.
        # NOTE(review): LRANGE + LTRIM is not atomic — a crash between the
        # two calls re-delivers the same events on the next pop; confirm
        # duplicates are acceptable or use a pipeline/Lua script.
        events = [pickle.loads(event) for event in self.redis_conn.lrange(REDIS_EVENTS_QUEUE, 0, -1)]
        self.redis_conn.ltrim(REDIS_EVENTS_QUEUE, len(events), -1)
        return events


@celery_app.task
def process_events_batch():
    """Drain the Redis event queue and persist the events in one bulk insert.

    Intended to be run periodically by celery beat. On failure the drained
    events are pushed back onto the queue so they are retried on the next run.

    NOTE(review): a permanently bad event (e.g. one referencing a column the
    DB doesn't have yet) will fail the whole batch and be re-queued forever,
    blocking all later events — consider isolating failures per event.
    Re-queued events are also appended after any newly arrived ones, so
    ordering is not strictly preserved across failures.
    """
    event_queue = EventQueue()
    events = event_queue.pop()
    if not events:
        return
    try:
        with transaction.atomic():
            Event.objects.bulk_create(events)
    except Exception:
        # Put the batch back for the next run, then re-raise so celery
        # records the failure (bare `raise` preserves the original traceback
        # without adding a frame, unlike `raise e`).
        for event in events:
            event_queue.push(event)
        raise


def track_event(event_obj, use_async=True):
    """Stamp ``event_obj`` with the current UTC time and persist it.

    With ``use_async=True`` the event is pushed onto the Redis queue and
    saved later in bulk by ``process_events_batch``; otherwise it is saved
    synchronously.
    """
    from datetime import timezone

    # datetime.utcnow() is deprecated (Python 3.12) and returns a naive
    # datetime, which Django warns about when USE_TZ is enabled; use an
    # aware UTC timestamp instead.
    event_obj.date_created = datetime.now(timezone.utc)
    if use_async:
        EventQueue().push(event_obj)
    else:
        event_obj.save()
60 changes: 60 additions & 0 deletions commcare_connect/events/tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from rest_framework.test import APIClient

from commcare_connect.opportunity.models import Opportunity
from commcare_connect.users.models import User

from .models import Event
from .tasks import EventQueue, process_events_batch


def test_post_events(mobile_user_with_connect_link: User, api_client: APIClient, opportunity: Opportunity):
    """POSTing an unknown event_type is rejected; valid events are created in bulk."""
    api_client.force_authenticate(mobile_user_with_connect_link)
    # An invalid event_type should 400 and write nothing.
    response = api_client.post(
        "/api/events/",
        data=[
            {
                "event_type": "invalid_event_name",
                "user": mobile_user_with_connect_link.pk,
                "opportunity": opportunity.pk,
            }
        ],
        format="json",
    )
    assert response.status_code == 400
    assert Event.objects.count() == 0
    # A list of valid events should create one Event row per item.
    response = api_client.post(
        "/api/events/",
        data=[
            {
                "event_type": Event.Type.INVITE_SENT,
                "user": mobile_user_with_connect_link.pk,
                "opportunity": opportunity.pk,
            },
            {
                "event_type": Event.Type.RECORDS_APPROVED,
                "user": mobile_user_with_connect_link.pk,
                "opportunity": opportunity.pk,
            },
        ],
        format="json",
    )
    assert response.status_code == 201
    assert Event.objects.count() == 2


def test_event_queue(mobile_user_with_connect_link: User, opportunity: Opportunity):
    """track() enqueues to Redis; process_events_batch persists the queue."""
    event_queue = EventQueue()
    assert event_queue.pop() == []

    # queue the event
    event = Event(event_type=Event.Type.INVITE_SENT, user=mobile_user_with_connect_link, opportunity=opportunity)
    event.track()
    queued_events = event_queue.pop()
    # The manual pop() above drained the queue, so the batch task finds
    # nothing and no rows are written.
    process_events_batch()
    assert len(queued_events) == 1
    assert Event.objects.count() == 0
    # process the batch
    event.track()
    process_events_batch()
    assert Event.objects.count() == 1
    assert Event.objects.first().user == event.user
29 changes: 29 additions & 0 deletions commcare_connect/events/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Module-level translatable strings must use gettext_lazy: plain gettext
# would be evaluated once at import time, before a request's language is
# active, freezing the labels in the default language.
from django.utils.translation import gettext_lazy as _

# Server/Web events (stored as Event rows).
INVITE_SENT = "invite_sent"
RECORDS_APPROVED = "records_approved"
RECORDS_REJECTED = "records_rejected"
PAYMENT_APPROVED = "payment_approved"
PAYMENT_ACCRUED = "payment_accrued"
PAYMENT_TRANSFERRED = "payment_transferred"
NOTIFICATIONS_SENT = "notifications_sent"
ADDITIONAL_BUDGET_ADDED = "additional_budget_added"

# Mapping of event-type slug -> human-readable (translatable) label.
# NOTE(review): migration 0001 includes "records_flagged" in the DB choices
# but it is absent here, so makemigrations will emit a diff — confirm which
# list is authoritative.
EVENT_TYPES = {
    INVITE_SENT: _("Invite Sent"),
    RECORDS_APPROVED: _("Records Approved"),
    RECORDS_REJECTED: _("Records Rejected"),
    PAYMENT_APPROVED: _("Payment Approved"),
    PAYMENT_ACCRUED: _("Payment Accrued"),
    PAYMENT_TRANSFERRED: _("Payment Transferred"),
    NOTIFICATIONS_SENT: _("Notifications Sent"),
    ADDITIONAL_BUDGET_ADDED: _("Additional Budget Added"),
}


# Choices list for Event.event_type (see get_event_type_choices in models).
EVENT_TYPE_CHOICES = list(EVENT_TYPES.items())


# Inferred Events (derived from other models, never stored as Event rows).
RECORDS_FLAGGED = "records_flagged"
9 changes: 9 additions & 0 deletions commcare_connect/events/urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from django.urls import path

from .views import EventListView

# Namespace for reversing, e.g. reverse("events:event_htmx").
app_name = "events"

urlpatterns = [
    # Root of the app's URL prefix renders the event list (htmx partial,
    # judging by the route name).
    path("", view=EventListView.as_view(), name="event_htmx"),
]
Loading
Loading