diff --git a/zengine/messaging/lib.py b/zengine/messaging/lib.py index 7a58a753..27057e77 100644 --- a/zengine/messaging/lib.py +++ b/zengine/messaging/lib.py @@ -74,7 +74,8 @@ def encrypt_password(self): if self.password and not self.password.startswith('$pbkdf2'): self.set_password(self.password) - def prepare_channels(self): + def prepare_user_channel(self): + """should be called from User.post_creation hook""" from zengine.messaging.model import Channel, Subscriber # create private channel of user ch, new = Channel.objects.get_or_create(owner=self, typ=5) @@ -117,12 +118,13 @@ def full_name(self): def prv_exchange(self): return 'prv_%s' % str(self.key).lower() - def bind_private_channel(self, sess_id): + def bind_channels_to_session_queue(self, sess_id): mq_channel = pika.BlockingConnection(BLOCKING_MQ_PARAMS).channel() mq_channel.queue_declare(queue=sess_id, arguments={'x-expires': 40000}) log.debug("Binding private exchange to client queue: Q:%s --> E:%s" % (sess_id, self.prv_exchange)) - mq_channel.queue_bind(exchange=self.prv_exchange, queue=sess_id) + for sbs in self.subscriptions.objects.filter(): + mq_channel.queue_bind(exchange=sbs.channel.code_name, queue=sess_id) def send_notification(self, title, message, typ=1, url=None): """ diff --git a/zengine/messaging/model.py b/zengine/messaging/model.py index 394f12aa..52a0ef54 100644 --- a/zengine/messaging/model.py +++ b/zengine/messaging/model.py @@ -17,8 +17,9 @@ from pyoko.fields import DATE_TIME_FORMAT from pyoko.lib.utils import get_object_from_path from zengine.client_queue import BLOCKING_MQ_PARAMS +from zengine.lib.cache import UserSessionID from zengine.lib.utils import to_safe_str - +from zengine.log import log UserModel = get_object_from_path(settings.USER_MODEL) @@ -105,7 +106,7 @@ def add_message(cls, channel_key, body, title=None, sender=None, url=None, typ=2 msg_object = Message(sender=sender, body=body, msg_title=title, url=url, typ=typ, channel_id=channel_key, receiver=receiver, key=uuid4().hex) mq_channel.basic_publish(exchange=channel_key, - routing_key='', + routing_key='#', body=json.dumps(msg_object.serialize())) return msg_object.save() @@ -205,35 +206,29 @@ def unread_count(self): else: self.channel.message_set.objects.filter().count() - def create_exchange(self): - """ - Creates user's private exchange - - Actually user's private channel needed to be defined only once, - and this should be happened when user first created. - But since this has a little performance cost, - to be safe we always call it before binding to the channel we currently subscribe - """ - channel = self._connect_mq() - channel.exchange_declare(exchange='prv_%s' % self.user.key.lower(), - exchange_type='fanout', - durable=True) - @classmethod def mark_seen(cls, key, datetime_str): cls.objects.filter(key=key).update(last_seen=datetime_str) def bind_to_channel(self): """ - Binds (subscribes) users private exchange to channel exchange + Binds (subscribes) users session queue to channel exchange Automatically called at creation of subscription record. + Same operation done at user login + (zengine.messaging.lib.BaseUser#bind_channels_to_session_queue) """ - if self.channel.code_name != self.user.prv_exchange: - channel = self._connect_mq() - channel.exchange_bind(source=self.channel.code_name, destination=self.user.prv_exchange) + try: + sess_id = UserSessionID(self.user_id).get() + mq_channel = self._connect_mq() + mq_channel.queue_bind(exchange=self.channel.code_name, + queue=sess_id) + except: + log.exception("Cant create subscription binding for %s : %s" % (self.name, + self.user.full_name)) + def post_creation(self): - self.create_exchange() + # self.create_exchange() self.bind_to_channel() if not self.name: @@ -329,9 +324,9 @@ def _republish(self): mq_channel.basic_publish(exchange=self.channel.key, routing_key='', body=json.dumps(self.serialize())) - def pre_save(self): - if self.exist: - self._republish() + # def pre_save(self): + # if self.exist: + # self._republish() ATTACHMENT_TYPES = ( diff --git a/zengine/models/auth.py b/zengine/models/auth.py index 2df93ed7..79a06767 100644 --- a/zengine/models/auth.py +++ b/zengine/models/auth.py @@ -89,7 +89,7 @@ def pre_save(self): self.encrypt_password() def post_creation(self): - self.prepare_channels() + self.prepare_user_channel() def get_permissions(self): """ diff --git a/zengine/tornado_server/ws_to_queue.py b/zengine/tornado_server/ws_to_queue.py index 037333e3..8850c1d5 100644 --- a/zengine/tornado_server/ws_to_queue.py +++ b/zengine/tornado_server/ws_to_queue.py @@ -118,6 +118,7 @@ def send_message(self, sess_id, input_data): {'code': 503, 'error': 'Retry'}) + class QueueManager(object): """ Async RabbitMQ & Tornado websocket connector @@ -134,7 +135,7 @@ def __init__(self, io_loop=None): self.out_channels = {} self.out_channel = None self.websockets = {} - # self.connect() + def connect(self): """ @@ -187,12 +188,6 @@ def on_input_queue_declare(self, queue): exchange='input_exc', queue=self.INPUT_QUEUE_NAME, routing_key="#") - def ask_for_user_id(self, sess_id): - log.debug(sess_id) - # TODO: add remote ip - self.publish_incoming_message(dict(_zops_remote_ip='', - data={'view': 'sessid_to_userid'}), sess_id) - def register_websocket(self, sess_id, ws): """ @@ -201,15 +196,8 @@ def register_websocket(self, sess_id, ws): sess_id: ws: """ - log.debug("GET SESSUSERS: %s" % sys.sessid_to_userid) - try: - user_id = sys.sessid_to_userid[sess_id] - self.websockets[user_id] = ws - except KeyError: - self.ask_for_user_id(sess_id) - self.websockets[sess_id] = ws - user_id = sess_id - self.create_out_channel(sess_id, user_id) + self.websockets[sess_id] = ws + self.create_out_channel(sess_id) def inform_disconnection(self, sess_id): self.in_channel.basic_publish(exchange='input_exc', @@ -220,25 +208,22 @@ def inform_disconnection(self, sess_id): _zops_remote_ip=''))) def unregister_websocket(self, sess_id): - user_id = sys.sessid_to_userid.get(sess_id, None) try: self.inform_disconnection(sess_id) - del self.websockets[user_id] - except KeyError: - log.exception("Non-existent websocket for %s" % user_id) + del self.websockets[sess_id] + except: + log.exception("WS already deleted") if sess_id in self.out_channels: try: self.out_channels[sess_id].close() except ChannelClosed: log.exception("Pika client (out) channel already closed") - def create_out_channel(self, sess_id, user_id): + def create_out_channel(self, sess_id): def _on_output_channel_creation(channel): def _on_output_queue_decleration(queue): - channel.basic_consume(self.on_message, queue=sess_id) - log.debug("BIND QUEUE TO WS Q.%s on Ch.%s WS.%s" % (sess_id, - channel.consumer_tags[0], - user_id)) + channel.basic_consume(self.on_message, queue=sess_id, consumer_tag=sess_id) + log.debug("BIND QUEUE TO WS Q.%s " % sess_id) self.out_channels[sess_id] = channel channel.queue_declare(callback=_on_output_queue_decleration, @@ -263,18 +248,9 @@ def publish_incoming_message(self, message, sess_id): body=json_encode(message)) def on_message(self, channel, method, header, body): - user_id = method.exchange[4:] - log.debug("WS RPLY for %s: %s" % (user_id, body)) - if user_id in self.websockets: - log.info("write msg to client") - self.websockets[user_id].write_message(body) - # channel.basic_ack(delivery_tag=method.delivery_tag) - elif 'sessid_to_userid' in body: - reply = json_decode(body) - sys.sessid_to_userid[reply['sess_id']] = reply['user_id'] - self.websockets[reply['user_id']] = self.websockets[reply['sess_id']] - del self.websockets[reply['sess_id']] + sess_id = method.consumer_tag + log.debug("WS RPLY for %s: %s" % (sess_id, body)) + if sess_id in self.websockets: + log.info("write msg to client %s" % sess_id) + self.websockets[sess_id].write_message(body) channel.basic_ack(delivery_tag=method.delivery_tag) - - # else: - # channel.basic_reject(delivery_tag=method.delivery_tag) diff --git a/zengine/views/auth.py b/zengine/views/auth.py index d8571fdf..62be4f88 100644 --- a/zengine/views/auth.py +++ b/zengine/views/auth.py @@ -61,9 +61,8 @@ def _do_upgrade(self): self.current.output['cmd'] = 'upgrade' self.current.output['user_id'] = self.current.user_id self.current.user.is_online(True) - self.current.user.bind_private_channel(self.current.session.sess_id) - user_sess = UserSessionID(self.current.user_id) - user_sess.set(self.current.session.sess_id) + self.current.user.bind_channels_to_session_queue(self.current.session.sess_id) + UserSessionID(self.current.user_id).set(self.current.session.sess_id) def do_view(self): """ @@ -82,12 +81,6 @@ def do_view(self): self.current.task_data['login_successful'] = auth_result if auth_result: self._do_upgrade() - - # old_sess_id = user_sess.get() - # notify = Notify(self.current.user_id) - # notify.cache_to_queue() - # if old_sess_id: - # notify.old_to_new_queue(old_sess_id) except: raise self.current.log.exception("Wrong username or another error occurred") diff --git a/zengine/views/system.py b/zengine/views/system.py index a592faf2..a82ff941 100644 --- a/zengine/views/system.py +++ b/zengine/views/system.py @@ -11,7 +11,7 @@ def sessid_to_userid(current): current.output['user_id'] = current.user_id.lower() current.output['sess_id'] = current.session.sess_id - current.user.bind_private_channel(current.session.sess_id) + current.user.bind_channels_to_session_queue(current.session.sess_id) current.output['sessid_to_userid'] = True def mark_offline_user(current): diff --git a/zengine/wf_daemon.py b/zengine/wf_daemon.py index 4649ac15..0d6fdfd9 100755 --- a/zengine/wf_daemon.py +++ b/zengine/wf_daemon.py @@ -176,10 +176,10 @@ def handle_message(self, ch, method, properties, body): def send_output(self, output): # TODO: This is ugly, we should separate login process log.debug("SEND_OUTPUT: %s" % output) - if self.current.user_id is None or 'login_process' in output: - self.client_queue.send_to_default_exchange(self.sessid, output) - else: - self.client_queue.send_to_prv_exchange(self.current.user_id, output) + # if self.current.user_id is None or 'login_process' in output: + self.client_queue.send_to_default_exchange(self.sessid, output) + # else: + # self.client_queue.send_to_prv_exchange(self.current.user_id, output)