From bcfb8f7b338f2faac87283721db3aee4b845c426 Mon Sep 17 00:00:00 2001 From: my8100 Date: Thu, 18 Jul 2019 18:11:12 +0800 Subject: [PATCH] Fix polling order in contrib modules --- docs/config.rst | 8 +++ scrapyd/app.py | 7 ++- scrapyd/contrib/__init__.py | 0 scrapyd/contrib/fix_poll_order/__init__.py | 0 scrapyd/contrib/fix_poll_order/poller.py | 21 +++++++ scrapyd/contrib/fix_poll_order/spiderqueue.py | 15 +++++ scrapyd/contrib/fix_poll_order/sqlite.py | 47 +++++++++++++++ scrapyd/default_scrapyd.conf | 1 + scrapyd/tests/test_poller.py | 60 +++++++++++++++++++ scrapyd/utils.py | 6 +- 10 files changed, 163 insertions(+), 2 deletions(-) create mode 100644 scrapyd/contrib/__init__.py create mode 100644 scrapyd/contrib/fix_poll_order/__init__.py create mode 100644 scrapyd/contrib/fix_poll_order/poller.py create mode 100644 scrapyd/contrib/fix_poll_order/spiderqueue.py create mode 100644 scrapyd/contrib/fix_poll_order/sqlite.py diff --git a/docs/config.rst b/docs/config.rst index a7ee8c61..5911fdb2 100644 --- a/docs/config.rst +++ b/docs/config.rst @@ -64,6 +64,13 @@ Whether debug mode is enabled. Defaults to ``off``. When debug mode is enabled the full Python traceback will be returned (as plain text responses) when there is an error processing a JSON API call. +fix_poll_order +----- + +.. versionadded:: 1.3 + +Whether to activate contrib modules for fix_poll_order. Defaults to ``off``. See `issue 187`_. + eggs_dir -------- @@ -168,3 +175,4 @@ Here is an example configuration file with all the defaults: .. literalinclude:: ../scrapyd/default_scrapyd.conf .. _Twisted Application Framework: http://twistedmatrix.com/documents/current/core/howto/application.html +.. _issue 187: https://github.com/scrapy/scrapyd/issues/187 diff --git a/scrapyd/app.py b/scrapyd/app.py index 075c5d10..7022ff3f 100644 --- a/scrapyd/app.py +++ b/scrapyd/app.py @@ -13,6 +13,7 @@ from .eggstorage import FilesystemEggStorage from .scheduler import SpiderScheduler from .poller import QueuePoller +from .contrib.fix_poll_order.poller import FixQueuePoller from .environ import Environment from .config import Config from .basicauth import PublicHTMLRealm, StringCredentialsChecker @@ -23,7 +24,11 @@ def application(config): bind_address = config.get('bind_address', '127.0.0.1') poll_interval = config.getfloat('poll_interval', 5) - poller = QueuePoller(config) + if config.getboolean('fix_poll_order', False): + log.msg("Activating contrib modules for fix_poll_order") + poller = FixQueuePoller(config) + else: + poller = QueuePoller(config) eggstorage = FilesystemEggStorage(config) scheduler = SpiderScheduler(config) environment = Environment(config) diff --git a/scrapyd/contrib/__init__.py b/scrapyd/contrib/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/scrapyd/contrib/fix_poll_order/__init__.py b/scrapyd/contrib/fix_poll_order/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/scrapyd/contrib/fix_poll_order/poller.py b/scrapyd/contrib/fix_poll_order/poller.py new file mode 100644 index 00000000..b6e5b1c6 --- /dev/null +++ b/scrapyd/contrib/fix_poll_order/poller.py @@ -0,0 +1,21 @@ +from six import iteritems +from twisted.internet.defer import inlineCallbacks, maybeDeferred, returnValue + +from scrapyd.poller import QueuePoller + + +class FixQueuePoller(QueuePoller): + + @inlineCallbacks + def poll(self): + if not self.dq.waiting: + return + project_with_highest_priority = None + for p, q in iteritems(self.queues): + project_with_highest_priority = q.get_project_with_highest_priority() + break + if project_with_highest_priority: + q = self.queues[project_with_highest_priority] + msg = yield maybeDeferred(q.pop) + if msg is not None: # In case of a concurrently accessed queue + returnValue(self.dq.put(self._message(msg, project_with_highest_priority))) diff --git a/scrapyd/contrib/fix_poll_order/spiderqueue.py b/scrapyd/contrib/fix_poll_order/spiderqueue.py new file mode 100644 index 00000000..43098624 --- /dev/null +++ b/scrapyd/contrib/fix_poll_order/spiderqueue.py @@ -0,0 +1,15 @@ +from scrapyd.spiderqueue import SqliteSpiderQueue +from scrapyd.contrib.fix_poll_order.sqlite import FixJsonSqlitePriorityQueue + + +class FixSqliteSpiderQueue(SqliteSpiderQueue): + + def __init__(self, database=None, table='spider_queue_with_triggers'): + self.q = FixJsonSqlitePriorityQueue(database, table) + + def get_project_with_highest_priority(self): + if self.q.project_priority_map: + return sorted(self.q.project_priority_map, + key=lambda x: self.q.project_priority_map[x], reverse=True)[0] + else: + return None diff --git a/scrapyd/contrib/fix_poll_order/sqlite.py b/scrapyd/contrib/fix_poll_order/sqlite.py new file mode 100644 index 00000000..5da14018 --- /dev/null +++ b/scrapyd/contrib/fix_poll_order/sqlite.py @@ -0,0 +1,47 @@ +import os +import sqlite3 + +from scrapyd.sqlite import JsonSqlitePriorityQueue + + +class FixJsonSqlitePriorityQueue(JsonSqlitePriorityQueue): + """SQLite priority queue. It relies on SQLite concurrency support for + providing atomic inter-process operations. + """ + project_priority_map = {} + + def __init__(self, database=None, table="queue"): + self.database = database or ':memory:' + self.table = table + if database: + dbname = os.path.split(database)[-1] + self.project = os.path.splitext(dbname)[0] + else: + self.project = self.database + # about check_same_thread: http://twistedmatrix.com/trac/ticket/4040 + self.conn = sqlite3.connect(self.database, check_same_thread=False) + q = "create table if not exists %s (id integer primary key, " \ + "priority real key, message blob, insert_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP)" % table + self.conn.execute(q) + self.create_triggers() + self.update_project_priority_map() + + def create_triggers(self): + self.conn.create_function("update_project_priority_map", 0, self.update_project_priority_map) + for action in ['INSERT', 'UPDATE', 'DELETE']: + name = 'trigger_on_%s' % action.lower() + self.conn.execute(""" + CREATE TRIGGER IF NOT EXISTS %s AFTER %s ON %s + BEGIN + SELECT update_project_priority_map(); + END; + """ % (name, action, self.table)) + + def update_project_priority_map(self): + q = "select priority, strftime('%%s', insert_time) from %s order by priority desc limit 1" \ + % self.table + result = self.conn.execute(q).fetchone() + if result is None: + self.project_priority_map.pop(self.project, None) + else: + self.project_priority_map[self.project] = (result[0], -int(result[-1])) diff --git a/scrapyd/default_scrapyd.conf b/scrapyd/default_scrapyd.conf index 05899d09..4f79c278 100644 --- a/scrapyd/default_scrapyd.conf +++ b/scrapyd/default_scrapyd.conf @@ -13,6 +13,7 @@ http_port = 6800 username = password = debug = off +fix_poll_order = off runner = scrapyd.runner application = scrapyd.app.application launcher = scrapyd.launcher.Launcher diff --git a/scrapyd/tests/test_poller.py b/scrapyd/tests/test_poller.py index 6af34100..7757be91 100644 --- a/scrapyd/tests/test_poller.py +++ b/scrapyd/tests/test_poller.py @@ -1,4 +1,5 @@ import os +import time from twisted.trial import unittest from twisted.internet.defer import Deferred @@ -8,6 +9,7 @@ from scrapyd.interfaces import IPoller from scrapyd.config import Config from scrapyd.poller import QueuePoller +from scrapyd.contrib.fix_poll_order.poller import FixQueuePoller from scrapyd.utils import get_spider_queues class QueuePollerTest(unittest.TestCase): @@ -55,3 +57,61 @@ def test_poll_next(self): self.poller.poll() prj, spd = cfg.popitem() self.failUnlessEqual(d2.result, {'_project': prj, '_spider': spd}) + + +class FixQueuePollerTest(unittest.TestCase): + + def setUp(self): + d = self.mktemp() + eggs_dir = os.path.join(d, 'eggs') + dbs_dir = os.path.join(d, 'dbs') + os.makedirs(eggs_dir) + os.makedirs(dbs_dir) + os.makedirs(os.path.join(eggs_dir, 'mybot1')) + os.makedirs(os.path.join(eggs_dir, 'mybot2')) + config = Config(values={'eggs_dir': eggs_dir, 'dbs_dir': dbs_dir, + 'fix_poll_order': 'on'}) + self.queues = get_spider_queues(config) + self.poller = FixQueuePoller(config) + + def test_interface(self): + verifyObject(IPoller, self.poller) + + def test_poll_next(self): + cfg = [('mybot2', 'spider2', 0), # second + ('mybot1', 'spider2', 0.0), # third + ('mybot1', 'spider1', -1), # fourth + ('mybot1', 'spider3', 1.0)] # first + for prj, spd, priority in cfg: + self.queues[prj].add(spd, priority) + if prj == 'mybot2': + time.sleep(2) # ensure different timestamp + + d1 = self.poller.next() + d2 = self.poller.next() + d3 = self.poller.next() + d4 = self.poller.next() + d5 = self.poller.next() + self.failUnless(isinstance(d1, Deferred)) + self.failIf(hasattr(d1, 'result')) + + # first poll + self.poller.poll() + self.failUnless(hasattr(d1, 'result') and getattr(d1, 'called', False)) + self.assertEqual(d1.result, {'_project': 'mybot1', '_spider': 'spider3'}) + + # second poll + self.poller.poll() + self.assertEqual(d2.result, {'_project': 'mybot2', '_spider': 'spider2'}) + + # third poll + self.poller.poll() + self.assertEqual(d3.result, {'_project': 'mybot1', '_spider': 'spider2'}) + + # fourth poll + self.poller.poll() + self.assertEqual(d4.result, {'_project': 'mybot1', '_spider': 'spider1'}) + + # final poll + self.poller.poll() + self.failIf(hasattr(d5, 'result')) diff --git a/scrapyd/utils.py b/scrapyd/utils.py index e23c2c94..1a1d28ab 100644 --- a/scrapyd/utils.py +++ b/scrapyd/utils.py @@ -9,6 +9,7 @@ from twisted.web import resource from scrapyd.spiderqueue import SqliteSpiderQueue +from scrapyd.contrib.fix_poll_order.spiderqueue import FixSqliteSpiderQueue from scrapyd.config import Config @@ -59,7 +60,10 @@ def get_spider_queues(config): d = {} for project in get_project_list(config): dbpath = os.path.join(dbsdir, '%s.db' % project) - d[project] = SqliteSpiderQueue(dbpath) + if config.getboolean('fix_poll_order', False): + d[project] = FixSqliteSpiderQueue(dbpath) + else: + d[project] = SqliteSpiderQueue(dbpath) return d def get_project_list(config):