Skip to content

Commit

Permalink
rref #5367
Browse files Browse the repository at this point in the history
rref #5366

ref #66
ref #65
  • Loading branch information
evrenesat committed Jul 21, 2016
1 parent 4b67472 commit 5f34aa7
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 24 deletions.
5 changes: 4 additions & 1 deletion zengine/management_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,10 @@ def create_user_channels(self):
sb, new = Subscriber.objects.get_or_create(channel=ch,
user=usr,
read_only=True,
name='Notifications')
name='Notifications',
can_manage=True,
can_leave=False
)
print("%s notify sub: %s" % ('created' if new else 'existing', ch.code_name))


Expand Down
48 changes: 29 additions & 19 deletions zengine/messaging/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ class Channel(Model):
owner = UserModel(reverse_name='created_channels', null=True)

def __unicode__(self):
return "%s (%s's %s channel)" % (self.name or '', self.owner.__unicode__(), self.get_typ_display())
return "%s (%s's %s channel)" % (self.name or '',
self.owner.full_name,
self.get_typ_display())

#
# class Managers(ListNode):
Expand All @@ -86,32 +88,33 @@ def get_or_create_direct_channel(cls, initiator_key, receiver_key):
code_name='%s_%s' % (receiver_key, initiator_key))
receiver_name = UserModel.objects.get(receiver_key).full_name
if existing:
return existing[0], receiver_name
channel = existing[0]
else:
channel_name = '%s_%s' % (initiator_key, receiver_key)
channel = cls(is_direct=True, code_name=channel_name, typ=10).save()
Subscriber(channel=channel,
user_id=initiator_key,
name=receiver_name).save()
Subscriber(channel=channel,
user_id=receiver_key,
name=UserModel.objects.get(initiator_key).full_name).save()
return channel, receiver_name
Subscriber.objects.get_or_create(channel=channel,
user_id=initiator_key,
name=receiver_name)
Subscriber.objects.get_or_create(channel=channel,
user_id=receiver_key,
name=UserModel.objects.get(initiator_key).full_name)
return channel, receiver_name

@classmethod
def add_message(cls, channel_key, body, title=None, sender=None, url=None, typ=2,
receiver=None):
mq_channel = cls._connect_mq()
msg_object = Message(sender=sender, body=body, msg_title=title, url=url,
typ=typ, channel_id=channel_key, receiver=receiver, key=uuid4().hex)
msg_object.setattr('unsaved', True)
mq_channel.basic_publish(exchange=channel_key,
routing_key='',
body=json.dumps(msg_object.serialize()))
return msg_object.save()

def get_last_messages(self):
# TODO: Try to refactor this with https://github.com/rabbitmq/rabbitmq-recent-history-exchange
return self.message_set.objects.filter()[:20]
return self.message_set.objects.filter().set_params(sort="timestamp asc")[:20]

@classmethod
def _connect_mq(cls):
Expand All @@ -129,6 +132,12 @@ def create_exchange(self):
exchange_type='fanout',
durable=True)

def subscribe_owner(self):
sbs, new = Subscriber.objects.get_or_create(user=self.owner,
channel=self,
can_manage=True,
can_leave=False)

def pre_creation(self):
if not self.code_name:
if self.name:
Expand All @@ -145,6 +154,7 @@ def pre_creation(self):

def post_save(self):
self.create_exchange()
# self.subscribe_owner()


class Subscriber(Model):
Expand All @@ -171,7 +181,7 @@ class Subscriber(Model):
# status = field.Integer("Status", choices=SUBSCRIPTION_STATUS)

def __unicode__(self):
return "%s >> %s" % (self.user.full_name, self.channel.__unicode__())
return "%s subscription of %s" % (self.name, self.user)

@classmethod
def _connect_mq(cls):
Expand All @@ -195,13 +205,14 @@ def get_actions(self):
def is_online(self):
# TODO: Cache this method
if self.channel.typ == 10:
return self.channel.subscriber_set.objects.exclude(user=self.user).get().user.is_online()

return self.channel.subscriber_set.objects.exclude(
user=self.user).get().user.is_online()

