Skip to content

Commit

Permalink
add event tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
sravfeyn committed Jul 1, 2024
1 parent 84d35d0 commit ec8f001
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 0 deletions.
Empty file.
151 changes: 151 additions & 0 deletions commcare_connect/events/models..py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
from django.db import models

from commcare_connect.users.models import User


class Event(models.Model):

class Type(models.TextChoices):
INVITE_SENT = "is", gettext("Invite Sent")
RECORDS_APPROVED = "rp", gettext("Records Approved")
RECORDS_FLAGGED = "rf", gettext("Records Flagged")
RECORDS_REJECTED = "rj", gettext("Records Rejected")
PAYMENT_APPROVED = "pp", gettext("Payment Approved")
PAYMENT_ACCRUED = "pa", gettext("Payment Accrued")
PAYMENT_TRANSFERRED = "pf", gettext("Payment Transferred")
NOTIFICATIONS_SENT = "nf", gettext("Notification Sent")
ADDITIONAL_BUDGET_ADDED = "ab", gettext("Additional Budget Added")

date_created = models.DateTimeField(auto_now_add=True, db_index=True)
event_type = models.CharField(max_length='2', choices=Type.choices)
user = models.ForeignKey(User, on_delete=models.CASCADE, null=True)
opportunity = models.ForeignKey(Opportunity, on_delete=models.PROTECT, null=True)

@classmethod
def track(cls, use_async=True):
"""
To track an event instantiate the object and call this method,
instead of calling save directly.
If use_async is True, the event is queued in Redis and saved
via celery, otherwise it's saved directly.
"""
from commcare_connect.events.tasks import track_event
track_event(self, use_async=use_async)



@dataclass
class InferredEvent:
user: User
opportunity: Opportunity
date_created: datetime
event_type: str


class InferredEventSpec(ABCMeta):
"""
Use this to define an Event that can be inferred
based on other models.
"""

@abstractproperty
def model_cls(self):
"""
The source model class to infer the event from
for e.g. UserVisit
"""
raise NotImplementedError

@abstractproperty
def event_type(self):
"""
Should be a tuple to indicate the name
for e.g. "RECORDS_FLAGGED", gettext("Records Flagged")
"""
raise NotImplementedError

@abstractproperty
def event_filters(self):
"""
Should be a dict of the queryset filters
for e.g. {'flagged': True} for RecordsFlagged for UserVisit
"""
raise NotImplementedError


@abstractproperty
def user(self):
"""
The field corresponding to user on source model.
"""
raise NotImplementedError

@abstractproperty
def date_created(self):
"""
The field corresponding to user date_created.
"""
raise NotImplementedError

@abstractproperty
def opportunity(self):
"""
The field corresponding to user opportunity, could be None.
"""
raise NotImplementedError

def get_events(self, user=None, from_date=None, to_date=None):
filters = {}
filters.update(event_filters)

if user:
filters.update({self.user: user})
if from_date:
filters.update({f"{self.date_created}__gte": from_date})
if from_date:
filters.update({f"{self.date_created}__lte": to_date})

events = self.model_cls.objects.filter(
**filters
).values(
self.user, self.opportunity, self.date_created
).iterator()
for event in events:
yield InferredEvent(
event_type=self.event_type[0],
user=event[self.user],
date_created=event[self.date_created],
opportunity=event.get(self.opportunity, None)
)


class RecordsFlagged(InferredEventSpec):

event_type = "RECORDS_FLAGGED", gettext("Records Flagged")
model = UserVisit
event_filters = {"flagged": True}
field_mapping = {
"user": "user",
"date_created": "visit_date",
"opportunity": "opportunity",
}


INFERRED_EVENT_SPECS = [
RecordsFlagged
]


def get_events(user=None, from_date=None, to_date=None):
filters = {
"user": user,
"date_created__gte": from_date,
"date_created__lte": to_date,
}
filters = {k:v for k,v in filters.items() if v is not None}
raw_events = Events.objects.filter(**filters).all()
inferred_events = []
for event_spec in INFERRED_EVENT_SPECS:
inferred_events += event_spec.get_events()
return raw_events + inferred_events
37 changes: 37 additions & 0 deletions commcare_connect/events/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import pickle
from datetime import datetime

from django.db import transaction
from django_redis import get_redis_connection

from config import celery_app

from .models import Event

REDIS_EVENTS_QUEUE = "events_queue"


@celery_app.task
def process_events_batch():
redis_conn = get_redis_connection("default")
events = redis_conn.lrange(REDIS_EVENTS_QUEUE, 0, -1)
if not events:
return

with transaction.atomic():
event_objs = []
for event in events:
event_objs.append(pickle.loads(event))
Event.objects.bulk_create(event_objs)

