Skip to content
This repository has been archived by the owner on Feb 8, 2018. It is now read-only.

Commit

Permalink
Save email messages forever
Browse files Browse the repository at this point in the history
  • Loading branch information
chadwhitacre committed Aug 15, 2017
1 parent 4d50c3f commit 228d07c
Show file tree
Hide file tree
Showing 12 changed files with 114 additions and 90 deletions.
75 changes: 36 additions & 39 deletions gratipay/email.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from jinja2 import Environment
from markupsafe import escape as htmlescape

from gratipay.exceptions import Throttled
from gratipay.exceptions import NoEmailAddress, Throttled
from gratipay.models.participant import Participant
from gratipay.utils import find_files, i18n

Expand All @@ -38,6 +38,7 @@ def __init__(self, env, db, tell_sentry, root):
self.tell_sentry = tell_sentry
self.sleep_for = env.email_queue_sleep_for
self.allow_up_to = env.email_queue_allow_up_to
self.log_every = env.email_queue_log_metrics_every

templates = {}
templates_dir = os.path.join(root, 'emails')
Expand Down Expand Up @@ -75,13 +76,18 @@ def put(self, to, template, _user_initiated=True, **context):
"""
with self.db.get_cursor() as cursor:
cursor.run("""
INSERT INTO email_queue
INSERT INTO email_messages
(participant, spt_name, context, user_initiated)
VALUES (%s, %s, %s, %s)
""", (to.id, template, pickle.dumps(context), _user_initiated))
if _user_initiated:
n = cursor.one('SELECT count(*) FROM email_queue '
'WHERE participant=%s AND user_initiated', (to.id,))
n = cursor.one("""
SELECT count(*)
FROM email_messages
WHERE participant=%s
AND result is null
AND user_initiated
""", (to.id,))
if n > self.allow_up_to:
raise Throttled()

Expand All @@ -91,9 +97,9 @@ def flush(self):
"""
fetch_messages = lambda: self.db.all("""
SELECT *
FROM email_queue
WHERE not dead
ORDER BY id ASC
FROM email_messages
WHERE result is null
ORDER BY ctime ASC
LIMIT 60
""")
nsent = 0
Expand All @@ -103,38 +109,28 @@ def flush(self):
break
for rec in messages:
try:
r = self._flush_one(rec)
except:
self.db.run("UPDATE email_queue SET dead=true WHERE id = %s", (rec.id,))
raise
self.db.run("DELETE FROM email_queue WHERE id = %s", (rec.id,))
if r == 1:
sleep(self.sleep_for)
nsent += r
message = self._prepare_email_message_for_ses(rec)
self._mailer.send_email(**message)
except Exception as exc:
self._store_result(rec.id, repr(exc))
raise # we want to see this in Sentry
self._store_result(rec.id, '')
nsent += 1
sleep(self.sleep_for)
return nsent


def _flush_one(self, rec):
"""Send an email message using the underlying ``_mailer``.
:param Record rec: a database record from the ``email_queue`` table
:return int: the number of emails sent (0 or 1)
"""
message = self._prepare_email_message_for_ses(rec)
if message is None:
return 0 # Not sent
self._mailer.send_email(**message)
return 1 # Sent
def _store_result(self, message_id, result):
self.db.run("UPDATE email_messages SET result=%s WHERE id=%s", (result, message_id))


