-
Notifications
You must be signed in to change notification settings - Fork 37
/
queue_pmh.py
110 lines (92 loc) · 4.42 KB
/
queue_pmh.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import argparse
from time import sleep
from time import time
from sqlalchemy import text
from app import db
from app import logger
from pmh_record import PmhRecord
from queue_main import DbQueue
from util import run_sql
import endpoint # magic
import pub # magic
class DbQueuePmh(DbQueue):
    """Worker queue that processes pmh_record rows by calling mint_pages on each.

    Rows are claimed in chunks from the pmh_record table using
    FOR UPDATE SKIP LOCKED so multiple workers can run concurrently
    without stepping on each other's rows.
    """

    def table_name(self, job_type):
        # The queue is driven directly off the pmh_record table itself.
        return "pmh_record"

    def process_name(self, job_type):
        # Formation name comes from the Procfile.
        return "run_pmh"

    def worker_run(self, **kwargs):
        """Claim unstarted pmh_record rows and run mint_pages on each, forever.

        kwargs:
            id:    process only this one record, then return
            chunk: number of rows to claim per loop iteration (default 10)
            limit: forwarded to print_update for progress reporting
        """
        single_obj_id = kwargs.get("id", None)
        chunk = kwargs.get("chunk", 10)
        limit = kwargs.get("limit", 10)

        queue_table = "pmh_record"
        run_class = PmhRecord
        run_method = "mint_pages"

        if single_obj_id:
            limit = 1
        elif not limit:
            limit = 1000

        # Atomically claim a chunk of unstarted rows, marking them started.
        # SKIP LOCKED lets concurrent workers pass over rows another worker
        # has already claimed in its own transaction.
        text_query_pattern = """WITH picked_from_queue AS (
                   SELECT *
                   FROM   {queue_table}
                   WHERE  started is null
                   -- ORDER BY rand
                   LIMIT  {chunk}
                   FOR UPDATE SKIP LOCKED
               )
               UPDATE {queue_table} queue_rows_to_update
               SET    started=now()
               FROM   picked_from_queue
               WHERE  picked_from_queue.id = queue_rows_to_update.id
               RETURNING picked_from_queue.*;"""
        # NOTE: the pattern only has {chunk} and {queue_table} placeholders;
        # the previously-passed (and unused) limit kwarg has been dropped.
        text_query = text_query_pattern.format(
            chunk=chunk,
            queue_table=queue_table
        )

        index = 0
        start_time = time()

        while True:
            new_loop_start_time = time()

            if single_obj_id:
                # .first() may return None; bail out with a message instead of
                # crashing later on None.id (previous behavior).
                obj = run_class.query.filter(run_class.id == single_obj_id).first()
                if obj is None:
                    logger.info(u"no {} row found with id {}".format(queue_table, single_obj_id))
                    return
                objects = [obj]
            else:
                # logger.info(u"looking for new jobs")
                objects = run_class.query.from_statement(text(text_query)).execution_options(autocommit=True).all()
                # logger.info(u"finished get-new-objects query in {} seconds".format(elapsed(new_loop_start_time)))

            if not objects:
                # Nothing claimable right now; back off briefly and retry.
                sleep(5)
                continue

            object_ids = [obj.id for obj in objects]
            self.update_fn(run_class, run_method, objects, index=index)

            # Escape single quotes for the SQL literal, then % because run_sql
            # applies format-style interpolation.
            object_ids_str = ",".join(["'{}'".format(id.replace("'", "''")) for id in object_ids])
            object_ids_str = object_ids_str.replace("%", "%%")  # sql escaping
            run_sql(db, "update {queue_table} set finished=now() where id in ({ids})".format(
                queue_table=queue_table, ids=object_ids_str))

            index += 1
            if single_obj_id:
                return
            else:
                self.print_update(new_loop_start_time, chunk, limit, start_time, index)
if __name__ == "__main__":
    # Command-line entry point: parse flags and hand off to the queue runner.
    parser = argparse.ArgumentParser(description="Run stuff.")
    parser.add_argument('--id', nargs="?", type=str, help="id of the one thing you want to update (case sensitive)")
    parser.add_argument('--doi', nargs="?", type=str, help="id of the one thing you want to update (case insensitive)")
    parser.add_argument('--reset', default=False, action='store_true', help="do you want to just reset?")
    parser.add_argument('--run', default=False, action='store_true', help="to run the queue")
    # Fixed help text garbled by a bad print -> logger.info( mass edit.
    parser.add_argument('--status', default=False, action='store_true', help="log the status")
    parser.add_argument('--dynos', default=None, type=int, help="scale to this many dynos")
    parser.add_argument('--logs', default=False, action='store_true', help="print out logs")
    parser.add_argument('--monitor', default=False, action='store_true', help="monitor till done, then turn off dynos")
    parser.add_argument('--kick', default=False, action='store_true', help="put started but unfinished dois back to unstarted so they are retried")
    parser.add_argument('--limit', "-l", nargs="?", type=int, help="how many jobs to do")
    parser.add_argument('--chunk', "-ch", nargs="?", default=10, type=int, help="how many to take off db at once")

    parsed_args = parser.parse_args()

    job_type = "normal"  # should be an object attribute
    my_queue = DbQueuePmh()
    my_queue.run_right_thing(parsed_args, job_type)
    print("finished")