Skip to content

Commit

Permalink
e2e-removal-refactor finished. instant messaging working
Browse files Browse the repository at this point in the history
rref #5367
rref #5366

ref #66
ref #65
  • Loading branch information
evrenesat committed Jul 21, 2016
1 parent 4b9e238 commit 82de442
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 81 deletions.
8 changes: 5 additions & 3 deletions zengine/messaging/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ def encrypt_password(self):
if self.password and not self.password.startswith('$pbkdf2'):
self.set_password(self.password)

def prepare_channels(self):
def prepare_user_channel(self):
"""should be called from User.post_creation hook"""
from zengine.messaging.model import Channel, Subscriber
# create private channel of user
ch, new = Channel.objects.get_or_create(owner=self, typ=5)
Expand Down Expand Up @@ -117,12 +118,13 @@ def full_name(self):
def prv_exchange(self):
return 'prv_%s' % str(self.key).lower()

def bind_private_channel(self, sess_id):
def bind_channels_to_session_queue(self, sess_id):
mq_channel = pika.BlockingConnection(BLOCKING_MQ_PARAMS).channel()
mq_channel.queue_declare(queue=sess_id, arguments={'x-expires': 40000})
log.debug("Binding private exchange to client queue: Q:%s --> E:%s" % (sess_id,
self.prv_exchange))
mq_channel.queue_bind(exchange=self.prv_exchange, queue=sess_id)
for sbs in self.subscriptions.objects.filter():
mq_channel.queue_bind(exchange=sbs.channel.code_name, queue=sess_id)

def send_notification(self, title, message, typ=1, url=None):
"""
Expand Down
43 changes: 19 additions & 24 deletions zengine/messaging/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
from pyoko.fields import DATE_TIME_FORMAT
from pyoko.lib.utils import get_object_from_path
from zengine.client_queue import BLOCKING_MQ_PARAMS
from zengine.lib.cache import UserSessionID
from zengine.lib.utils import to_safe_str

from zengine.log import log
UserModel = get_object_from_path(settings.USER_MODEL)


Expand Down Expand Up @@ -105,7 +106,7 @@ def add_message(cls, channel_key, body, title=None, sender=None, url=None, typ=2
msg_object = Message(sender=sender, body=body, msg_title=title, url=url,
typ=typ, channel_id=channel_key, receiver=receiver, key=uuid4().hex)
mq_channel.basic_publish(exchange=channel_key,
routing_key='',
routing_key='#',
body=json.dumps(msg_object.serialize()))
return msg_object.save()

Expand Down Expand Up @@ -205,35 +206,29 @@ def unread_count(self):
else:
self.channel.message_set.objects.filter().count()

def create_exchange(self):
"""
Creates user's private exchange
Actually user's private channel needed to be defined only once,
and this should be happened when user first created.
But since this has a little performance cost,
to be safe we always call it before binding to the channel we currently subscribe
"""
channel = self._connect_mq()
channel.exchange_declare(exchange='prv_%s' % self.user.key.lower(),
exchange_type='fanout',
durable=True)

@classmethod
def mark_seen(cls, key, datetime_str):
cls.objects.filter(key=key).update(last_seen=datetime_str)

def bind_to_channel(self):
"""
Binds (subscribes) users private exchange to channel exchange
Binds (subscribes) users session queue to channel exchange
Automatically called at creation of subscription record.
Same operation done at user login
(zengine.messaging.lib.BaseUser#bind_channels_to_session_queue)
"""
if self.channel.code_name != self.user.prv_exchange:
channel = self._connect_mq()
channel.exchange_bind(source=self.channel.code_name, destination=self.user.prv_exchange)
try:
sess_id = UserSessionID(self.user_id).get()
mq_channel = self._connect_mq()
mq_channel.queue_bind(exchange=self.channel.code_name,
queue=sess_id)
except:
log.exception("Cant create subscription binding for %s : %s" % (self.name,
self.user.full_name))


def post_creation(self):
self.create_exchange()
# self.create_exchange()
self.bind_to_channel()

if not self.name:
Expand Down Expand Up @@ -329,9 +324,9 @@ def _republish(self):
mq_channel.basic_publish(exchange=self.channel.key, routing_key='',
body=json.dumps(self.serialize()))

def pre_save(self):
if self.exist:
self._republish()
# def pre_save(self):
# if self.exist:
# self._republish()


ATTACHMENT_TYPES = (
Expand Down
2 changes: 1 addition & 1 deletion zengine/models/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def pre_save(self):
self.encrypt_password()

def post_creation(self):
self.prepare_channels()
self.prepare_user_channel()

def get_permissions(self):
"""
Expand Down
54 changes: 15 additions & 39 deletions zengine/tornado_server/ws_to_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def send_message(self, sess_id, input_data):
{'code': 503, 'error': 'Retry'})



class QueueManager(object):
"""
Async RabbitMQ & Tornado websocket connector
Expand All @@ -134,7 +135,7 @@ def __init__(self, io_loop=None):
self.out_channels = {}
self.out_channel = None
self.websockets = {}
# self.connect()


def connect(self):
"""
Expand Down Expand Up @@ -187,12 +188,6 @@ def on_input_queue_declare(self, queue):
exchange='input_exc',
queue=self.INPUT_QUEUE_NAME,
routing_key="#")
def ask_for_user_id(self, sess_id):
log.debug(sess_id)
# TODO: add remote ip
self.publish_incoming_message(dict(_zops_remote_ip='',
data={'view': 'sessid_to_userid'}), sess_id)


