Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix polling order in contrib modules #349

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ Whether debug mode is enabled. Defaults to ``off``. When debug mode is enabled
the full Python traceback will be returned (as plain text responses) when there
is an error processing a JSON API call.

fix_poll_order
--------------

.. versionadded:: 1.3

Whether to enable the contrib modules that fix the polling order across projects. Defaults to ``off``. See `issue 187`_.

eggs_dir
--------

Expand Down Expand Up @@ -168,3 +175,4 @@ Here is an example configuration file with all the defaults:

.. literalinclude:: ../scrapyd/default_scrapyd.conf
.. _Twisted Application Framework: http://twistedmatrix.com/documents/current/core/howto/application.html
.. _issue 187: https://github.com/scrapy/scrapyd/issues/187
7 changes: 6 additions & 1 deletion scrapyd/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .eggstorage import FilesystemEggStorage
from .scheduler import SpiderScheduler
from .poller import QueuePoller
from .contrib.fix_poll_order.poller import FixQueuePoller
from .environ import Environment
from .config import Config
from .basicauth import PublicHTMLRealm, StringCredentialsChecker
Expand All @@ -23,7 +24,11 @@ def application(config):
bind_address = config.get('bind_address', '127.0.0.1')
poll_interval = config.getfloat('poll_interval', 5)

poller = QueuePoller(config)
if config.getboolean('fix_poll_order', False):
log.msg("Activating contrib modules for fix_poll_order")
poller = FixQueuePoller(config)
else:
poller = QueuePoller(config)
Comment on lines +27 to +31
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should do something similar to Launcher, etc. so that users can select any module (including their own), not just between these two modules.

eggstorage = FilesystemEggStorage(config)
scheduler = SpiderScheduler(config)
environment = Environment(config)
Expand Down
Empty file added scrapyd/contrib/__init__.py
Empty file.
Empty file.
21 changes: 21 additions & 0 deletions scrapyd/contrib/fix_poll_order/poller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from six import iteritems
from twisted.internet.defer import inlineCallbacks, maybeDeferred, returnValue

from scrapyd.poller import QueuePoller


class FixQueuePoller(QueuePoller):
    """Poller that pops from the project whose queue currently holds the
    globally highest-priority pending message, instead of always scanning
    projects in a fixed order like the base QueuePoller.
    """

    @inlineCallbacks
    def poll(self):
        # No consumer is waiting for a job, so there is nothing to hand out.
        if not self.dq.waiting:
            return
        # The priority map is shared state, so any single queue instance can
        # report which project currently holds the highest-priority message.
        any_queue = next(iter(self.queues.values()), None)
        if any_queue is None:
            return
        project = any_queue.get_project_with_highest_priority()
        if project:
            message = yield maybeDeferred(self.queues[project].pop)
            # pop() can return None if another process drained the queue
            # between the priority lookup and the pop.
            if message is not None:
                returnValue(self.dq.put(self._message(message, project)))
15 changes: 15 additions & 0 deletions scrapyd/contrib/fix_poll_order/spiderqueue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from scrapyd.spiderqueue import SqliteSpiderQueue
from scrapyd.contrib.fix_poll_order.sqlite import FixJsonSqlitePriorityQueue


class FixSqliteSpiderQueue(SqliteSpiderQueue):
    """Spider queue backed by a trigger-maintained priority queue that can
    report which project currently holds the highest-priority message.
    """

    def __init__(self, database=None, table='spider_queue_with_triggers'):
        # Use the trigger-backed queue so project_priority_map stays current.
        self.q = FixJsonSqlitePriorityQueue(database, table)

    def get_project_with_highest_priority(self):
        """Return the project name with the overall highest-priority pending
        message, or None when no project has messages.

        Map values are ``(priority, -insert_time)`` tuples, so comparing them
        picks the highest priority first and breaks ties in favor of the
        oldest (earliest inserted) message.
        """
        priority_map = self.q.project_priority_map
        if not priority_map:
            return None
        # max() is a single O(n) pass; the previous implementation sorted the
        # whole map (O(n log n)) just to take the first element. Tie handling
        # is identical: both return the first maximal key in iteration order.
        return max(priority_map, key=priority_map.get)
