diff --git a/gratipay/email.py b/gratipay/email.py index b851634521..356ed18955 100644 --- a/gratipay/email.py +++ b/gratipay/email.py @@ -13,7 +13,7 @@ from jinja2 import Environment from markupsafe import escape as htmlescape -from gratipay.exceptions import Throttled +from gratipay.exceptions import NoEmailAddress, Throttled from gratipay.models.participant import Participant from gratipay.utils import find_files, i18n @@ -38,6 +38,7 @@ def __init__(self, env, db, tell_sentry, root): self.tell_sentry = tell_sentry self.sleep_for = env.email_queue_sleep_for self.allow_up_to = env.email_queue_allow_up_to + self.log_every = env.email_queue_log_metrics_every templates = {} templates_dir = os.path.join(root, 'emails') @@ -75,13 +76,18 @@ def put(self, to, template, _user_initiated=True, **context): """ with self.db.get_cursor() as cursor: cursor.run(""" - INSERT INTO email_queue + INSERT INTO email_messages (participant, spt_name, context, user_initiated) VALUES (%s, %s, %s, %s) """, (to.id, template, pickle.dumps(context), _user_initiated)) if _user_initiated: - n = cursor.one('SELECT count(*) FROM email_queue ' - 'WHERE participant=%s AND user_initiated', (to.id,)) + n = cursor.one(""" + SELECT count(*) + FROM email_messages + WHERE participant=%s + AND result is null + AND user_initiated + """, (to.id,)) if n > self.allow_up_to: raise Throttled() @@ -91,9 +97,9 @@ def flush(self): """ fetch_messages = lambda: self.db.all(""" SELECT * - FROM email_queue - WHERE not dead - ORDER BY id ASC + FROM email_messages + WHERE result is null + ORDER BY ctime ASC LIMIT 60 """) nsent = 0 @@ -103,38 +109,30 @@ def flush(self): break for rec in messages: try: - r = self._flush_one(rec) - except: - self.db.run("UPDATE email_queue SET dead=true WHERE id = %s", (rec.id,)) - raise - self.db.run("DELETE FROM email_queue WHERE id = %s", (rec.id,)) - if r == 1: - sleep(self.sleep_for) - nsent += r + message = self._prepare_email_message_for_ses(rec) + result = self._mailer.send_email(**message) + remote_message_id = result['MessageId'] # let KeyErrors go to Sentry + except Exception as exc: + self._store_result(rec.id, repr(exc), None) + raise # we want to see this in Sentry + self._store_result(rec.id, '', remote_message_id) + nsent += 1 + sleep(self.sleep_for) return nsent - def _flush_one(self, rec): - """Send an email message using the underlying ``_mailer``. - - :param Record rec: a database record from the ``email_queue`` table - :return int: the number of emails sent (0 or 1) - - """ - message = self._prepare_email_message_for_ses(rec) - if message is None: - return 0 # Not sent - self._mailer.send_email(**message) - return 1 # Sent + def _store_result(self, message_id, result, remote_message_id): + self.db.run("UPDATE email_messages SET result=%s, remote_message_id=%s " + "WHERE id=%s", (result, remote_message_id, message_id)) def _prepare_email_message_for_ses(self, rec): """Prepare an email message for delivery via Amazon SES. - :param Record rec: a database record from the ``email_queue`` table + :param Record rec: a database record from the ``email_messages`` table - :returns: ``None`` if we can't find an email address to send to :returns: ``dict`` if we can find an email address to send to + :raises: ``NoEmailAddress`` if we can't find an email address to send to We look for an email address to send to in two places: @@ -156,7 +154,7 @@ def _prepare_email_message_for_ses(self, rec): context.setdefault('include_unsubscribe', True) email = context.setdefault('email', to.email_address) if not email: - return None + raise NoEmailAddress() langs = i18n.parse_accept_lang(to.email_lang or 'en') locale = i18n.match_lang(langs) i18n.add_helpers_to_context(self.tell_sentry, context, locale) @@ -188,15 +186,16 @@ def render(t, context): def log_metrics(self, _print=print): - ndead = self.db.one('SELECT COUNT(*) FROM email_queue WHERE dead') - ntotal = self.db.one('SELECT COUNT(*) FROM email_queue') - _print('count#email_queue_dead=%d count#email_queue_total=%d' % (ndead, ntotal)) - - - def purge(self): - """Remove all messages from the queue. - """ - self.db.run('DELETE FROM email_queue') + stats = self.db.one(""" + SELECT count(CASE WHEN result = '' THEN 1 END) AS sent + , count(CASE WHEN result > '' THEN 1 END) AS failed + , count(CASE WHEN result is null THEN 1 END) AS pending + FROM email_messages + WHERE ctime > now() - %s::interval + """, ('{} seconds'.format(self.log_every),), back_as=dict) + prefix = 'count#email_queue' + variables = ('sent', 'failed', 'pending') + _print(' '.join('{}_{}={}'.format(prefix, v, stats[v]) for v in variables)) jinja_env = Environment() @@ -242,3 +241,5 @@ def send_email(self, **email): p(' ', line) p() p('-'*78) + + return {'MessageId': 'deadbeef'} # simulate a remote message id diff --git a/gratipay/exceptions.py b/gratipay/exceptions.py index cece213bdc..9ab044de40 100644 --- a/gratipay/exceptions.py +++ b/gratipay/exceptions.py @@ -55,6 +55,9 @@ def lazy_body(self, _): return _("You've reached the maximum number of email addresses we allow.") +class NoEmailAddress(Exception): + pass + class Throttled(LocalizedErrorResponse): def lazy_body(self, _): return _("You've initiated too many emails too quickly. Please try again in a minute or two.") diff --git a/gratipay/models/package/emails.py b/gratipay/models/package/emails.py index 32c38f3f51..0f8d467001 100644 --- a/gratipay/models/package/emails.py +++ b/gratipay/models/package/emails.py @@ -48,7 +48,7 @@ def classify_emails_for_participant(self, participant): other_verified = self.db.all(''' SELECT address - FROM emails + FROM email_addresses WHERE verified is true AND participant_id != %s AND address = ANY((SELECT emails FROM packages WHERE id=%s)::text[]) diff --git a/gratipay/models/participant/__init__.py b/gratipay/models/participant/__init__.py index 90f8256c62..aaa3b97714 100644 --- a/gratipay/models/participant/__init__.py +++ b/gratipay/models/participant/__init__.py @@ -362,7 +362,7 @@ def clear_personal_information(self, cursor): AND is_member IS true ); - DELETE FROM emails WHERE participant_id = %(participant_id)s; + DELETE FROM email_addresses WHERE participant_id = %(participant_id)s; DELETE FROM statements WHERE participant=%(participant_id)s; DELETE FROM participant_identities WHERE participant_id=%(participant_id)s; @@ -1104,17 +1104,17 @@ def take_over(self, account, have_confirmation=False): MERGE_EMAIL_ADDRESSES = """ - WITH emails_to_keep AS ( + WITH email_addresses_to_keep AS ( SELECT DISTINCT ON (address) id - FROM emails + FROM email_addresses WHERE participant_id IN (%(dead)s, %(live)s) ORDER BY address, verification_end, verification_start DESC ) - DELETE FROM emails + DELETE FROM email_addresses WHERE participant_id IN (%(dead)s, %(live)s) - AND id NOT IN (SELECT id FROM emails_to_keep); + AND id NOT IN (SELECT id FROM email_addresses_to_keep); - UPDATE emails + UPDATE email_addresses SET participant_id = %(live)s WHERE participant_id = %(dead)s; diff --git a/gratipay/models/participant/email.py b/gratipay/models/participant/email.py index 97864d4153..c5da38de8a 100644 --- a/gratipay/models/participant/email.py +++ b/gratipay/models/participant/email.py @@ -41,13 +41,13 @@ def __repr__(self): class Email(object): """Participants may associate email addresses with their account. - Email addresses are stored in an ``emails`` table in the database, which - holds the addresses themselves as well as info related to address + Email addresses are stored in an ``email_addresses`` table in the database, + which holds the addresses themselves as well as info related to address verification. While a participant may have multiple email addresses on file, verified or not, only one will be the *primary* email address: the one also recorded in ``participants.email_address``. It's a bug for the primary address not to be verified, or for an address to be in - ``participants.email_address`` but not also in ``emails``. + ``participants.email_address`` but not also in ``email_addresses``. Having a verified email is a prerequisite for certain other features on Gratipay, such as linking a PayPal account, or filing a national identity. @@ -108,7 +108,7 @@ def validate_email_verification_request(self, c, email, *packages): owner_id = c.one(""" SELECT participant_id - FROM emails + FROM email_addresses WHERE address = %(email)s AND verified IS true """, dict(email=email)) @@ -160,7 +160,7 @@ def get_email_verification_nonce(self, c, email): """Given a cursor and email address, return a verification nonce. """ nonce = str(uuid.uuid4()) - existing = c.one( 'SELECT * FROM emails WHERE address=%s AND participant_id=%s' + existing = c.one( 'SELECT * FROM email_addresses WHERE address=%s AND participant_id=%s' , (email, self.id) ) # can't use eafp here because of cursor error handling # XXX I forget what eafp is. :( @@ -170,7 +170,8 @@ def get_email_verification_nonce(self, c, email): # Not in the table yet. This should throw an IntegrityError if the # address is verified for a different participant. - c.run( "INSERT INTO emails (participant_id, address, nonce) VALUES (%s, %s, %s)" + c.run( "INSERT INTO email_addresses (participant_id, address, nonce) " + "VALUES (%s, %s, %s)" , (self.id, email, nonce) ) else: @@ -181,7 +182,7 @@ def get_email_verification_nonce(self, c, email): if existing.nonce: c.run('DELETE FROM claims WHERE nonce=%s', (existing.nonce,)) c.run(""" - UPDATE emails + UPDATE email_addresses SET nonce=%s , verification_start=now() WHERE participant_id=%s @@ -299,7 +300,7 @@ def save_email_address(self, cursor, address): """ cursor.run(""" - UPDATE emails + UPDATE email_addresses SET verified=true, verification_end=now(), nonce=NULL WHERE participant_id=%s AND address=%s @@ -307,7 +308,7 @@ def save_email_address(self, cursor, address): """, (self.id, address)) cursor.run(""" DELETE - FROM emails + FROM email_addresses WHERE participant_id != %s AND address=%s """, (self.id, address)) @@ -324,7 +325,7 @@ def get_email(self, address, cursor=None, and_lock=False): :returns: a database record (a named tuple) """ - sql = 'SELECT * FROM emails WHERE participant_id=%s AND address=%s' + sql = 'SELECT * FROM email_addresses WHERE participant_id=%s AND address=%s' if and_lock: sql += ' FOR UPDATE' return (cursor or self.db).one(sql, (self.id, address)) @@ -335,7 +336,7 @@ def get_emails(self, cursor=None): """ return (cursor or self.db).all(""" SELECT * - FROM emails + FROM email_addresses WHERE participant_id=%s ORDER BY id """, (self.id,)) @@ -359,7 +360,7 @@ def remove_email(self, address): , 'participant' , dict(id=self.id, action='remove', values=dict(email=address)) ) - c.run("DELETE FROM emails WHERE participant_id=%s AND address=%s", + c.run("DELETE FROM email_addresses WHERE participant_id=%s AND address=%s", (self.id, address)) diff --git a/gratipay/models/participant/packages.py b/gratipay/models/participant/packages.py index b1253a3e93..2e8549c7e9 100644 --- a/gratipay/models/participant/packages.py +++ b/gratipay/models/participant/packages.py @@ -24,10 +24,10 @@ def get_packages_for_claiming(self, manager): """ return self.db.all(''' - WITH verified_emails AS ( + WITH verified_email_addresses AS ( SELECT e.address , e.address = p.email_address is_primary - FROM emails e + FROM email_addresses e LEFT JOIN participants p ON p.id = e.participant_id WHERE e.participant_id=%s @@ -36,12 +36,12 @@ def get_packages_for_claiming(self, manager): SELECT pkg.*::packages package , p.*::participants claimed_by , (SELECT is_primary - FROM verified_emails + FROM verified_email_addresses WHERE address = ANY(emails) ORDER BY is_primary DESC, address LIMIT 1) email_address_is_primary , (SELECT address - FROM verified_emails + FROM verified_email_addresses WHERE address = ANY(emails) ORDER BY is_primary DESC, address LIMIT 1) email_address @@ -53,7 +53,7 @@ def get_packages_for_claiming(self, manager): LEFT JOIN participants p ON t.owner = p.username WHERE package_manager=%s - AND pkg.emails && array(SELECT address FROM verified_emails) + AND pkg.emails && array(SELECT address FROM verified_email_addresses) ORDER BY email_address_is_primary DESC , email_address ASC , pkg.name ASC diff --git a/gratipay/testing/email.py b/gratipay/testing/email.py index 79118cb351..e2919df5d2 100644 --- a/gratipay/testing/email.py +++ b/gratipay/testing/email.py @@ -1,8 +1,6 @@ # -*- coding: utf-8 -*- from __future__ import absolute_import, division, print_function, unicode_literals -import mock - from gratipay.testing import Harness @@ -37,33 +35,39 @@ def get_last_email(self): class QueuedEmailHarness(_AbstractEmailHarness): - """An email harness that pulls from the ``email_queue`` table. + """An email harness that pulls from the ``email_messages`` table. """ def _get_last_email(self): - rec = self.db.one('SELECT * FROM email_queue ORDER BY id DESC LIMIT 1') + rec = self.db.one('SELECT * FROM email_messages ORDER BY ctime DESC LIMIT 1') return self.app.email_queue._prepare_email_message_for_ses(rec) def count_email_messages(self): - return self.db.one('SELECT count(*) FROM email_queue') + return self.db.one('SELECT count(*) FROM email_messages WHERE result is null') class SentEmailHarness(_AbstractEmailHarness): - """An email harness that mocks ``_mailer.send_email`` to ``get_last_email`` - post-queue. + """An email harness that patches ``_mailer.send_email`` to ``get_last_email`` + after running through the email queue machinery. """ def setUp(self): _AbstractEmailHarness.setUp(self) - self.mailer_patcher = mock.patch.object(self.app.email_queue._mailer, 'send_email') - self.mailer = self.mailer_patcher.start() - self.addCleanup(self.mailer_patcher.stop) - sleep_patcher = mock.patch('gratipay.application.email.sleep') - sleep_patcher.start() - self.addCleanup(sleep_patcher.stop) + self.__messages = [] + + def send_email(**message): + self.__messages.append(message) + return {'MessageId': 'deadbeef'} + + self.__send_email = self.app.email_queue._mailer.send_email + self.app.email_queue._mailer.send_email = send_email + + def tearDown(self): + self.app.email_queue._mailer.send_email = self.__send_email + _AbstractEmailHarness.tearDown(self) def _get_last_email(self): - return self.mailer.call_args[1] + return self.__messages[-1] def count_email_messages(self): - return self.mailer.call_count + return len(self.__messages) diff --git a/gratipay/testing/harness.py b/gratipay/testing/harness.py index d2aceed24b..ce5e7b0872 100644 --- a/gratipay/testing/harness.py +++ b/gratipay/testing/harness.py @@ -271,7 +271,7 @@ def make_participant(self, username, **kw): address = kw.pop('email_address') if address: self.add_and_verify_email(participant, address) - self.app.email_queue.purge() + self.db.run('DELETE FROM email_messages') # don't confuse email tests # Update participant verified_in = kw.pop('verified_in', []) diff --git a/sql/branch.sql b/sql/branch.sql new file mode 100644 index 0000000000..32354e28ce --- /dev/null +++ b/sql/branch.sql @@ -0,0 +1,13 @@ +BEGIN; + ALTER TABLE emails RENAME TO email_addresses; + ALTER TABLE email_queue RENAME TO email_messages; + ALTER TABLE email_messages ADD COLUMN result text; + ALTER TABLE email_messages ADD COLUMN remote_message_id text; + + -- transfer dead to result + UPDATE email_messages SET result='unknown failure' WHERE dead; + ALTER TABLE email_messages DROP COLUMN dead; + + -- assume success for the rest + UPDATE email_messages SET result='' WHERE result is null; +END; diff --git a/tests/py/test_billing_payday.py b/tests/py/test_billing_payday.py index 72219fbc6c..89122339f6 100644 --- a/tests/py/test_billing_payday.py +++ b/tests/py/test_billing_payday.py @@ -630,8 +630,8 @@ def test_it_notifies_participants(self): payday.end() payday.notify_participants() - emails = self.db.one('SELECT * FROM email_queue') - assert emails.spt_name == 'charge_'+status + outbound = self.db.one("SELECT * FROM email_messages WHERE result is null") + outbound.spt_name == 'charge_'+status assert self.get_last_email()['to'] == 'kalel ' assert 'Gratiteam' in self.get_last_email()['body_text'] diff --git a/tests/py/test_email.py b/tests/py/test_email.py index 1f9f44713e..c93b538e36 100644 --- a/tests/py/test_email.py +++ b/tests/py/test_email.py @@ -2,10 +2,9 @@ import time -import mock from pytest import raises -from gratipay.exceptions import Throttled +from gratipay.exceptions import NoEmailAddress, Throttled from gratipay.testing import Harness from gratipay.testing.email import SentEmailHarness @@ -25,7 +24,7 @@ def test_queueing_email_is_throttled(self): def test_queueing_email_writes_timestamp(self): self.app.email_queue.put(self.alice, "base") - ctime = self.db.one("SELECT EXTRACT(epoch FROM ctime) FROM email_queue") + ctime = self.db.one("SELECT EXTRACT(epoch FROM ctime) FROM email_messages") assert abs(ctime - time.time()) < 300 def test_only_user_initiated_messages_count_towards_throttling(self): @@ -54,26 +53,24 @@ def put_message(self, email_address='larry@example.com'): def test_can_flush_an_email_from_the_queue(self): self.put_message() - assert self.db.one("SELECT spt_name FROM email_queue") == "base" + assert self.db.one("SELECT * FROM email_messages").spt_name == "base" self.app.email_queue.flush() assert self.count_email_messages() == 1 last_email = self.get_last_email() assert last_email['to'] == 'larry ' expected = "Something not right?" assert expected in last_email['body_text'] - assert self.db.one("SELECT spt_name FROM email_queue") is None + assert self.db.one("SELECT * FROM email_messages").result == '' - def test_flushing_an_email_without_address_just_skips_it(self): + def test_flushing_an_email_without_address_logs_a_failure(self): self.put_message(email_address=None) - - assert self.db.one("SELECT spt_name FROM email_queue") == "base" - self.app.email_queue.flush() + raises(NoEmailAddress, self.app.email_queue.flush) assert self.count_email_messages() == 0 - assert self.db.one("SELECT spt_name FROM email_queue") is None + assert self.db.one("SELECT * FROM email_messages").result == "NoEmailAddress()" def test_flush_does_not_resend_dead_letters(self): self.put_message() - self.db.run("UPDATE email_queue SET dead=true") + self.db.run("UPDATE email_messages SET result='foo error'") self.app.email_queue.flush() assert self.count_email_messages() == 0 @@ -85,25 +82,39 @@ def failer(*a, **kw): raise SomeProblem() # queue a message self.put_message() - assert not self.db.one("SELECT dead FROM email_queue") + assert self.db.one("SELECT result FROM email_messages") is None # now try to send it raises(SomeProblem, self.app.email_queue.flush) assert self.count_email_messages() == 0 # nothing sent - assert self.db.one("SELECT dead FROM email_queue") + assert self.db.one("SELECT result FROM email_messages") == 'SomeProblem()' class TestLogMetrics(Harness): + def setUp(self): + Harness.setUp(self) + self._log_every = self.app.email_queue.log_every + self.app.email_queue.log_every = 1 + + def tearDown(self): + self.app.email_queue.log_every = self._log_every + Harness.tearDown(self) + def test_log_metrics(self): - alice = self.make_participant('alice', claimed_time='now', email_address='alice@example.com') + alice = self.make_participant( 'alice' + , claimed_time='now' + , email_address='alice@example.com' + ) self.app.email_queue.put(alice, "base") self.app.email_queue.put(alice, "base") self.app.email_queue.put(alice, "base") - self.db.run("UPDATE email_queue SET dead = 'true' WHERE id IN (SELECT id FROM email_queue LIMIT 1)") - - mock_print = mock.Mock() + self.db.run("UPDATE email_messages SET result='foo error' " + "WHERE id IN (SELECT id FROM email_messages LIMIT 1)") - self.app.email_queue.log_metrics(_print=mock_print) - mock_print.assert_called_once_with('count#email_queue_dead=1 count#email_queue_total=3') + captured = {} + def p(message): captured['message'] = message + self.app.email_queue.log_metrics(_print=p) + assert captured['message'] == \ + 'count#email_queue_sent=0 count#email_queue_failed=1 count#email_queue_pending=2' diff --git a/tests/py/test_participant_emails.py b/tests/py/test_participant_emails.py index 9b5ebfb49c..2f7abba0a6 100644 --- a/tests/py/test_participant_emails.py +++ b/tests/py/test_participant_emails.py @@ -163,7 +163,7 @@ def test_verify_email_expired_nonce_fails(self): address = 'alice@example.com' self.hit_email_spt('add-email', address) self.db.run(""" - UPDATE emails + UPDATE email_addresses SET verification_start = (now() - INTERVAL '25 hours') WHERE participant_id = %s; """, (self.alice.id,)) @@ -531,7 +531,7 @@ def test_finishing_verification_clears_competing_claims_and_emails(self): bob.start_email_verification('alice@example.com', foo) bnonce = bob.get_emails()[0].nonce - _emails = lambda: self.db.all('select participant_id as i from emails order by i') + _emails = lambda: self.db.all('select participant_id as i from email_addresses order by i') _claims = lambda: dict(self.db.all('select nonce, package_id from claims')) assert _claims() == {anonce: foo.id, bnonce: foo.id} diff --git a/tests/ttw/test_package_claiming.py b/tests/ttw/test_package_claiming.py index b852996589..80ef6dd367 100644 --- a/tests/ttw/test_package_claiming.py +++ b/tests/ttw/test_package_claiming.py @@ -18,7 +18,8 @@ def check(self, choice=0): self.css('#content button')[0].click() address = ('alice' if choice == 0 else 'bob') + '@example.com' assert self.wait_for_success() == 'Check {} for a verification link.'.format(address) - return self.db.one('select address from claims c join emails e on c.nonce = e.nonce') + return self.db.one('select address from claims c ' + 'join email_addresses e on c.nonce = e.nonce') def finish_claiming(self): alice = P('alice') @@ -78,7 +79,7 @@ def test_visiting_verify_link_shows_helpful_information(self): self.make_package() self.check() - link = pickle.loads(self.db.one('select context from email_queue'))['link'] + link = pickle.loads(self.db.one('select context from email_messages'))['link'] link = link[len(self.base_url):] # strip because visit will add it back self.visit(link) @@ -221,7 +222,7 @@ def test_sends_one_mail_for_multiple_packages(self): assert len(self.css('table.listing td.item')) == 3 assert self.wait_for_success() == 'Check alice@example.com for a verification link.' assert self.db.one('select count(*) from claims') == 3 - assert self.db.one('select count(*) from email_queue') == 1 + assert self.db.one('select count(*) from email_messages') == 1 def test_doesnt_send_for_unclaimable_packages(self): self.make_participant('alice', claimed_time='now', email_address='alice@example.com') @@ -232,4 +233,4 @@ def test_doesnt_send_for_unclaimable_packages(self): assert len(self.css('table.listing td.item')) == 3 assert self.wait_for_success() == 'Check alice@example.com for a verification link.' assert self.db.one('select count(*) from claims') == 2 - assert self.db.one('select count(*) from email_queue') == 1 + assert self.db.one('select count(*) from email_messages') == 1