def register_websocket(self, sess_id, ws):
"""
Expand All @@ -201,15 +196,8 @@ def register_websocket(self, sess_id, ws):
sess_id:
ws:
"""
log.debug("GET SESSUSERS: %s" % sys.sessid_to_userid)
try:
user_id = sys.sessid_to_userid[sess_id]
self.websockets[user_id] = ws
except KeyError:
self.ask_for_user_id(sess_id)
self.websockets[sess_id] = ws
user_id = sess_id
self.create_out_channel(sess_id, user_id)
self.websockets[sess_id] = ws
self.create_out_channel(sess_id)

def inform_disconnection(self, sess_id):
self.in_channel.basic_publish(exchange='input_exc',
Expand All @@ -220,25 +208,22 @@ def inform_disconnection(self, sess_id):
_zops_remote_ip='')))

def unregister_websocket(self, sess_id):
user_id = sys.sessid_to_userid.get(sess_id, None)
try:
self.inform_disconnection(sess_id)
del self.websockets[user_id]
except KeyError:
log.exception("Non-existent websocket for %s" % user_id)
del self.websockets[sess_id]
except:
log.exception("WS already deleted")
if sess_id in self.out_channels:
try:
self.out_channels[sess_id].close()
except ChannelClosed:
log.exception("Pika client (out) channel already closed")

def create_out_channel(self, sess_id, user_id):
def create_out_channel(self, sess_id):
def _on_output_channel_creation(channel):
def _on_output_queue_decleration(queue):
channel.basic_consume(self.on_message, queue=sess_id)
log.debug("BIND QUEUE TO WS Q.%s on Ch.%s WS.%s" % (sess_id,
channel.consumer_tags[0],
user_id))
channel.basic_consume(self.on_message, queue=sess_id, consumer_tag=sess_id)
log.debug("BIND QUEUE TO WS Q.%s " % sess_id)
self.out_channels[sess_id] = channel

channel.queue_declare(callback=_on_output_queue_decleration,
Expand All @@ -263,18 +248,9 @@ def publish_incoming_message(self, message, sess_id):
body=json_encode(message))

def on_message(self, channel, method, header, body):
user_id = method.exchange[4:]
log.debug("WS RPLY for %s: %s" % (user_id, body))
if user_id in self.websockets:
log.info("write msg to client")
self.websockets[user_id].write_message(body)
# channel.basic_ack(delivery_tag=method.delivery_tag)
elif 'sessid_to_userid' in body:
reply = json_decode(body)
sys.sessid_to_userid[reply['sess_id']] = reply['user_id']
self.websockets[reply['user_id']] = self.websockets[reply['sess_id']]
del self.websockets[reply['sess_id']]
sess_id = method.consumer_tag
log.debug("WS RPLY for %s: %s" % (sess_id, body))
if sess_id in self.websockets:
log.info("write msg to client %s" % sess_id)
self.websockets[sess_id].write_message(body)
channel.basic_ack(delivery_tag=method.delivery_tag)

# else:
# channel.basic_reject(delivery_tag=method.delivery_tag)
11 changes: 2 additions & 9 deletions zengine/views/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,8 @@ def _do_upgrade(self):
self.current.output['cmd'] = 'upgrade'
self.current.output['user_id'] = self.current.user_id
self.current.user.is_online(True)
self.current.user.bind_private_channel(self.current.session.sess_id)
user_sess = UserSessionID(self.current.user_id)
user_sess.set(self.current.session.sess_id)
self.current.user.bind_channels_to_session_queue(self.current.session.sess_id)
UserSessionID(self.current.user_id).set(self.current.session.sess_id)

def do_view(self):
"""
Expand All @@ -82,12 +81,6 @@ def do_view(self):
self.current.task_data['login_successful'] = auth_result
if auth_result:
self._do_upgrade()

# old_sess_id = user_sess.get()
# notify = Notify(self.current.user_id)
# notify.cache_to_queue()
# if old_sess_id:
# notify.old_to_new_queue(old_sess_id)
except:
raise
self.current.log.exception("Wrong username or another error occurred")
Expand Down
2 changes: 1 addition & 1 deletion zengine/views/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
def sessid_to_userid(current):
current.output['user_id'] = current.user_id.lower()
current.output['sess_id'] = current.session.sess_id
current.user.bind_private_channel(current.session.sess_id)
current.user.bind_channels_to_session_queue(current.session.sess_id)
current.output['sessid_to_userid'] = True

def mark_offline_user(current):
Expand Down
8 changes: 4 additions & 4 deletions zengine/wf_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,10 @@ def handle_message(self, ch, method, properties, body):
def send_output(self, output):
# TODO: This is ugly, we should separate login process
log.debug("SEND_OUTPUT: %s" % output)
if self.current.user_id is None or 'login_process' in output:
self.client_queue.send_to_default_exchange(self.sessid, output)
else:
self.client_queue.send_to_prv_exchange(self.current.user_id, output)
# if self.current.user_id is None or 'login_process' in output:
self.client_queue.send_to_default_exchange(self.sessid, output)
# else:
# self.client_queue.send_to_prv_exchange(self.current.user_id, output)



Expand Down

0 comments on commit 82de442

Please sign in to comment.