47 changes: 47 additions & 0 deletions scrapyd/contrib/fix_poll_order/sqlite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import os
import sqlite3

from scrapyd.sqlite import JsonSqlitePriorityQueue


class FixJsonSqlitePriorityQueue(JsonSqlitePriorityQueue):
    """SQLite priority queue. It relies on SQLite concurrency support for
    providing atomic inter-process operations.
    """
    # NOTE: deliberately a *class-level* mutable dict — every queue instance
    # in this process shares the same map, which is what allows any one queue
    # to report the highest-priority project across all projects. Keys are
    # project names; values are (priority, -insert_time_epoch) tuples.
    project_priority_map = {}

    def __init__(self, database=None, table="queue"):
        # Fall back to an in-memory database when no path is given.
        self.database = database or ':memory:'
        self.table = table
        if database:
            # Derive the project name from the database filename, e.g.
            # ".../dbs/mybot1.db" -> "mybot1".
            dbname = os.path.split(database)[-1]
            self.project = os.path.splitext(dbname)[0]
        else:
            # In-memory databases have no filename; use ':memory:' as the key.
            self.project = self.database
        # about check_same_thread: http://twistedmatrix.com/trac/ticket/4040
        self.conn = sqlite3.connect(self.database, check_same_thread=False)
        # Same schema as the parent class, plus an insert_time column used to
        # break priority ties in favor of the oldest message.
        q = "create table if not exists %s (id integer primary key, " \
            "priority real key, message blob, insert_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP)" % table
        self.conn.execute(q)
        self.create_triggers()
        # Seed the shared map from whatever is already in the table.
        self.update_project_priority_map()

    def create_triggers(self):
        # Install triggers so that any row change (insert/update/delete)
        # refreshes this project's entry in the shared priority map by calling
        # back into Python via a registered SQL function.
        self.conn.create_function("update_project_priority_map", 0, self.update_project_priority_map)
        for action in ['INSERT', 'UPDATE', 'DELETE']:
            name = 'trigger_on_%s' % action.lower()
            self.conn.execute("""
                CREATE TRIGGER IF NOT EXISTS %s AFTER %s ON %s
                BEGIN
                    SELECT update_project_priority_map();
                END;
            """ % (name, action, self.table))

    def update_project_priority_map(self):
        # Fetch the single highest-priority row; '%%s' survives the Python
        # %-formatting as strftime('%s'), i.e. the insert time in epoch seconds.
        q = "select priority, strftime('%%s', insert_time) from %s order by priority desc limit 1" \
            % self.table
        result = self.conn.execute(q).fetchone()
        if result is None:
            # Queue is empty: remove this project from the shared map.
            self.project_priority_map.pop(self.project, None)
        else:
            # Store (priority, -insert_time) so that tuple comparison ranks
            # higher priority first and, on ties, the older message first.
            self.project_priority_map[self.project] = (result[0], -int(result[-1]))
1 change: 1 addition & 0 deletions scrapyd/default_scrapyd.conf
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ http_port = 6800
username =
password =
debug = off
fix_poll_order = off
runner = scrapyd.runner
application = scrapyd.app.application
launcher = scrapyd.launcher.Launcher
Expand Down
60 changes: 60 additions & 0 deletions scrapyd/tests/test_poller.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import time

from twisted.trial import unittest
from twisted.internet.defer import Deferred
Expand All @@ -8,6 +9,7 @@
from scrapyd.interfaces import IPoller
from scrapyd.config import Config
from scrapyd.poller import QueuePoller
from scrapyd.contrib.fix_poll_order.poller import FixQueuePoller
from scrapyd.utils import get_spider_queues