def _prepare_email_message_for_ses(self, rec):
"""Prepare an email message for delivery via Amazon SES.
:param Record rec: a database record from the ``email_queue`` table
:param Record rec: a database record from the ``email_messages`` table
:returns: ``None`` if we can't find an email address to send to
:returns: ``dict`` if we can find an email address to send to
:raises: ``NoEmailAddress`` if we can't find an email address to send to
We look for an email address to send to in two places:
Expand All @@ -156,7 +152,7 @@ def _prepare_email_message_for_ses(self, rec):
context.setdefault('include_unsubscribe', True)
email = context.setdefault('email', to.email_address)
if not email:
return None
raise NoEmailAddress()
langs = i18n.parse_accept_lang(to.email_lang or 'en')
locale = i18n.match_lang(langs)
i18n.add_helpers_to_context(self.tell_sentry, context, locale)
Expand Down Expand Up @@ -188,15 +184,16 @@ def render(t, context):


def log_metrics(self, _print=print):
ndead = self.db.one('SELECT COUNT(*) FROM email_queue WHERE dead')
ntotal = self.db.one('SELECT COUNT(*) FROM email_queue')
_print('count#email_queue_dead=%d count#email_queue_total=%d' % (ndead, ntotal))


def purge(self):
"""Remove all messages from the queue.
"""
self.db.run('DELETE FROM email_queue')
stats = self.db.one("""
SELECT count(CASE WHEN result = '' THEN 1 END) AS sent
, count(CASE WHEN result > '' THEN 1 END) AS failed
, count(CASE WHEN result is null THEN 1 END) AS pending
FROM email_messages
WHERE ctime > now() - %s::interval
""", ('{} seconds'.format(self.log_every),), back_as=dict)
prefix = 'count#email_queue'
variables = ('sent', 'failed', 'pending')
_print(' '.join('{}_{}={}'.format(prefix, v, stats[v]) for v in variables))


jinja_env = Environment()
Expand Down
3 changes: 3 additions & 0 deletions gratipay/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ def lazy_body(self, _):
return _("You've reached the maximum number of email addresses we allow.")


class NoEmailAddress(Exception):
pass

class Throttled(LocalizedErrorResponse):
def lazy_body(self, _):
return _("You've initiated too many emails too quickly. Please try again in a minute or two.")
Expand Down
2 changes: 1 addition & 1 deletion gratipay/models/package/emails.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def classify_emails_for_participant(self, participant):
other_verified = self.db.all('''
SELECT address
FROM emails
FROM email_addresses
WHERE verified is true
AND participant_id != %s
AND address = ANY((SELECT emails FROM packages WHERE id=%s)::text[])
Expand Down
12 changes: 6 additions & 6 deletions gratipay/models/participant/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ def clear_personal_information(self, cursor):
AND is_member IS true
);
DELETE FROM emails WHERE participant_id = %(participant_id)s;
DELETE FROM email_addresses WHERE participant_id = %(participant_id)s;
DELETE FROM statements WHERE participant=%(participant_id)s;
DELETE FROM participant_identities WHERE participant_id=%(participant_id)s;
Expand Down Expand Up @@ -1104,17 +1104,17 @@ def take_over(self, account, have_confirmation=False):

MERGE_EMAIL_ADDRESSES = """
WITH emails_to_keep AS (
WITH email_addresses_to_keep AS (
SELECT DISTINCT ON (address) id
FROM emails
FROM email_addresses
WHERE participant_id IN (%(dead)s, %(live)s)
ORDER BY address, verification_end, verification_start DESC
)
DELETE FROM emails
DELETE FROM email_addresses
WHERE participant_id IN (%(dead)s, %(live)s)
AND id NOT IN (SELECT id FROM emails_to_keep);
AND id NOT IN (SELECT id FROM email_addresses_to_keep);
UPDATE emails
UPDATE email_addresses
SET participant_id = %(live)s
WHERE participant_id = %(dead)s;
Expand Down
25 changes: 13 additions & 12 deletions gratipay/models/participant/email.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ def __repr__(self):
class Email(object):
"""Participants may associate email addresses with their account.
Email addresses are stored in an ``emails`` table in the database, which
holds the addresses themselves as well as info related to address
Email addresses are stored in an ``email_addresses`` table in the database,
which holds the addresses themselves as well as info related to address
verification. While a participant may have multiple email addresses on
file, verified or not, only one will be the *primary* email address: the
one also recorded in ``participants.email_address``. It's a bug for the
primary address not to be verified, or for an address to be in
``participants.email_address`` but not also in ``emails``.
``participants.email_address`` but not also in ``email_addresses``.
Having a verified email is a prerequisite for certain other features on
Gratipay, such as linking a PayPal account, or filing a national identity.
Expand Down Expand Up @@ -108,7 +108,7 @@ def validate_email_verification_request(self, c, email, *packages):

owner_id = c.one("""
SELECT participant_id
FROM emails
FROM email_addresses
WHERE address = %(email)s
AND verified IS true
""", dict(email=email))
Expand Down Expand Up @@ -160,7 +160,7 @@ def get_email_verification_nonce(self, c, email):
"""Given a cursor and email address, return a verification nonce.
"""
nonce = str(uuid.uuid4())
existing = c.one( 'SELECT * FROM emails WHERE address=%s AND participant_id=%s'
existing = c.one( 'SELECT * FROM email_addresses WHERE address=%s AND participant_id=%s'
, (email, self.id)
) # can't use eafp here because of cursor error handling
# XXX I forget what eafp is. :(
Expand All @@ -170,7 +170,8 @@ def get_email_verification_nonce(self, c, email):
# Not in the table yet. This should throw an IntegrityError if the
# address is verified for a different participant.

c.run( "INSERT INTO emails (participant_id, address, nonce) VALUES (%s, %s, %s)"
c.run( "INSERT INTO email_addresses (participant_id, address, nonce) "
"VALUES (%s, %s, %s)"
, (self.id, email, nonce)
)
else:
Expand All @@ -181,7 +182,7 @@ def get_email_verification_nonce(self, c, email):
if existing.nonce:
c.run('DELETE FROM claims WHERE nonce=%s', (existing.nonce,))
c.run("""
UPDATE emails
UPDATE email_addresses
SET nonce=%s
, verification_start=now()
WHERE participant_id=%s
Expand Down Expand Up @@ -299,15 +300,15 @@ def save_email_address(self, cursor, address):
"""
cursor.run("""
UPDATE emails
UPDATE email_addresses
SET verified=true, verification_end=now(), nonce=NULL
WHERE participant_id=%s
AND address=%s
AND verified IS NULL
""", (self.id, address))
cursor.run("""
DELETE
FROM emails
FROM email_addresses
WHERE participant_id != %s
AND address=%s
""", (self.id, address))
Expand All @@ -324,7 +325,7 @@ def get_email(self, address, cursor=None, and_lock=False):
:returns: a database record (a named tuple)
"""
sql = 'SELECT * FROM emails WHERE participant_id=%s AND address=%s'
sql = 'SELECT * FROM email_addresses WHERE participant_id=%s AND address=%s'
if and_lock:
sql += ' FOR UPDATE'
return (cursor or self.db).one(sql, (self.id, address))
Expand All @@ -335,7 +336,7 @@ def get_emails(self, cursor=None):
"""
return (cursor or self.db).all("""
SELECT *
FROM emails
FROM email_addresses
WHERE participant_id=%s
ORDER BY id
""", (self.id,))
Expand All @@ -359,7 +360,7 @@ def remove_email(self, address):
, 'participant'
, dict(id=self.id, action='remove', values=dict(email=address))
)
c.run("DELETE FROM emails WHERE participant_id=%s AND address=%s",
c.run("DELETE FROM email_addresses WHERE participant_id=%s AND address=%s",
(self.id, address))


Expand Down
10 changes: 5 additions & 5 deletions gratipay/models/participant/packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ def get_packages_for_claiming(self, manager):
"""
return self.db.all('''
WITH verified_emails AS (
WITH verified_email_addresses AS (
SELECT e.address
, e.address = p.email_address is_primary
FROM emails e
FROM email_addresses e
LEFT JOIN participants p
ON p.id = e.participant_id
WHERE e.participant_id=%s
Expand All @@ -36,12 +36,12 @@ def get_packages_for_claiming(self, manager):
SELECT pkg.*::packages package
, p.*::participants claimed_by
, (SELECT is_primary
FROM verified_emails
FROM verified_email_addresses
WHERE address = ANY(emails)
ORDER BY is_primary DESC, address
LIMIT 1) email_address_is_primary
, (SELECT address
FROM verified_emails
FROM verified_email_addresses
WHERE address = ANY(emails)
ORDER BY is_primary DESC, address
LIMIT 1) email_address
Expand All @@ -53,7 +53,7 @@ def get_packages_for_claiming(self, manager):
LEFT JOIN participants p
ON t.owner = p.username
WHERE package_manager=%s
AND pkg.emails && array(SELECT address FROM verified_emails)
AND pkg.emails && array(SELECT address FROM verified_email_addresses)
ORDER BY email_address_is_primary DESC
, email_address ASC
, pkg.name ASC
Expand Down
6 changes: 3 additions & 3 deletions gratipay/testing/email.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ def get_last_email(self):


class QueuedEmailHarness(_AbstractEmailHarness):
"""An email harness that pulls from the ``email_queue`` table.
"""An email harness that pulls from the ``email_messages`` table.
"""

def _get_last_email(self):
rec = self.db.one('SELECT * FROM email_queue ORDER BY id DESC LIMIT 1')
rec = self.db.one('SELECT * FROM email_messages ORDER BY ctime DESC LIMIT 1')
return self.app.email_queue._prepare_email_message_for_ses(rec)

def count_email_messages(self):
return self.db.one('SELECT count(*) FROM email_queue')
return self.db.one('SELECT count(*) FROM email_messages WHERE result is null')


class SentEmailHarness(_AbstractEmailHarness):
Expand Down
2 changes: 1 addition & 1 deletion gratipay/testing/harness.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ def make_participant(self, username, **kw):
address = kw.pop('email_address')
if address:
self.add_and_verify_email(participant, address)
self.app.email_queue.purge()
self.db.run('DELETE FROM email_messages') # don't confuse email tests

# Update participant
verified_in = kw.pop('verified_in', [])
Expand Down
12 changes: 12 additions & 0 deletions sql/branch.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
BEGIN;
ALTER TABLE emails RENAME TO email_addresses;
ALTER TABLE email_queue RENAME TO email_messages;
ALTER TABLE email_messages ADD COLUMN result text DEFAULT NULL;

-- transfer dead to result
UPDATE email_messages SET result='unknown failure' WHERE dead;
ALTER TABLE email_messages DROP COLUMN dead;

-- assume success for the rest
UPDATE email_messages SET result='' WHERE result is null;
END;
4 changes: 2 additions & 2 deletions tests/py/test_billing_payday.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,8 +630,8 @@ def test_it_notifies_participants(self):
payday.end()
payday.notify_participants()

emails = self.db.one('SELECT * FROM email_queue')
assert emails.spt_name == 'charge_'+status
outbound = self.db.one("SELECT * FROM email_messages WHERE result is null")
outbound.spt_name == 'charge_'+status

self.app.email_queue.flush()
assert self.get_last_email()['to'] == 'kalel <[email protected]>'
Expand Down
Loading

0 comments on commit 228d07c

Please sign in to comment.