Skip to content

Commit

Permalink
db_based wf execution ok
Browse files Browse the repository at this point in the history
ref #5357
ref #64
  • Loading branch information
evrenesat committed Aug 28, 2016
1 parent cdb2b31 commit 8297e12
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 73 deletions.
11 changes: 6 additions & 5 deletions zengine/current.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

import lazy_object_proxy
from SpiffWorkflow.specs import WorkflowSpec
from beaker.session import Session

from pyoko.lib.utils import get_object_from_path, lazy_property
from zengine import signals
from zengine.client_queue import ClientQueue
from zengine.config import settings
from zengine.lib.cache import Session
from zengine.log import log
from zengine.models import WFCache
from zengine.models import WFInstance

DEFAULT_LANE_CHANGE_MSG = {
'title': settings.MESSAGES['lane_change_message_title'],
Expand All @@ -45,7 +46,7 @@ class Current(object):
def __init__(self, **kwargs):

self.task_data = {'cmd': None}
self.session = {}
self.session = Session()
self.headers = {}
self.input = {} # when we want to use engine functions independently,
self.output = {} # we need to create a fake current object
Expand Down Expand Up @@ -188,13 +189,13 @@ def __init__(self, **kwargs):
self.new_token = True
# log.info("TOKEN NEW: %s " % self.token)

self.wfcache = WFCache(self.token, self.session.sess_id)
# log.debug("\n\nWF_CACHE: %s" % self.wfcache.get())
self.wf_cache = WFCache(self)
self.wf_instance = lazy_object_proxy.Proxy(lambda: self.wf_cache.get_instance())
self.set_client_cmds()

def get_wf_link(self):
"""
Create a "in app" anchor for accessing this workflow instance.
Create an "in app" anchor for accessing this workflow instance.
Returns: String. Anchor link.
Expand Down
67 changes: 37 additions & 30 deletions zengine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
from SpiffWorkflow.bpmn.storage.CompactWorkflowSerializer import \
CompactWorkflowSerializer
from SpiffWorkflow.specs import WorkflowSpec
from datetime import datetime

from pyoko.fields import DATE_TIME_FORMAT
from pyoko.lib.utils import get_object_from_path
from pyoko.model import super_context, model_registry
from zengine.auth.permissions import PERM_REQ_TASK_TYPES
Expand Down Expand Up @@ -63,7 +65,7 @@ def __init__(self):
self.use_compact_serializer = True
# self.current = None
self.wf_activities = {}
self.wf_cache = {'in_external': False}
self.wf_state = {}
self.workflow = BpmnWorkflow
self.workflow_spec_cache = {}
self.workflow_spec = WorkflowSpec()
Expand Down Expand Up @@ -93,15 +95,15 @@ def save_workflow_to_cache(self, serialized_wf_instance):
del task_data[k]
if 'cmd' in task_data:
del task_data['cmd']
self.wf_cache.update({'wf_state': serialized_wf_instance,
self.wf_state.update({'step': serialized_wf_instance,
'data': task_data,
'wf_name': self.current.workflow_name,
'name': self.current.workflow_name,
})
if self.current.lane_name:
self.current.pool[self.current.lane_name] = self.current.role.key
self.wf_cache['pool'] = self.current.pool
self.wf_state['pool'] = self.current.pool
self.current.log.debug("POOL Content before WF Save: %s" % self.current.pool)
self.current.wfcache.set(self.wf_cache)
self.current.wf_cache.save(self.wf_state)

def get_pool_context(self):
# TODO: Add in-process caching
Expand All @@ -127,11 +129,11 @@ def load_workflow_from_cache(self):
updates the self.current.task_data
"""
if not self.current.new_token:
self.wf_cache = self.current.wfcache.get(self.wf_cache)
self.current.task_data = self.wf_cache['data']
self.wf_state = self.current.wf_cache.get(self.wf_state)
self.current.task_data = self.wf_state['data']
self.current.set_client_cmds()
self.current.pool = self.wf_cache['pool']
return self.wf_cache['wf_state']
self.current.pool = self.wf_state['pool']
return self.wf_state['step']

def _load_workflow(self):
# gets the serialized wf data from cache and deserializes it
Expand Down Expand Up @@ -176,7 +178,7 @@ def load_or_create_workflow(self):
Tries to load the previously serialized (and saved) workflow
Creates a new one if it can't
"""
self.workflow_spec = self.get_worfklow_spec
self.workflow_spec = self.get_worfklow_spec()
return self._load_workflow() or self.create_workflow()
# self.current.update(workflow=self.workflow)

Expand All @@ -196,7 +198,6 @@ def find_workflow_path(self):
log.error(err_msg)
raise RuntimeError(err_msg)

@property
def get_worfklow_spec(self):
"""
Generates and caches the workflow spec package from
Expand Down Expand Up @@ -224,11 +225,12 @@ def _save_or_delete_workflow(self):
"""
if not self.current.task_type.startswith('Start'):
if self.current.task_name.startswith('End') and not self.are_we_in_subprocess():
self.current.wfcache.delete()
self.wf_state['finished'] = True
self.wf_state['finish_date'] = datetime.now().strftime(
settings.DATETIME_DEFAULT_FORMAT)
self.current.log.info("Delete WFCache: %s %s" % (self.current.workflow_name,
self.current.token))
else:
self.save_workflow_to_cache(self.serialize_workflow())
self.save_workflow_to_cache(self.serialize_workflow())

def start_engine(self, **kwargs):
"""
Expand All @@ -242,11 +244,13 @@ def start_engine(self, **kwargs):
"""
self.current = WFCurrent(**kwargs)
self.wf_state = {'in_external': False, 'finished': False}
if not self.current.new_token:
self.wf_cache = self.current.wfcache.get(self.wf_cache)
self.current.workflow_name = self.wf_cache['wf_name']
self.wf_state = self.current.wf_cache.get(self.wf_state)
self.current.workflow_name = self.wf_state['name']
self.check_for_authentication()
self.check_for_permission()

self.workflow = self.load_or_create_workflow()
log_msg = ("\n\n::::::::::: ENGINE STARTED :::::::::::\n"
"\tWF: %s (Possible) TASK:%s\n"
Expand Down Expand Up @@ -275,7 +279,7 @@ def generate_wf_state_log(self):
output += "\nCURRENT:"
output += "\n\tACTIVITY: %s" % self.current.activity
output += "\n\tPOOL: %s" % self.current.pool
output += "\n\tIN EXTERNAL: %s" % self.wf_cache['in_external']
output += "\n\tIN EXTERNAL: %s" % self.wf_state['in_external']
output += "\n\tLANE: %s" % self.current.lane_name
output += "\n\tTOKEN: %s" % self.current.token
sys._zops_wf_state_log = output
Expand All @@ -285,15 +289,15 @@ def log_wf_state(self):
log.debug(self.generate_wf_state_log() + "\n= = = = = =\n")

def switch_from_external_to_main_wf(self):
if self.wf_cache['in_external'] and self.current.task_type == 'Simple' and self.current.task_name == 'End':
main_wf = self.wf_cache['main_wf']
if self.wf_state['in_external'] and self.current.task_type == 'Simple' and self.current.task_name == 'End':
main_wf = self.wf_state['main_wf']
self.current.workflow_name = main_wf['wf_name']
self.workflow_spec = self.get_worfklow_spec
self.workflow_spec = self.get_worfklow_spec()
self.workflow = self.deserialize_workflow(main_wf['wf_state'])
self.current.workflow = self.workflow
self.wf_cache['in_external'] = False
self.wf_cache['pool'] = main_wf['pool']
self.current.pool = self.wf_cache['pool']
self.wf_state['in_external'] = False
self.wf_state['pool'] = main_wf['pool']
self.current.pool = self.wf_state['pool']
self.current.task_name = None
self.current.task_type = None
self.current.task = None
Expand All @@ -303,23 +307,26 @@ def switch_to_external_wf(self):
if (self.current.task_type == 'ServiceTask' and
self.current.task.task_spec.type == 'external'):
log.debug("Entering to EXTERNAL WF")
main_wf = self.wf_cache.copy()
main_wf = self.wf_state.copy()
external_wf_name = self.current.task.task_spec.topic
self.current.workflow_name = external_wf_name
self.workflow_spec = self.get_worfklow_spec
self.workflow_spec = self.get_worfklow_spec()
self.workflow = self.create_workflow()
self.current.workflow = self.workflow
self.check_for_authentication()
self.check_for_permission()

self.wf_cache = {'main_wf': main_wf, 'in_external': True}
self.wf_state = {'main_wf': main_wf, 'in_external': True}

def _should_we_run(self):
not_a_user_task = self.current.task_type != 'UserTask'
wf_not_finished = not (self.current.task_name == 'End' and self.current.task_type == 'Simple')
if not wf_not_finished and self.are_we_in_subprocess():
wf_not_finished = True
return self.current.flow_enabled and not_a_user_task and wf_not_finished
wf_in_progress = not (self.current.task_name == 'End' and
self.current.task_type == 'Simple')
if wf_in_progress and self.wf_state['finished']:
wf_in_progress = False
if not wf_in_progress and self.are_we_in_subprocess():
wf_in_progress = True
return self.current.flow_enabled and not_a_user_task and wf_in_progress

def run(self):
"""
Expand Down
115 changes: 81 additions & 34 deletions zengine/models/workflow_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@
# (GPLv3). See LICENSE.txt for details.
import json

from datetime import datetime
from time import sleep

from pika.exceptions import ChannelClosed
from pika.exceptions import ConnectionClosed
from pyoko import Model, field, ListNode, LinkProxy
from pyoko.conf import settings
from pyoko.lib.utils import get_object_from_path
from pyoko.exceptions import ObjectDoesNotExist
from pyoko.fields import DATE_TIME_FORMAT
from pyoko.lib.utils import get_object_from_path, lazy_property
from SpiffWorkflow.bpmn.parser.util import full_attr, BPMN_MODEL_NS, ATTRIBUTE_NS
from zengine.client_queue import get_mq_connection
from zengine.lib.cache import Cache
Expand Down Expand Up @@ -220,24 +225,36 @@ class WFInstance(Model):
"""
wf = BPMNWorkflow()
task = Task()
name = field.String("WF Name")
current_actor = RoleModel()
wf_object = field.String("Subject ID")
last_activation = field.DateTime("Last activation")
finished = field.Boolean(default=False)
started = field.Boolean(default=False)
in_external = field.Boolean(default=False)
start_date = field.DateTime("Start time")
finish_date = field.DateTime("Finish time")
state = field.String("Serialized state of WF")
step = field.String("Last executed WF Step")
data = field.String("Task Data")
pool = field.String("Pool Data")

class Meta:
verbose_name = "Workflow Instance"
verbose_name_plural = "Workflows Instances"
search_fields = ['name']
list_fields = ['name', ]
list_fields = ['name', 'actor']

class Pool(ListNode):
order = field.Integer("Lane order")
role = RoleModel()
def actor(self):
return self.current_actor.user.full_name if self.current_actor.exist else '-'
actor.title = 'Current Actor'

# class Pool(ListNode):
# order = field.Integer("Lane order")
# role = RoleModel()

def pre_save(self):
if not self.wf and self.name:
self.wf = BPMNWorkflow.objects.get(name=self.name)

def __unicode__(self):
return '%s instance (%s)' % (self.wf.name, self.key)
Expand All @@ -257,7 +274,7 @@ def delete_other_invitations(self):
"""
When one person use an invitation, we should delete other invitations
"""
# TODO: Signal logged-in users to remove the task from task list
# TODO: Signal logged-in users to remove the task from their task list
self.objects.filter(instance=self.instance).exclude(key=self.key).delete()


Expand All @@ -272,42 +289,72 @@ class WFCache(Cache):
mq_channel = None
mq_connection = None

def __init__(self, wf_token, sess_id):
self.db_key = wf_token
self.sess_id = sess_id
super(WFCache, self).__init__(wf_token)
def __init__(self, current):
try:
self.db_key = current.token
except AttributeError:
self.db_key = current.input['token']
self.sess_id = current.session.sess_id
self.current = current
self.wf_state = {}
super(WFCache, self).__init__(self.db_key)

@classmethod
def _connect_mq(cls):
if cls.mq_connection is None or cls.mq_connection.is_closed:
cls.mq_connection, cls.mq_channel = get_mq_connection()
return cls.mq_channel

def write_to_db_through_mq(self, sess_id, val):
"""
write wf state to DB through MQ >> Worker >> _zops_sync_wf_cache
Args:
sess_id: users session id
"""
data = dict(exchange='input_exc',
routing_key=sess_id,
body=json.dumps({'data': {
'view': '_zops_sync_wf_cache',
'wf_state': val,
'token': self.db_key},
'_zops_remote_ip': ''}))
def publish(self, **data):
_data = {'exchange': 'input_exc',
# 'routing_key': self.sess_id,
'routing_key': '',
'body': json.dumps({
'data': data,
'_zops_remote_ip': ''})}
try:
self.mq_channel.basic_publish(**data)
except (ConnectionClosed, ChannelClosed):
self._connect_mq().basic_publish(**data)
self.mq_channel.basic_publish(**_data)
except (AttributeError, ConnectionClosed, ChannelClosed):
self._connect_mq().basic_publish(**_data)

def get_from_db(self):
return WFInstance.objects.get(self.db_key)
try:
data, key = WFInstance.objects.data().get(self.db_key)
return data
except ObjectDoesNotExist:
return None

def get(self, default=None):
self.wf_state = super(WFCache, self).get() or self.get_from_db() or default
if 'finish_date' in self.wf_state:
try:
dt = datetime.strptime(self.wf_state['finish_date'], DATE_TIME_FORMAT)
self.wf_state['finish_date'] = dt.strftime(settings.DATETIME_DEFAULT_FORMAT)
except ValueError:
# FIXME: we should properly handle wfengine > db > wfengine format conversion
pass
return self.wf_state

def get_instance(self):
data_from_cache = super(WFCache, self).get()
if data_from_cache:
wfi = WFInstance()
wfi._load_data(data_from_cache, from_db=True)
wfi.key = self.db_key
return wfi
else:
return WFInstance.objects.get(self.db_key)

def get(self):
return super(WFCache, self).get() or self.get_from_db()
def save(self, wf_state):
"""
write wf state to DB through MQ >> Worker >> _zops_sync_wf_cache
def set(self, val, lifetime=None):
super(WFCache, self).set(val, lifetime)
self.write_to_db_through_mq(self.sess_id, val)
return val
Args:
wf_state dict: wf state
"""
self.wf_state = wf_state
self.wf_state['role_id'] = self.current.role_id
self.set(self.wf_state)
if self.wf_state['name'] not in settings.EPHEMERAL_WORKFLOWS:
self.publish(view='_zops_sync_wf_cache',
token=self.db_key)
3 changes: 3 additions & 0 deletions zengine/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@

}

#: These WFs will not saved to DB
EPHEMERAL_WORKFLOWS = ['crud', 'login', 'logout', 'edit_catalog_data']

#: A manager object for DB stored catalog data.
CATALOG_DATA_MANAGER = 'zengine.lib.catalog_data.catalog_data_manager'

Expand Down
Loading

0 comments on commit 8297e12

Please sign in to comment.