def unread_count(self):
# FIXME: track and return actual unread message count
if self.last_seen_msg_time:
return self.channel.message_set.objects.filter(timestamp__lt=self.last_seen_msg_time).count()
return self.channel.message_set.objects.filter(
timestamp__lt=self.last_seen_msg_time).count()
else:
self.channel.message_set.objects.filter().count()

Expand All @@ -215,7 +226,7 @@ def create_exchange(self):
to be safe we always call it before binding to the channel we currently subscribe
"""
channel = self._connect_mq()
channel.exchange_declare(exchange='prv_%s' % self.user.key.lower(),
channel.exchange_declare(exchange=self.user.prv_exchange,
exchange_type='fanout',
durable=True)

Expand All @@ -236,11 +247,11 @@ def post_creation(self):
self.create_exchange()
self.bind_to_channel()

def pre_creation(self):
if not self.name:
self.name = self.channel.name



MSG_TYPES = (
(1, "Info Notification"),
(11, "Error Notification"),
Expand Down Expand Up @@ -287,7 +298,6 @@ def get_actions_for(self, user):
('Edit', '_zops_edit_message')
])


def serialize(self, user=None):
"""
Serializes message for given user.
Expand All @@ -306,7 +316,7 @@ def serialize(self, user=None):
'type': self.typ,
'updated_at': self.updated_at,
'timestamp': self.timestamp.strftime(DATE_TIME_FORMAT),
'is_update': self.exist,
'is_update': hasattr(self, 'unsaved'),
'attachments': [attachment.serialize() for attachment in self.attachment_set],
'title': self.msg_title,
'sender_name': self.sender.full_name,
Expand All @@ -330,7 +340,7 @@ def _republish(self):
body=json.dumps(self.serialize()))

def pre_save(self):
if self.exist:
if not hasattr(self, 'unsaved'):
self._republish()


Expand Down
6 changes: 5 additions & 1 deletion zengine/messaging/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,10 @@ def create_channel(current):
description=current.input['description'],
owner=current.user,
typ=15).save()
sbs, new = Subscriber.objects.get_or_create(user=channel.owner,
channel=channel,
can_manage=True,
can_leave=False)
current.output = {
'channel_key': channel.key,
'status': 'OK',
Expand Down Expand Up @@ -600,7 +604,7 @@ def pin_channel(current):
}
"""
try:
Subscriber(current).objects.get(user_id=current.user_id,
Subscriber(current).objects.filter(user_id=current.user_id,
channel_id=current.input['channel_key']).update(pinned=True)
current.output = {'status': 'OK', 'code': 200}
except ObjectDoesNotExist:
Expand Down
4 changes: 2 additions & 2 deletions zengine/tornado_server/ws_to_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def unregister_websocket(self, sess_id):
def create_out_channel(self, sess_id, user_id):
def _on_output_channel_creation(channel):
def _on_output_queue_decleration(queue):
channel.basic_consume(self.on_message, queue=sess_id)
channel.basic_consume(self.on_message, queue=sess_id, consumer_tag=user_id)
log.debug("BIND QUEUE TO WS Q.%s on Ch.%s WS.%s" % (sess_id,
channel.consumer_tags[0],
user_id))
Expand Down Expand Up @@ -263,7 +263,7 @@ def publish_incoming_message(self, message, sess_id):
body=json_encode(message))

def on_message(self, channel, method, header, body):
user_id = method.exchange[4:]
user_id = method.consumer_tag
log.debug("WS RPLY for %s: %s" % (user_id, body))
if user_id in self.websockets:
log.info("write msg to client")
Expand Down
2 changes: 1 addition & 1 deletion zengine/wf_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def handle_message(self, ch, method, properties, body):

def send_output(self, output):
# TODO: This is ugly, we should separate login process
log.debug("SEND_OUTPUT: %s" % output)
# log.debug("SEND_OUTPUT: %s" % output)
if self.current.user_id is None or 'login_process' in output:
self.client_queue.send_to_default_exchange(self.sessid, output)
else:
Expand Down

0 comments on commit 5f34aa7

Please sign in to comment.