-
Notifications
You must be signed in to change notification settings - Fork 0
/
tests.py
120 lines (80 loc) · 2.78 KB
/
tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import micropubsub as ps
def test_subscribe_and_publish():
# define a callback, aka a handler
def handleStartupEvent():
assert True
# subscribe the callback to the startup event
ps.subscribe('startup', handleStartupEvent)
# when the startup event is published, the callback will be executed
ps.publish('startup')
def test_subscribe_and_unsubscribe():
should_be_called = 1
called = 0
# define a callback, aka a handler
def handleEvent():
nonlocal called
called += 1
# subscribe the callback to the event
ps.subscribe('some:event', handleEvent)
# when the some:event event is published, the callback will be executed
ps.publish('some:event')
# unsubscribe the callback to the some:event event
ps.unsubscribe('some:event', handleEvent)
# when the some:event event is published, the callback will NOT be executed again
ps.publish('some:event')
assert should_be_called == called
def test_multiple_subscriptions():
should_be_called = 2
called = 0
# define a callback, aka a handler
def handleEvent():
nonlocal called
called += 1
# subscribe the callback to the event TWICE
ps.subscribe('some:event', handleEvent)
ps.subscribe('some:event', handleEvent)
# when the some:event event is published, the callback will be executed
ps.publish('some:event')
assert should_be_called == called
def test_multiple_subscriptions_and_unsubscribe():
should_be_called = 2
called = 0
# define a callback, aka a handler
def handleEvent():
nonlocal called
called += 1
# subscribe the callback to the event TWICE
ps.subscribe('some:event', handleEvent)
ps.subscribe('some:event', handleEvent)
# publishing should increment twice
ps.publish('some:event')
# should remove all subscriptions to some:event for handleEvent
ps.unsubscribe('some:event', handleEvent)
# when the some:event event is published, the callback will NOT be executed, since it's already unsubscribed
ps.publish('some:event')
assert should_be_called == called
def test_positional_args():
expected_total = 5
total = 0
# define a callback, aka a handler
def handleEvent(num_one, num_two):
nonlocal total
total = num_one + num_two
# subscribe the callback to the event TWICE
ps.subscribe('add_numbers', handleEvent)
# when the add_numbers event is published, the callback will be executed
ps.publish('add_numbers', 2, 3)
assert expected_total == total
def test_keyword_args():
expected_name = 'Jonny'
actual_name = ''
# define a callback, aka a handler
def handleEvent(name=None):
nonlocal actual_name
actual_name = name
# subscribe the callback to the event TWICE
ps.subscribe('name', handleEvent)
# when the add_numbers event is published, the callback will be executed
ps.publish('name', name='Jonny')
assert expected_name == actual_name
test_multiple_subscriptions_and_unsubscribe()