diff --git a/scrapyd/poller.py b/scrapyd/poller.py index aa4097f7..84b1cee0 100644 --- a/scrapyd/poller.py +++ b/scrapyd/poller.py @@ -12,17 +12,35 @@ def __init__(self, config): self.config = config self.update_projects() self.dq = DeferredQueue() + # For backward compatibility with custom SqliteSpiderQueue and JsonSqlitePriorityQueue + # TODO: remove it and add method get_project_with_highest_priority in ISpiderQueue in 1.4 + self.support_comparing_priorities = None @inlineCallbacks def poll(self): if not self.dq.waiting: return - for p, q in iteritems(self.queues): - c = yield maybeDeferred(q.count) - if c: + + if self.support_comparing_priorities is None: + self.test_comparing_priorities() + + project_with_highest_priority = None + if self.support_comparing_priorities: + for p, q in iteritems(self.queues): + project_with_highest_priority = q.get_project_with_highest_priority() + break + if project_with_highest_priority: + q = self.queues[project_with_highest_priority] msg = yield maybeDeferred(q.pop) if msg is not None: # In case of a concurrently accessed queue - returnValue(self.dq.put(self._message(msg, p))) + returnValue(self.dq.put(self._message(msg, project_with_highest_priority))) + if not self.support_comparing_priorities or not project_with_highest_priority: + for p, q in iteritems(self.queues): + c = yield maybeDeferred(q.count) + if c: + msg = yield maybeDeferred(q.pop) + if msg is not None: # In case of a concurrently accessed queue + returnValue(self.dq.put(self._message(msg, p))) def next(self): return self.dq.get() @@ -35,3 +53,14 @@ def _message(self, queue_msg, project): d['_project'] = project d['_spider'] = d.pop('name') return d + + def test_comparing_priorities(self): + for p, q in iteritems(self.queues): + try: + getattr(q, 'get_project_with_highest_priority') + getattr(q.q, 'project_priority_map') + except AttributeError: + self.support_comparing_priorities = False + else: + self.support_comparing_priorities = True + return diff --git a/scrapyd/spiderqueue.py b/scrapyd/spiderqueue.py index 1c889436..418676b4 100644 --- a/scrapyd/spiderqueue.py +++ b/scrapyd/spiderqueue.py @@ -7,7 +7,7 @@ @implementer(ISpiderQueue) class SqliteSpiderQueue(object): - def __init__(self, database=None, table='spider_queue'): + def __init__(self, database=None, table='spider_queue_with_triggers'): self.q = JsonSqlitePriorityQueue(database, table) def add(self, name, priority=0.0, **spider_args): @@ -29,3 +29,10 @@ def remove(self, func): def clear(self): self.q.clear() + + def get_project_with_highest_priority(self): + if self.q.project_priority_map: + return sorted(self.q.project_priority_map, + key=lambda x: self.q.project_priority_map[x], reverse=True)[0] + else: + return None diff --git a/scrapyd/sqlite.py b/scrapyd/sqlite.py index 966505a2..72774c70 100644 --- a/scrapyd/sqlite.py +++ b/scrapyd/sqlite.py @@ -1,5 +1,6 @@ import sqlite3 import json +import os try: from collections.abc import MutableMapping except ImportError: @@ -82,19 +83,28 @@ class JsonSqlitePriorityQueue(object): """SQLite priority queue. It relies on SQLite concurrency support for providing atomic inter-process operations. """ + project_priority_map = {} def __init__(self, database=None, table="queue"): self.database = database or ':memory:' self.table = table + if database: + dbname = os.path.split(database)[-1] + self.project = os.path.splitext(dbname)[0] + else: + self.project = self.database # about check_same_thread: http://twistedmatrix.com/trac/ticket/4040 self.conn = sqlite3.connect(self.database, check_same_thread=False) q = "create table if not exists %s (id integer primary key, " \ - "priority real key, message blob)" % table + "priority real key, message blob, insert_time TIMESTAMP)" % table self.conn.execute(q) + self.create_triggers() + self.update_project_priority_map() def put(self, message, priority=0.0): args = (priority, self.encode(message)) - q = "insert into %s (priority, message) values (?,?)" % self.table + q = "insert into %s (priority, message, insert_time) values (?,?, CURRENT_TIMESTAMP)" \ + % self.table self.conn.execute(q, args) self.conn.commit() @@ -131,6 +141,26 @@ def clear(self): self.conn.execute("delete from %s" % self.table) self.conn.commit() + def create_triggers(self): + self.conn.create_function("update_project_priority_map", 0, self.update_project_priority_map) + for action in ['INSERT', 'UPDATE', 'DELETE']: + name = 'trigger_on_%s' % action.lower() + self.conn.execute(""" + CREATE TRIGGER IF NOT EXISTS %s AFTER %s ON %s + BEGIN + SELECT update_project_priority_map(); + END; + """ % (name, action, self.table)) + + def update_project_priority_map(self): + q = "select priority, strftime('%%s', insert_time) from %s order by priority desc limit 1" \ + % self.table + result = self.conn.execute(q).fetchone() + if result is None: + self.project_priority_map.pop(self.project, None) + else: + self.project_priority_map[self.project] = (result[0], -int(result[-1])) + def __len__(self): q = "select count(*) from %s" % self.table return self.conn.execute(q).fetchone()[0] diff --git a/scrapyd/tests/test_poller.py b/scrapyd/tests/test_poller.py index 6af34100..6387ea84 100644 --- a/scrapyd/tests/test_poller.py +++ b/scrapyd/tests/test_poller.py @@ -1,4 +1,5 @@ import os +import time from twisted.trial import unittest from twisted.internet.defer import Deferred @@ -28,30 +29,40 @@ def test_interface(self): verifyObject(IPoller, self.poller) def test_poll_next(self): - cfg = {'mybot1': 'spider1', - 'mybot2': 'spider2'} - priority = 0 - for prj, spd in cfg.items(): + cfg = [('mybot2', 'spider2', 0), # second + ('mybot1', 'spider2', 0.0), # third + ('mybot1', 'spider1', -1), # fourth + ('mybot1', 'spider3', 1.0)] # first + for prj, spd, priority in cfg: self.queues[prj].add(spd, priority) + if prj == 'mybot2': + time.sleep(1.5) # ensure different timestamp d1 = self.poller.next() d2 = self.poller.next() + d3 = self.poller.next() + d4 = self.poller.next() + d5 = self.poller.next() self.failUnless(isinstance(d1, Deferred)) self.failIf(hasattr(d1, 'result')) - # poll once + # first poll self.poller.poll() self.failUnless(hasattr(d1, 'result') and getattr(d1, 'called', False)) + self.assertEqual(d1.result, {'_project': 'mybot1', '_spider': 'spider3'}) - # which project got run: project1 or project2? - self.failUnless(d1.result.get('_project')) - prj = d1.result['_project'] - self.failUnlessEqual(d1.result['_spider'], cfg.pop(prj)) + # second poll + self.poller.poll() + self.assertEqual(d2.result, {'_project': 'mybot2', '_spider': 'spider2'}) + + # third poll + self.poller.poll() + self.assertEqual(d3.result, {'_project': 'mybot1', '_spider': 'spider2'}) - self.queues[prj].pop() + # fourth poll + self.poller.poll() + self.assertEqual(d4.result, {'_project': 'mybot1', '_spider': 'spider1'}) - # poll twice - # check that the other project's spider got to run + # final poll self.poller.poll() - prj, spd = cfg.popitem() - self.failUnlessEqual(d2.result, {'_project': prj, '_spider': spd}) + self.failIf(hasattr(d5, 'result'))