diff --git a/SpiderKeeper/app/proxy/spiderctrl.py b/SpiderKeeper/app/proxy/spiderctrl.py index 2e47cfab..762a870c 100644 --- a/SpiderKeeper/app/proxy/spiderctrl.py +++ b/SpiderKeeper/app/proxy/spiderctrl.py @@ -89,7 +89,7 @@ def delete_project(self, project): def get_spider_list(self, project): spider_instance_list = self.spider_service_instances[0].get_spider_list(project.project_name) for spider_instance in spider_instance_list: - spider_instance.project_id = project.id + spider_instance.project_id = project.project_id return spider_instance_list def get_daemon_status(self): @@ -120,15 +120,11 @@ def sync_job_status(self, project): def start_spider(self, job_instance): project = Project.find_project_by_id(job_instance.project_id) + # job_execution = JobExecution.find_job_by_service_id(job_instance.project_id) spider_name = job_instance.spider_name - #arguments = {} - #if job_instance.spider_arguments: - # arguments = dict(map(lambda x: x.split("="), job_instance.spider_arguments.split(","))) - from collections import defaultdict - arguments = defaultdict(list) + arguments = {} if job_instance.spider_arguments: - for k, v in list(map(lambda x: x.split('=', 1), job_instance.spider_arguments.split(','))): - arguments[k].append(v) + arguments = dict(map(lambda x: x.split("="), job_instance.spider_arguments.split(","))) threshold = 0 daemon_size = len(self.spider_service_instances) if job_instance.priority == JobPriority.HIGH: @@ -147,15 +143,24 @@ def start_spider(self, job_instance): for i in range(threshold): leaders.append(random.choice(candidates)) for leader in leaders: + # add more arguments to scrapyd to run a spider + arguments['project_id']= job_instance.project_id + arguments['project_name']= project.project_name + arguments['job_instance_id']= job_instance.job_instance_id + arguments['priority']= job_instance.priority + arguments['args']= job_instance.spider_arguments + arguments['execute_ip']= leader.server + arguments['create_time']= datetime.datetime.now() serviec_job_id = leader.start_spider(project.project_name, spider_name, arguments) job_execution = JobExecution() job_execution.project_id = job_instance.project_id job_execution.service_job_execution_id = serviec_job_id - job_execution.job_instance_id = job_instance.id - job_execution.create_time = datetime.datetime.now() + job_execution.job_instance_id = job_instance.job_instance_id + job_execution.create_time = arguments['create_time'] job_execution.running_on = leader.server db.session.add(job_execution) db.session.commit() + def cancel_spider(self, job_execution): job_instance = JobInstance.find_job_instance_by_id(job_execution.job_instance_id) diff --git a/SpiderKeeper/app/schedulers/common.py b/SpiderKeeper/app/schedulers/common.py index b4187ce0..a18654e6 100644 --- a/SpiderKeeper/app/schedulers/common.py +++ b/SpiderKeeper/app/schedulers/common.py @@ -22,7 +22,7 @@ def sync_spiders(): ''' for project in Project.query.all(): spider_instance_list = agent.get_spider_list(project) - SpiderInstance.update_spider_instances(project.id, spider_instance_list) + SpiderInstance.update_spider_instances(project.project_id, spider_instance_list) app.logger.debug('[sync_spiders]') @@ -36,7 +36,7 @@ def run_spider_job(job_instance_id): job_instance = JobInstance.find_job_instance_by_id(job_instance_id) agent.start_spider(job_instance) app.logger.info('[run_spider_job][project:%s][spider_name:%s][job_instance_id:%s]' % ( - job_instance.project_id, job_instance.spider_name, job_instance.id)) + job_instance.project_id, job_instance.spider_name, job_instance.job_instance_id)) except Exception as e: app.logger.error('[run_spider_job] ' + str(e)) @@ -47,32 +47,28 @@ def reload_runnable_spider_job_execution(): :return: ''' running_job_ids = set([job.id for job in scheduler.get_jobs()]) - # app.logger.debug('[running_job_ids] %s' % ','.join(running_job_ids)) + app.logger.debug('[running_job_ids] %s' % ','.join(running_job_ids)) available_job_ids = set() # add new job to schedule for job_instance in JobInstance.query.filter_by(enabled=0, run_type="periodic").all(): - job_id = "spider_job_%s:%s" % (job_instance.id, int(time.mktime(job_instance.date_modified.timetuple()))) + job_id = "spider_job_%s:%s" % (job_instance.job_instance_id, int(time.mktime(job_instance.date_modified.timetuple()))) available_job_ids.add(job_id) if job_id not in running_job_ids: - try: - scheduler.add_job(run_spider_job, - args=(job_instance.id,), - trigger='cron', - id=job_id, - minute=job_instance.cron_minutes, - hour=job_instance.cron_hour, - day=job_instance.cron_day_of_month, - day_of_week=job_instance.cron_day_of_week, - month=job_instance.cron_month, - second=0, - max_instances=999, - misfire_grace_time=60 * 60, - coalesce=True) - except Exception as e: - app.logger.error( - '[load_spider_job] failed {} {},may be cron expression format error '.format(job_id, str(e))) + scheduler.add_job(run_spider_job, + args=(job_instance.job_instance_id,), + trigger='cron', + id=job_id, + minute=job_instance.cron_minutes, + hour=job_instance.cron_hour, + day=job_instance.cron_day_of_month, + day_of_week=job_instance.cron_day_of_week, + month=job_instance.cron_month, + second=0, + max_instances=999, + misfire_grace_time=60 * 60, + coalesce=True) app.logger.info('[load_spider_job][project:%s][spider_name:%s][job_instance_id:%s][job_id:%s]' % ( - job_instance.project_id, job_instance.spider_name, job_instance.id, job_id)) + job_instance.project_id, job_instance.spider_name, job_instance.job_instance_id, job_id)) # remove invalid jobs for invalid_job_id in filter(lambda job_id: job_id.startswith("spider_job_"), running_job_ids.difference(available_job_ids)): diff --git a/SpiderKeeper/app/spider/controller.py b/SpiderKeeper/app/spider/controller.py index 296126b3..1837bdb7 100644 --- a/SpiderKeeper/app/spider/controller.py +++ b/SpiderKeeper/app/spider/controller.py @@ -450,7 +450,7 @@ def inject_project(): project_context['project_list'] = Project.query.all() if project_context['project_list'] and (not session.get('project_id')): project = Project.query.first() - session['project_id'] = project.id + session['project_id'] = project.project_id if session.get('project_id'): project_context['project'] = Project.find_project_by_id(session['project_id']) project_context['spider_list'] = [spider_instance.to_dict() for spider_instance in @@ -482,9 +482,9 @@ def timedelta(end_time, start_time): def readable_time(total_seconds): if not total_seconds: return '-' - if total_seconds < 60: + if total_seconds / 60 == 0: return '%s s' % total_seconds - if total_seconds < 3600: + if total_seconds / 3600 == 0: return '%s m' % int(total_seconds / 60) return '%s h %s m' % (int(total_seconds / 3600), int((total_seconds % 3600) / 60)) @@ -495,7 +495,7 @@ def readable_time(total_seconds): def index(): project = Project.query.first() if project: - return redirect("/project/%s/job/dashboard" % project.id, code=302) + return redirect("/project/%s/job/dashboard" % project.project_id, code=302) return redirect("/project/manage", code=302) @@ -512,7 +512,8 @@ def project_create(): project.project_name = project_name db.session.add(project) db.session.commit() - return redirect("/project/%s/spider/deploy" % project.id, code=302) + session['project_id'] = project.project_id + return redirect("/project/%s/spider/deploy" % project.project_id, code=302) @app.route("/project//delete") @@ -597,14 +598,14 @@ def job_log(project_id, job_exec_id): @app.route("/project//job//run") def job_run(project_id, job_instance_id): - job_instance = JobInstance.query.filter_by(project_id=project_id, id=job_instance_id).first() + job_instance = JobInstance.query.filter_by(project_id=project_id, job_instance_id=job_instance_id).first() agent.start_spider(job_instance) return redirect(request.referrer, code=302) @app.route("/project//job//remove") def job_remove(project_id, job_instance_id): - job_instance = JobInstance.query.filter_by(project_id=project_id, id=job_instance_id).first() + job_instance = JobInstance.query.filter_by(project_id=project_id, job_instance_id=job_instance_id).first() db.session.delete(job_instance) db.session.commit() return redirect(request.referrer, code=302) @@ -612,7 +613,7 @@ def job_remove(project_id, job_instance_id): @app.route("/project//job//switch") def job_switch(project_id, job_instance_id): - job_instance = JobInstance.query.filter_by(project_id=project_id, id=job_instance_id).first() + job_instance = JobInstance.query.filter_by(project_id=project_id, job_instance_id=job_instance_id).first() job_instance.enabled = -1 if job_instance.enabled == 0 else 0 db.session.commit() return redirect(request.referrer, code=302) @@ -649,6 +650,8 @@ def spider_egg_upload(project_id): file.save(dst) agent.deploy(project, dst) flash('deploy success!') + from SpiderKeeper.app.schedulers.common import sync_spiders + sync_spiders() return redirect(request.referrer) diff --git a/SpiderKeeper/app/spider/model.py b/SpiderKeeper/app/spider/model.py index 5376602b..9401df64 100644 --- a/SpiderKeeper/app/spider/model.py +++ b/SpiderKeeper/app/spider/model.py @@ -1,12 +1,16 @@ import datetime from sqlalchemy import desc from SpiderKeeper.app import db, Base - +import uuid class Project(Base): __tablename__ = 'sk_project' project_name = db.Column(db.String(50)) + project_id = db.Column(db.String(16), nullable=False, index=True) + + def __init__(self): + self.project_id = str(uuid.uuid4()).replace('-','')[:16] @classmethod def load_project(cls, project_list): @@ -18,11 +22,11 @@ def load_project(cls, project_list): @classmethod def find_project_by_id(cls, project_id): - return Project.query.filter_by(id=project_id).first() + return Project.query.filter_by(project_id=project_id).first() def to_dict(self): return { - "project_id": self.id, + "project_id": self.project_id, "project_name": self.project_name } @@ -31,7 +35,7 @@ class SpiderInstance(Base): __tablename__ = 'sk_spider' spider_name = db.Column(db.String(100)) - project_id = db.Column(db.INTEGER, nullable=False, index=True) + project_id = db.Column(db.String(16), nullable=False, index=True) @classmethod def update_spider_instances(cls, project_id, spider_instance_list): @@ -103,7 +107,8 @@ class JobInstance(Base): __tablename__ = 'sk_job_instance' spider_name = db.Column(db.String(100), nullable=False, index=True) - project_id = db.Column(db.INTEGER, nullable=False, index=True) + project_id = db.Column(db.String(16), nullable=False, index=True) + job_instance_id = db.Column(db.String(16), nullable=False, index=True) tags = db.Column(db.Text) # job tag(split by , ) spider_arguments = db.Column(db.Text) # job execute arguments(split by , ex.: arg1=foo,arg2=bar) priority = db.Column(db.INTEGER) @@ -116,9 +121,12 @@ class JobInstance(Base): enabled = db.Column(db.INTEGER, default=0) # 0/-1 run_type = db.Column(db.String(20)) # periodic/onetime + def __init__(self): + self.job_instance_id = str(uuid.uuid4()).replace('-','')[:16] + def to_dict(self): return dict( - job_instance_id=self.id, + job_instance_id=self.job_instance_id, spider_name=self.spider_name, tags=self.tags.split(',') if self.tags else None, spider_arguments=self.spider_arguments, @@ -140,7 +148,7 @@ def list_job_instance_by_project_id(cls, project_id): @classmethod def find_job_instance_by_id(cls, job_instance_id): - return cls.query.filter_by(id=job_instance_id).first() + return cls.query.filter_by(job_instance_id=job_instance_id).first() class SpiderStatus(): @@ -149,18 +157,19 @@ class SpiderStatus(): class JobExecution(Base): __tablename__ = 'sk_job_execution' - - project_id = db.Column(db.INTEGER, nullable=False, index=True) + project_id = db.Column(db.String(16), nullable=False, index=True) + job_instance_id = db.Column(db.String(16), nullable=False, index=True) service_job_execution_id = db.Column(db.String(50), nullable=False, index=True) - job_instance_id = db.Column(db.INTEGER, nullable=False, index=True) create_time = db.Column(db.DATETIME) start_time = db.Column(db.DATETIME) end_time = db.Column(db.DATETIME) running_status = db.Column(db.INTEGER, default=SpiderStatus.PENDING) running_on = db.Column(db.Text) + + def to_dict(self): - job_instance = JobInstance.query.filter_by(id=self.job_instance_id).first() + job_instance = JobInstance.query.filter_by(job_instance_id=self.job_instance_id).first() return { 'project_id': self.project_id, 'job_execution_id': self.id, diff --git a/SpiderKeeper/app/templates/base.html b/SpiderKeeper/app/templates/base.html index 92b1c1f6..6480d3c2 100644 --- a/SpiderKeeper/app/templates/base.html +++ b/SpiderKeeper/app/templates/base.html @@ -58,7 +58,7 @@