Skip to content

Commit

Permalink
rref #5367
Browse files Browse the repository at this point in the history
rref #5366
ref #66
ref #65
  • Loading branch information
evrenesat committed Jun 21, 2016
1 parent 287b722 commit 849249a
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 40 deletions.
109 changes: 69 additions & 40 deletions zengine/messaging/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#
# This file is licensed under the GNU General Public License v3
# (GPLv3). See LICENSE.txt for details.
import json

import pika

from pyoko import Model, field, ListNode
Expand All @@ -15,46 +17,23 @@

UserModel = get_object_from_path(settings.USER_MODEL)

MSG_TYPES = (
(1, "Info"),
(11, "Error"),
(111, "Success"),
(2, "Direct Message"),
(3, "Broadcast Message")
(4, "Channel Message")
)

def get_mq_connection():
connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS)
channel = connection.channel()
return connection, channel


CHANNEL_TYPES = (
# (1, "Notification"),
(10, "System Broadcast"),
(10, "User Broadcast"),
(15, "Direct"),
(15, "User Broadcast"),
(20, "Chat"),
(25, "Direct"),
)


MESSAGE_STATUS = (
(1, "Created"),
(11, "Transmitted"),
(22, "Seen"),
(33, "Read"),
(44, "Archived"),

)
ATTACHMENT_TYPES = (
(1, "Document"),
(11, "Spreadsheet"),
(22, "Image"),
(33, "PDF"),

)


def get_mq_connection():
connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS)
channel = connection.channel()
return connection, channel

class Channel(Model):
class Channel(Model):
name = field.String("Name")
code_name = field.String("Internal name")
description = field.String("Description")
Expand All @@ -64,18 +43,27 @@ class Channel(Model):
class Managers(ListNode):
user = UserModel(reverse_name='managed_channels')

def add_message(self, body, title, sender=None, url=None, typ=2):
channel = self._connect_mq()
mq_msg = json.dumps(dict(sender=sender, body=body, msg_title=title, url=url, typ=typ))
channel.basic_publish(exchange=self.code_name, body=mq_msg)
Message(sender=sender, body=body, msg_title=title, url=url, typ=typ, channel=self).save()

def _connect_mq(self):
self.connection, self.channel = get_mq_connection()
return self.channel

def create_exchange(self):
"""
This method creates MQ exch
which actually needed to be defined only once.
Creates MQ exchange for this channel
Needs to be defined only once.
"""
channel = self._connect_mq()
channel.exchange_declare(exchange=self.code_name)
channel.exchange_declare(exchange=self.code_name, exchange_type='fanout', durable=True)

def post_creation(self):
self.create_exchange()


class Subscription(Model):
"""
Expand All @@ -87,6 +75,7 @@ class Subscription(Model):
is_muted = field.Boolean("Mute the channel")
inform_me = field.Boolean("Inform when I'm mentioned")
can_leave = field.Boolean("Membership is not obligatory", default=True)

# status = field.Integer("Status", choices=SUBSCRIPTION_STATUS)

def _connect_mq(self):
Expand All @@ -95,18 +84,48 @@ def _connect_mq(self):

def create_exchange(self):
"""
This method creates user's private exchange
which actually needed to be defined only once.
Creates user's private exchange
Actually needed to be defined only once.
but since we don't know if it's exists or not
we always call it before
"""
channel = self._connect_mq()
channel.exchange_declare(exchange=self.user.key)

channel.exchange_declare(exchange=self.user.key, exchange_type='direct', durable=True)

def bind_to_channel(self):
"""
Binds (subscribes) users private exchange to channel exchange
Automatically called at creation of subscription record.
"""
channel = self._connect_mq()
channel.exchange_bind(source=self.channel.code_name, destination=self.user.key)

def post_creation(self):
self.create_exchange()
self.bind_to_channel()

def __unicode__(self):
return "%s in %s" % (self.user, self.channel.name)


MSG_TYPES = (
(1, "Info"),
(11, "Error"),
(111, "Success"),
(2, "Direct Message"),
(3, "Broadcast Message")
(4, "Channel Message")
)
MESSAGE_STATUS = (
(1, "Created"),
(11, "Transmitted"),
(22, "Seen"),
(33, "Read"),
(44, "Archived"),

)


class Message(Model):
"""
Permission model
Expand All @@ -119,13 +138,23 @@ class Message(Model):
url = field.String("URL")
channel = Channel()
sender = UserModel(reverse_name='sent_messages')
# FIXME: receiver should be removed after all of it's usages refactored to channels
receiver = UserModel(reverse_name='received_messages')

def __unicode__(self):
content = self.msg_title or self.body
return "%s%s" % (content[:30], '...' if len(content) > 30 else '')


ATTACHMENT_TYPES = (
(1, "Document"),
(11, "Spreadsheet"),
(22, "Image"),
(33, "PDF"),

)


class Attachment(Model):
"""
A model to store message attachments
Expand Down
13 changes: 13 additions & 0 deletions zengine/views/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,21 @@ class Login(SimpleView):
does the authentication at ``do`` stage.
"""

def _do_binding(self):
"""
Bind user's ephemeral session queue to user's durable private exchange
"""
from zengine.messaging.model import get_mq_connection
connection, channel = get_mq_connection()
channel.queue_bind(exchange=self.current.user_id,
queue=self.current.session.sess_id,
# routing_key="#"
)

def do_view(self):
"""
Authenticate user with given credentials.
Connects user's queue and exchange
"""
self.current.task_data['login_successful'] = False
if self.current.is_auth:
Expand All @@ -67,6 +79,7 @@ def do_view(self):
self.current.input['password'])
self.current.task_data['login_successful'] = auth_result
if auth_result:
self._do_binding()
user_sess = UserSessionID(self.current.user_id)
old_sess_id = user_sess.get()
user_sess.set(self.current.session.sess_id)
Expand Down

0 comments on commit 849249a

Please sign in to comment.