From 8297e12b0497fd2a194cf6eb3a96cd60e5266548 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Evren=20Esat=20=C3=96zkan?=
Date: Mon, 29 Aug 2016 02:57:34 +0300
Subject: [PATCH] db_based wf execution ok

rref #5357
ref zetaops/zengine#64
---
 zengine/current.py                 |  11 +--
 zengine/engine.py                  |  67 +++++++++--------
 zengine/models/workflow_manager.py | 115 ++++++++++++++++++++---------
 zengine/settings.py                |   3 +
 zengine/views/system.py            |  32 ++++++++
 zengine/wf_daemon.py               |  13 +++-
 6 files changed, 168 insertions(+), 73 deletions(-)

diff --git a/zengine/current.py b/zengine/current.py
index a9640c87..90a01d34 100644
--- a/zengine/current.py
+++ b/zengine/current.py
@@ -17,14 +17,15 @@
 import lazy_object_proxy
 from SpiffWorkflow.specs import WorkflowSpec
-from beaker.session import Session
 from pyoko.lib.utils import get_object_from_path, lazy_property
 
 from zengine import signals
 from zengine.client_queue import ClientQueue
 from zengine.config import settings
+from zengine.lib.cache import Session
 from zengine.log import log
 from zengine.models import WFCache
+from zengine.models import WFInstance
 
 DEFAULT_LANE_CHANGE_MSG = {
     'title': settings.MESSAGES['lane_change_message_title'],
@@ -45,7 +46,7 @@ class Current(object):
 
     def __init__(self, **kwargs):
         self.task_data = {'cmd': None}
-        self.session = {}
+        self.session = Session()
         self.headers = {}
         self.input = {}  # when we want to use engine functions independently,
         self.output = {}  # we need to create a fake current object
@@ -188,13 +189,13 @@ def __init__(self, **kwargs):
             self.new_token = True
             # log.info("TOKEN NEW: %s " % self.token)
 
-        self.wfcache = WFCache(self.token, self.session.sess_id)
-        # log.debug("\n\nWF_CACHE: %s" % self.wfcache.get())
+        self.wf_cache = WFCache(self)
+        self.wf_instance = lazy_object_proxy.Proxy(lambda: self.wf_cache.get_instance())
        self.set_client_cmds()
 
     def get_wf_link(self):
         """
-        Create a "in app" anchor for accessing this workflow instance.
+        Create an "in app" anchor for accessing this workflow instance.
 
         Returns:
             String. Anchor link.
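A note on the wf_instance proxy introduced above: lazy_object_proxy.Proxy takes a factory callable and only invokes it on first real access, so the cache/DB lookup behind WFCache.get_instance() is skipped entirely for requests that never touch current.wf_instance. Below is a minimal, self-contained sketch of that behaviour; FakeCache is an illustrative stand-in, not zengine's WFCache:

import lazy_object_proxy

class FakeCache(object):
    """Stand-in for WFCache; only the call counting matters here."""
    def __init__(self):
        self.calls = 0

    def get_instance(self):
        # zengine's WFCache.get_instance() would read the cache or fall back to the DB
        self.calls += 1
        return type('WFInstanceStub', (object,), {'name': 'some_wf'})()

cache = FakeCache()
wf_instance = lazy_object_proxy.Proxy(lambda: cache.get_instance())

assert cache.calls == 0   # nothing has been fetched yet
print(wf_instance.name)   # first real access triggers the factory
assert cache.calls == 1   # the result is memoized, later access does not re-read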
diff --git a/zengine/engine.py b/zengine/engine.py
index 78dd180d..6eb19522 100644
--- a/zengine/engine.py
+++ b/zengine/engine.py
@@ -23,7 +23,9 @@
 from SpiffWorkflow.bpmn.storage.CompactWorkflowSerializer import \
     CompactWorkflowSerializer
 from SpiffWorkflow.specs import WorkflowSpec
+from datetime import datetime
 
+from pyoko.fields import DATE_TIME_FORMAT
 from pyoko.lib.utils import get_object_from_path
 from pyoko.model import super_context, model_registry
 from zengine.auth.permissions import PERM_REQ_TASK_TYPES
@@ -63,7 +65,7 @@ def __init__(self):
         self.use_compact_serializer = True
         # self.current = None
         self.wf_activities = {}
-        self.wf_cache = {'in_external': False}
+        self.wf_state = {}
         self.workflow = BpmnWorkflow
         self.workflow_spec_cache = {}
         self.workflow_spec = WorkflowSpec()
@@ -93,15 +95,15 @@ def save_workflow_to_cache(self, serialized_wf_instance):
                 del task_data[k]
         if 'cmd' in task_data:
             del task_data['cmd']
-        self.wf_cache.update({'wf_state': serialized_wf_instance,
+        self.wf_state.update({'step': serialized_wf_instance,
                               'data': task_data,
-                              'wf_name': self.current.workflow_name,
+                              'name': self.current.workflow_name,
                               })
         if self.current.lane_name:
             self.current.pool[self.current.lane_name] = self.current.role.key
-            self.wf_cache['pool'] = self.current.pool
+            self.wf_state['pool'] = self.current.pool
         self.current.log.debug("POOL Content before WF Save: %s" % self.current.pool)
-        self.current.wfcache.set(self.wf_cache)
+        self.current.wf_cache.save(self.wf_state)
 
     def get_pool_context(self):
         # TODO: Add in-process caching
@@ -127,11 +129,11 @@ def load_workflow_from_cache(self):
         updates the self.current.task_data
         """
         if not self.current.new_token:
-            self.wf_cache = self.current.wfcache.get(self.wf_cache)
-            self.current.task_data = self.wf_cache['data']
+            self.wf_state = self.current.wf_cache.get(self.wf_state)
+            self.current.task_data = self.wf_state['data']
             self.current.set_client_cmds()
-            self.current.pool = self.wf_cache['pool']
-            return self.wf_cache['wf_state']
+            self.current.pool = self.wf_state['pool']
+            return self.wf_state['step']
 
     def _load_workflow(self):
         # gets the serialized wf data from cache and deserializes it
@@ -176,7 +178,7 @@ def load_or_create_workflow(self):
         Tries to load the previously serialized (and saved) workflow
         Creates a new one if it can't
         """
-        self.workflow_spec = self.get_worfklow_spec
+        self.workflow_spec = self.get_worfklow_spec()
         return self._load_workflow() or self.create_workflow()
         # self.current.update(workflow=self.workflow)
 
@@ -196,7 +198,6 @@ def find_workflow_path(self):
         log.error(err_msg)
         raise RuntimeError(err_msg)
 
-    @property
     def get_worfklow_spec(self):
         """
         Generates and caches the workflow spec package from
@@ -224,11 +225,12 @@ def _save_or_delete_workflow(self):
         """
         if not self.current.task_type.startswith('Start'):
             if self.current.task_name.startswith('End') and not self.are_we_in_subprocess():
-                self.current.wfcache.delete()
+                self.wf_state['finished'] = True
+                self.wf_state['finish_date'] = datetime.now().strftime(
+                    settings.DATETIME_DEFAULT_FORMAT)
                 self.current.log.info("Delete WFCache: %s %s" % (self.current.workflow_name,
                                                                  self.current.token))
-            else:
-                self.save_workflow_to_cache(self.serialize_workflow())
+            self.save_workflow_to_cache(self.serialize_workflow())
 
     def start_engine(self, **kwargs):
         """
@@ -242,11 +244,13 @@ def start_engine(self, **kwargs):
         """
         self.current = WFCurrent(**kwargs)
+        self.wf_state = {'in_external': False, 'finished': False}
         if not self.current.new_token:
-            self.wf_cache = self.current.wfcache.get(self.wf_cache)
-            self.current.workflow_name = self.wf_cache['wf_name']
+            self.wf_state = self.current.wf_cache.get(self.wf_state)
+            self.current.workflow_name = self.wf_state['name']
         self.check_for_authentication()
         self.check_for_permission()
+
         self.workflow = self.load_or_create_workflow()
         log_msg = ("\n\n::::::::::: ENGINE STARTED :::::::::::\n"
                    "\tWF: %s (Possible) TASK:%s\n"
@@ -275,7 +279,7 @@ def generate_wf_state_log(self):
         output += "\nCURRENT:"
         output += "\n\tACTIVITY: %s" % self.current.activity
         output += "\n\tPOOL: %s" % self.current.pool
-        output += "\n\tIN EXTERNAL: %s" % self.wf_cache['in_external']
+        output += "\n\tIN EXTERNAL: %s" % self.wf_state['in_external']
         output += "\n\tLANE: %s" % self.current.lane_name
         output += "\n\tTOKEN: %s" % self.current.token
         sys._zops_wf_state_log = output
@@ -285,15 +289,15 @@ def log_wf_state(self):
         log.debug(self.generate_wf_state_log() + "\n= = = = = =\n")
 
     def switch_from_external_to_main_wf(self):
-        if self.wf_cache['in_external'] and self.current.task_type == 'Simple' and self.current.task_name == 'End':
-            main_wf = self.wf_cache['main_wf']
+        if self.wf_state['in_external'] and self.current.task_type == 'Simple' and self.current.task_name == 'End':
+            main_wf = self.wf_state['main_wf']
             self.current.workflow_name = main_wf['wf_name']
-            self.workflow_spec = self.get_worfklow_spec
+            self.workflow_spec = self.get_worfklow_spec()
             self.workflow = self.deserialize_workflow(main_wf['wf_state'])
             self.current.workflow = self.workflow
-            self.wf_cache['in_external'] = False
-            self.wf_cache['pool'] = main_wf['pool']
-            self.current.pool = self.wf_cache['pool']
+            self.wf_state['in_external'] = False
+            self.wf_state['pool'] = main_wf['pool']
+            self.current.pool = self.wf_state['pool']
             self.current.task_name = None
             self.current.task_type = None
             self.current.task = None
@@ -303,23 +307,26 @@ def switch_to_external_wf(self):
         if (self.current.task_type == 'ServiceTask' and
                 self.current.task.task_spec.type == 'external'):
             log.debug("Entering to EXTERNAL WF")
-            main_wf = self.wf_cache.copy()
+            main_wf = self.wf_state.copy()
             external_wf_name = self.current.task.task_spec.topic
             self.current.workflow_name = external_wf_name
-            self.workflow_spec = self.get_worfklow_spec
+            self.workflow_spec = self.get_worfklow_spec()
             self.workflow = self.create_workflow()
             self.current.workflow = self.workflow
             self.check_for_authentication()
             self.check_for_permission()
-            self.wf_cache = {'main_wf': main_wf, 'in_external': True}
+            self.wf_state = {'main_wf': main_wf, 'in_external': True}
 
     def _should_we_run(self):
         not_a_user_task = self.current.task_type != 'UserTask'
-        wf_not_finished = not (self.current.task_name == 'End' and self.current.task_type == 'Simple')
-        if not wf_not_finished and self.are_we_in_subprocess():
-            wf_not_finished = True
-        return self.current.flow_enabled and not_a_user_task and wf_not_finished
+        wf_in_progress = not (self.current.task_name == 'End' and
+                              self.current.task_type == 'Simple')
+        if wf_in_progress and self.wf_state['finished']:
+            wf_in_progress = False
+        if not wf_in_progress and self.are_we_in_subprocess():
+            wf_in_progress = True
+        return self.current.flow_enabled and not_a_user_task and wf_in_progress
 
     def run(self):
         """
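A note on the state handling above: the engine now keeps a plain wf_state dict per workflow token and hands it to WFCache.save(), which later syncs it to a WFInstance record. Below is a rough sketch of the dict's shape, pieced together from save_workflow_to_cache(), start_engine(), _save_or_delete_workflow() and switch_to_external_wf(); the keys follow the diff, the sample values are invented for illustration:

# Approximate shape of the per-token wf_state dict (values are placeholders)
example_wf_state = {
    'name': 'leave_request',         # current.workflow_name
    'step': '...serialized WF...',   # CompactWorkflowSerializer output
    'data': {'some_key': 'value'},   # current.task_data minus transient keys ('cmd', etc.)
    'pool': {'manager': 'ROLE_KEY'}, # lane name -> role key mapping
    'in_external': False,            # True while an external (ServiceTask) WF is running
    'finished': False,               # set True at the End task instead of deleting the cache
    # 'finish_date': '...',          # written alongside finished=True
    # 'main_wf': {...},              # copy of the parent state while in an external WF
    # 'role_id': '...',              # appended by WFCache.save() before publishing
}

Keeping 'finished' in the state instead of deleting the cache entry is what lets _should_we_run() refuse to advance a workflow that has already ended.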
diff --git a/zengine/models/workflow_manager.py b/zengine/models/workflow_manager.py
index b71ea931..bdab5e6a 100644
--- a/zengine/models/workflow_manager.py
+++ b/zengine/models/workflow_manager.py
@@ -8,11 +8,16 @@
 # (GPLv3). See LICENSE.txt for details.
 import json
+from datetime import datetime
+from time import sleep
+
 from pika.exceptions import ChannelClosed
 from pika.exceptions import ConnectionClosed
 from pyoko import Model, field, ListNode, LinkProxy
 from pyoko.conf import settings
-from pyoko.lib.utils import get_object_from_path
+from pyoko.exceptions import ObjectDoesNotExist
+from pyoko.fields import DATE_TIME_FORMAT
+from pyoko.lib.utils import get_object_from_path, lazy_property
 from SpiffWorkflow.bpmn.parser.util import full_attr, BPMN_MODEL_NS, ATTRIBUTE_NS
 from zengine.client_queue import get_mq_connection
 from zengine.lib.cache import Cache
@@ -220,24 +225,36 @@ class WFInstance(Model):
     """
     wf = BPMNWorkflow()
     task = Task()
+    name = field.String("WF Name")
     current_actor = RoleModel()
     wf_object = field.String("Subject ID")
     last_activation = field.DateTime("Last activation")
     finished = field.Boolean(default=False)
     started = field.Boolean(default=False)
+    in_external = field.Boolean(default=False)
     start_date = field.DateTime("Start time")
     finish_date = field.DateTime("Finish time")
-    state = field.String("Serialized state of WF")
+    step = field.String("Last executed WF Step")
+    data = field.String("Task Data")
+    pool = field.String("Pool Data")
 
     class Meta:
         verbose_name = "Workflow Instance"
         verbose_name_plural = "Workflows Instances"
         search_fields = ['name']
-        list_fields = ['name', ]
+        list_fields = ['name', 'actor']
 
-    class Pool(ListNode):
-        order = field.Integer("Lane order")
-        role = RoleModel()
+    def actor(self):
+        return self.current_actor.user.full_name if self.current_actor.exist else '-'
+    actor.title = 'Current Actor'
+
+    # class Pool(ListNode):
+    #     order = field.Integer("Lane order")
+    #     role = RoleModel()
+
+    def pre_save(self):
+        if not self.wf and self.name:
+            self.wf = BPMNWorkflow.objects.get(name=self.name)
 
     def __unicode__(self):
         return '%s instance (%s)' % (self.wf.name, self.key)
@@ -257,7 +274,7 @@ def delete_other_invitations(self):
         """
         When one person use an invitation, we should delete other invitations
         """
-        # TODO: Signal logged-in users to remove the task from task list
+        # TODO: Signal logged-in users to remove the task from their task list
         self.objects.filter(instance=self.instance).exclude(key=self.key).delete()
 
 
@@ -272,10 +289,15 @@ class WFCache(Cache):
     mq_channel = None
     mq_connection = None
 
-    def __init__(self, wf_token, sess_id):
-        self.db_key = wf_token
-        self.sess_id = sess_id
-        super(WFCache, self).__init__(wf_token)
+    def __init__(self, current):
+        try:
+            self.db_key = current.token
+        except AttributeError:
+            self.db_key = current.input['token']
+        self.sess_id = current.session.sess_id
+        self.current = current
+        self.wf_state = {}
+        super(WFCache, self).__init__(self.db_key)
 
     @classmethod
     def _connect_mq(cls):
@@ -283,31 +305,56 @@ def _connect_mq(cls):
             cls.mq_connection, cls.mq_channel = get_mq_connection()
         return cls.mq_channel
 
-    def write_to_db_through_mq(self, sess_id, val):
-        """
-        write wf state to DB through MQ >> Worker >> _zops_sync_wf_cache
-        Args:
-            sess_id: users session id
-        """
-        data = dict(exchange='input_exc',
-                    routing_key=sess_id,
-                    body=json.dumps({'data': {
-                        'view': '_zops_sync_wf_cache',
-                        'wf_state': val,
-                        'token': self.db_key},
-                        '_zops_remote_ip': ''}))
+    def publish(self, **data):
+        _data = {'exchange': 'input_exc',
+                 # 'routing_key': self.sess_id,
+                 'routing_key': '',
+                 'body': json.dumps({
+                     'data': data,
+                     '_zops_remote_ip': ''})}
         try:
-            self.mq_channel.basic_publish(**data)
-        except (ConnectionClosed, ChannelClosed):
-            self._connect_mq().basic_publish(**data)
+            self.mq_channel.basic_publish(**_data)
+        except (AttributeError, ConnectionClosed, ChannelClosed):
+            self._connect_mq().basic_publish(**_data)
 
     def get_from_db(self):
-        return WFInstance.objects.get(self.db_key)
+        try:
+            data, key = WFInstance.objects.data().get(self.db_key)
+            return data
+        except ObjectDoesNotExist:
+            return None
+
+    def get(self, default=None):
+        self.wf_state = super(WFCache, self).get() or self.get_from_db() or default
+        if 'finish_date' in self.wf_state:
+            try:
+                dt = datetime.strptime(self.wf_state['finish_date'], DATE_TIME_FORMAT)
+                self.wf_state['finish_date'] = dt.strftime(settings.DATETIME_DEFAULT_FORMAT)
+            except ValueError:
+                # FIXME: we should properly handle wfengine > db > wfengine format conversion
+                pass
+        return self.wf_state
+
+    def get_instance(self):
+        data_from_cache = super(WFCache, self).get()
+        if data_from_cache:
+            wfi = WFInstance()
+            wfi._load_data(data_from_cache, from_db=True)
+            wfi.key = self.db_key
+            return wfi
+        else:
+            return WFInstance.objects.get(self.db_key)
 
-    def get(self):
-        return super(WFCache, self).get() or self.get_from_db()
+    def save(self, wf_state):
+        """
+        write wf state to DB through MQ >> Worker >> _zops_sync_wf_cache
 
-    def set(self, val, lifetime=None):
-        super(WFCache, self).set(val, lifetime)
-        self.write_to_db_through_mq(self.sess_id, val)
-        return val
+        Args:
+            wf_state dict: wf state
+        """
+        self.wf_state = wf_state
+        self.wf_state['role_id'] = self.current.role_id
+        self.set(self.wf_state)
+        if self.wf_state['name'] not in settings.EPHEMERAL_WORKFLOWS:
+            self.publish(view='_zops_sync_wf_cache',
+                         token=self.db_key)
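Two behaviours of the new WFCache.save() above are worth spelling out: the state is written to the cache synchronously, while persistence to WFInstance happens asynchronously through the message queue, and only for workflows that are not listed in EPHEMERAL_WORKFLOWS (added to settings.py below). Under that reading, the message that publish() places on the 'input_exc' exchange looks roughly like the sketch below; the token value is a placeholder:

import json

# payload assembled by WFCache.publish(view='_zops_sync_wf_cache', token=...)
body = json.dumps({
    'data': {
        'view': '_zops_sync_wf_cache',    # dispatched to zengine.views.system.sync_wf_cache
        'token': 'WF_TOKEN_PLACEHOLDER',  # placeholder for the workflow token / cache db_key
    },
    '_zops_remote_ip': '',
})
# channel.basic_publish(exchange='input_exc', routing_key='', body=body)

The consumer side of this message is the sync_wf_cache() view added to zengine/views/system.py further down.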
diff --git a/zengine/settings.py b/zengine/settings.py
index ebb163e5..7d8638f9 100644
--- a/zengine/settings.py
+++ b/zengine/settings.py
@@ -193,6 +193,9 @@
 }
 
+#: These WFs will not be saved to DB
+EPHEMERAL_WORKFLOWS = ['crud', 'login', 'logout', 'edit_catalog_data']
+
 #: A manager object for DB stored catalog data.
 CATALOG_DATA_MANAGER = 'zengine.lib.catalog_data.catalog_data_manager'
 
diff --git a/zengine/views/system.py b/zengine/views/system.py
index a592faf2..9ed12d8e 100644
--- a/zengine/views/system.py
+++ b/zengine/views/system.py
@@ -6,6 +6,10 @@
 #
 # This file is licensed under the GNU General Public License v3
 # (GPLv3). See LICENSE.txt for details.
+import six
+from pyoko.exceptions import ObjectDoesNotExist
+from zengine.models import WFCache
+from zengine.models import WFInstance
 
 
 def sessid_to_userid(current):
@@ -16,3 +20,31 @@ def sessid_to_userid(current):
 
 def mark_offline_user(current):
     current.user.is_online(False)
+
+
+def sync_wf_cache(current):
+    wf_cache = WFCache(current)
+    wf_state = wf_cache.get()
+    if 'role_id' in wf_state:
+        try:
+            wfi = WFInstance.objects.get(key=current.input['token'])
+        except ObjectDoesNotExist:
+            # just for backwards compatibility
+            wfi = WFInstance(key=current.input['token'])
+        wfi.step = wf_state['step']
+        wfi.name = wf_state['name']
+        wfi.pool = wf_state['pool']
+        wfi.current_actor_id = wf_state['role_id']
+        wfi.data = wf_state['data']
+        if wf_state['finished']:
+            wfi.finished = True
+            wfi.finish_date = wf_state['finish_date']
+            wf_cache.delete()
+        wfi.save()
+    else:
+        pass
+        # if cache already cleared, we have nothing to sync
+    # -1 means do not return anything to client
+    current.output = -1
+
+
diff --git a/zengine/wf_daemon.py b/zengine/wf_daemon.py
index f4e72eb9..c45970e6 100755
--- a/zengine/wf_daemon.py
+++ b/zengine/wf_daemon.py
@@ -101,10 +101,6 @@ def _handle_ping_pong(self, data, session):
         return msg
 
     def _handle_view(self, session, data, headers):
-        # create Current object
-        self.current = Current(session=session, input=data)
-        self.current.headers = headers
-
         # handle ping/pong/session expiration
         if data['view'] == 'ping':
             return self._handle_ping_pong(data, session)
@@ -164,7 +160,14 @@ def handle_message(self, ch, method, properties, body):
             if 'wf' in data:
                 output = self._handle_workflow(session, data, headers)
             else:
+                # create Current object
+                self.current = Current(session=session, input=data)
+                self.current.headers = headers
+
                 output = self._handle_view(session, data, headers)
+                if output == -1:
+                    # -1 means we don't want to return anything to client
+                    return
         except HTTPError as e:
             import sys
             if hasattr(sys, '_called_from_test'):
@@ -172,6 +175,8 @@ def handle_message(self, ch, method, properties, body):
             output = {'cmd': 'error', 'error': self._prepare_error_msg(e.message), "code": e.code}
             log.exception("Http error occurred")
         except:
+            self.current = Current(session=session, input=data)
+            self.current.headers = headers
             import sys
             if hasattr(sys, '_called_from_test'):
                 raise
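Finally, the wf_daemon.py changes introduce a small convention: a view that should not produce a client reply (sync_wf_cache above is the first such view) sets current.output = -1, and handle_message() simply returns without publishing anything back. Below is a minimal sketch of that convention with simplified stand-ins for the worker and view; these are not zengine's actual classes:

# "-1 means no reply" convention, reduced to its essentials
def sync_like_view(current):
    # a background-sync style view does its work and suppresses the reply
    current['output'] = -1

def dispatch(view, current, send):
    view(current)
    if current['output'] == -1:
        return              # nothing goes back to the client
    send(current['output'])

sent = []
dispatch(sync_like_view, {'output': None}, sent.append)
assert sent == []           # no client message was queued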