redis_conn.ltrim(REDIS_EVENTS_QUEUE, len(events), -1)


def track_event(event_obj, use_async=True):
event_obj.date_created = datetime.now()
if use_async:
redis_conn = get_redis_connection("default")
serialized_event = pickle.dumps(event_obj)
redis_conn.rpush(REDIS_EVENTS_QUEUE, serialized_event)
else:
event_obj.save()
4 changes: 4 additions & 0 deletions commcare_connect/opportunity/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from commcare_connect.connect_id_client import fetch_users, send_message, send_message_bulk
from commcare_connect.connect_id_client.models import Message
from commcare_connect.events.models import Event
from commcare_connect.opportunity.app_xml import get_connect_blocks_for_app, get_deliver_units_for_app
from commcare_connect.opportunity.export import (
export_deliver_status_table,
Expand Down Expand Up @@ -101,6 +102,9 @@ def invite_user(user_id, opportunity_access_id):
),
)
send_message(message)
Event(event_type=Event.Type.INVITE_SENT, user=user, opportunity=opportunity_access.opportunity).track(
use_async=False
)


@celery_app.task()
Expand Down
4 changes: 4 additions & 0 deletions commcare_connect/opportunity/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from django_tables2.export import TableExport
from geopy import distance

from commcare_connect.events.models import Event
from commcare_connect.form_receiver.serializers import XFormSerializer
from commcare_connect.opportunity.forms import (
AddBudgetExistingUsersForm,
Expand Down Expand Up @@ -382,6 +383,7 @@ def add_budget_existing_users(request, org_slug=None, pk=None):
opportunity.total_budget += ocl.payment_unit.amount * additional_visits
opportunity.save()
return redirect("opportunity:detail", org_slug, pk)
Event(event_type=Event.Type.ADDITIONAL_BUDGET_ADDED, opportunity=opportunity).track()

return render(
request,
Expand Down Expand Up @@ -750,6 +752,7 @@ def approve_visit(request, org_slug=None, pk=None):
opp_id = user_visit.opportunity_id
access = OpportunityAccess.objects.get(user_id=user_visit.user_id, opportunity_id=opp_id)
update_payment_accrued(opportunity=access.opportunity, users=[access.user])
Event(event_type=Event.Type.RECORDS_APPROVED, user=user_visit.user, opportunity=access.opportunity).track()
return redirect("opportunity:user_visits_list", org_slug=org_slug, opp_id=user_visit.opportunity.id, pk=access.id)


Expand All @@ -760,6 +763,7 @@ def reject_visit(request, org_slug=None, pk=None):
user_visit.save()
access = OpportunityAccess.objects.get(user_id=user_visit.user_id, opportunity_id=user_visit.opportunity_id)
update_payment_accrued(opportunity=access.opportunity, users=[access.user])
Event(event_type=Event.Type.RECORDS_REJECTED, user=user_visit.user, opportunity=access.opportunity).track()
return redirect("opportunity:user_visits_list", org_slug=org_slug, opp_id=user_visit.opportunity_id, pk=access.id)


Expand Down
3 changes: 3 additions & 0 deletions commcare_connect/opportunity/visit_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from django.db import transaction
from tablib import Dataset

from commcare_connect.events.models import Event
from commcare_connect.opportunity.models import (
CompletedWork,
CompletedWorkStatus,
Expand Down Expand Up @@ -143,6 +144,7 @@ def update_payment_accrued(opportunity: Opportunity, users):
access.payment_accrued += approved_count * completed_work.payment_unit.amount
completed_work.save()
access.save()
Event(event_type=Event.Type.PAYMENT_ACCRUED, user=access.user, opportunity=access.opportunity).track()


def get_status_by_visit_id(dataset) -> dict[int, VisitValidationStatus]:
Expand Down Expand Up @@ -238,6 +240,7 @@ def _bulk_update_payments(opportunity: Opportunity, imported_data: Dataset) -> P
payment = Payment.objects.create(opportunity_access=access, amount=amount)
seen_users.add(username)
payment_ids.append(payment.pk)
Event(event_type=Event.Type.PAYMENT_TRANSFERRED, user=access.user, opportunity=opportunity).track()
missing_users = set(usernames) - seen_users
send_payment_notification.delay(opportunity.id, payment_ids)
return PaymentImportStatus(seen_users, missing_users)
Expand Down
1 change: 1 addition & 0 deletions config/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@

LOCAL_APPS = [
"commcare_connect.commcarehq_provider",
"commcare_connect.events",
"commcare_connect.form_receiver",
"commcare_connect.opportunity",
"commcare_connect.organization",
Expand Down

0 comments on commit ec8f001

Please sign in to comment.