class QueuePollerTest(unittest.TestCase):
Expand Down Expand Up @@ -55,3 +57,61 @@ def test_poll_next(self):
self.poller.poll()
prj, spd = cfg.popitem()
self.failUnlessEqual(d2.result, {'_project': prj, '_spider': spd})


class FixQueuePollerTest(unittest.TestCase):
    """Tests for FixQueuePoller: jobs must be popped in priority order across
    all projects, with ties broken by insertion time (oldest first).

    Uses the modern unittest assertion names; failUnless/failIf/
    failUnlessEqual are deprecated aliases.
    """

    def setUp(self):
        # Build an isolated scrapyd layout (eggs/dbs dirs) with two projects
        # and the fix_poll_order contrib modules enabled.
        d = self.mktemp()
        eggs_dir = os.path.join(d, 'eggs')
        dbs_dir = os.path.join(d, 'dbs')
        os.makedirs(eggs_dir)
        os.makedirs(dbs_dir)
        os.makedirs(os.path.join(eggs_dir, 'mybot1'))
        os.makedirs(os.path.join(eggs_dir, 'mybot2'))
        config = Config(values={'eggs_dir': eggs_dir, 'dbs_dir': dbs_dir,
                                'fix_poll_order': 'on'})
        self.queues = get_spider_queues(config)
        self.poller = FixQueuePoller(config)

    def test_interface(self):
        verifyObject(IPoller, self.poller)

    def test_poll_next(self):
        # Expected pop order: priority descending, ties broken by the older
        # insert timestamp (mybot2/spider2 is added before mybot1/spider2).
        cfg = [('mybot2', 'spider2', 0),     # second
               ('mybot1', 'spider2', 0.0),   # third
               ('mybot1', 'spider1', -1),    # fourth
               ('mybot1', 'spider3', 1.0)]   # first
        for prj, spd, priority in cfg:
            self.queues[prj].add(spd, priority)
            if prj == 'mybot2':
                # insert_time has one-second resolution, so sleep long enough
                # to guarantee a different timestamp for the tied priorities.
                time.sleep(2)

        d1 = self.poller.next()
        d2 = self.poller.next()
        d3 = self.poller.next()
        d4 = self.poller.next()
        d5 = self.poller.next()
        self.assertIsInstance(d1, Deferred)
        # No poll has happened yet, so the deferred must not have fired.
        self.assertFalse(hasattr(d1, 'result'))

        # first poll: highest priority wins regardless of project
        self.poller.poll()
        self.assertTrue(hasattr(d1, 'result') and getattr(d1, 'called', False))
        self.assertEqual(d1.result, {'_project': 'mybot1', '_spider': 'spider3'})

        # second poll: priority tie (0 vs 0.0) resolved by older insert time
        self.poller.poll()
        self.assertEqual(d2.result, {'_project': 'mybot2', '_spider': 'spider2'})

        # third poll
        self.poller.poll()
        self.assertEqual(d3.result, {'_project': 'mybot1', '_spider': 'spider2'})

        # fourth poll: lowest priority comes out last
        self.poller.poll()
        self.assertEqual(d4.result, {'_project': 'mybot1', '_spider': 'spider1'})

        # final poll: all queues are empty, so the deferred stays unfired
        self.poller.poll()
        self.assertFalse(hasattr(d5, 'result'))
6 changes: 5 additions & 1 deletion scrapyd/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from twisted.web import resource

from scrapyd.spiderqueue import SqliteSpiderQueue
from scrapyd.contrib.fix_poll_order.spiderqueue import FixSqliteSpiderQueue
from scrapyd.config import Config


Expand Down Expand Up @@ -59,7 +60,10 @@ def get_spider_queues(config):
d = {}
for project in get_project_list(config):
dbpath = os.path.join(dbsdir, '%s.db' % project)
d[project] = SqliteSpiderQueue(dbpath)
if config.getboolean('fix_poll_order', False):
d[project] = FixSqliteSpiderQueue(dbpath)
else:
d[project] = SqliteSpiderQueue(dbpath)
return d

def get_project_list(config):
Expand Down