Package cherrypy :: Package test :: Module test_bus
[hide private]
[frames | no frames]

Source Code for Module cherrypy.test.test_bus

  1  import threading 
  2  import time 
  3  import unittest 
  4   
  5  import cherrypy 
  6  from cherrypy._cpcompat import get_daemon, set 
  7  from cherrypy.process import wspbus 
  8   
  9   
# Template for the line each test listener records when it is called:
# listener index, channel name, and the argument it was published with.
msg = "Listener %d on channel %s: %s."
 11   
 12   
class PublishSubscribeTests(unittest.TestCase):
    """Exercise Bus.subscribe/publish: call order and failing listeners."""

    def get_listener(self, channel, index):
        """Build a listener that appends a formatted line to self.responses.

        The line is ``msg`` filled in with this listener's index, the
        channel name, and whatever argument the listener is called with.
        """
        def listener(arg=None):
            line = msg % (index, channel, arg)
            self.responses.append(line)
        return listener

    def test_builtin_channels(self):
        # Four listeners are subscribed to every channel the new Bus already
        # has, with priorities 100, 50, 0 and 51.  They are expected to be
        # called in ascending priority order: listener indexes 2, 1, 3, 0.
        bus = wspbus.Bus()
        self.responses = []
        expected = []
        call_order = (2, 1, 3, 0)

        for channel in bus.listeners:
            for index, priority in enumerate([100, 50, 0, 51]):
                callback = self.get_listener(channel, index)
                bus.subscribe(channel, callback, priority)

        for channel in bus.listeners:
            # Publish once with no argument...
            bus.publish(channel)
            for index in call_order:
                expected.append(msg % (index, channel, None))
            # ...and once with a keyword argument.
            bus.publish(channel, arg=79347)
            for index in call_order:
                expected.append(msg % (index, channel, 79347))

        self.assertEqual(self.responses, expected)

    def test_custom_channels(self):
        # Same idea on channels the Bus does not define itself.  The first
        # listener is subscribed with priority None; the expected call order
        # (1, 3, 0, 2) places it between the priority-40 and priority-60
        # listeners.
        bus = wspbus.Bus()
        self.responses = []
        expected = []
        call_order = (1, 3, 0, 2)

        custom_listeners = ('hugh', 'louis', 'dewey')
        for channel in custom_listeners:
            for index, priority in enumerate([None, 10, 60, 40]):
                callback = self.get_listener(channel, index)
                bus.subscribe(channel, callback, priority)

        for channel in custom_listeners:
            # Publish once with a positional argument...
            bus.publish(channel, 'ah so')
            for index in call_order:
                expected.append(msg % (index, channel, 'ah so'))
            # ...and once with no argument.
            bus.publish(channel)
            for index in call_order:
                expected.append(msg % (index, channel, None))

        self.assertEqual(self.responses, expected)

    def test_listener_errors(self):
        # A failing listener must make publish() raise ChannelFailures, while
        # the well-behaved listener on the same channel is still called.
        bus = wspbus.Bus()
        self.responses = []
        expected = []

        # Every channel except 'log' takes part in this test.
        channels = []
        for name in bus.listeners:
            if name != 'log':
                channels.append(name)

        for channel in channels:
            bus.subscribe(channel, self.get_listener(channel, 1))
            # This will break since the lambda takes no args.
            bus.subscribe(channel, lambda: None, priority=20)

        for channel in channels:
            self.assertRaises(wspbus.ChannelFailures,
                              bus.publish, channel, 123)
            expected.append(msg % (1, channel, 123))

        self.assertEqual(self.responses, expected)
71 72
class BusMethodTests(unittest.TestCase):
    """Check the state-changing methods of Bus and what they publish/log."""

    def log(self, bus):
        """Start capturing the messages *bus* publishes on its 'log' channel.

        Captured messages accumulate in ``self._log_entries`` (the level
        passed to the listener is ignored).
        """
        self._log_entries = []

        def logit(msg, level):
            self._log_entries.append(msg)

        bus.subscribe('log', logit)

    def assertLog(self, entries):
        """Assert that the captured log messages equal *entries* exactly."""
        self.assertEqual(self._log_entries, entries)

    def get_listener(self, channel, index):
        """Build a listener that appends a formatted line to self.responses.

        The line is ``msg`` filled in with this listener's index, the
        channel name, and whatever argument the listener is called with.
        """
        def listener(arg=None):
            line = msg % (index, channel, arg)
            self.responses.append(line)
        return listener

    def test_start(self):
        bus = wspbus.Bus()
        self.log(bus)

        self.responses = []
        num = 3
        for index in range(num):
            bus.subscribe('start', self.get_listener('start', index))

        bus.start()
        try:
            # The start method MUST call all 'start' listeners.
            wanted = [msg % (i, 'start', None) for i in range(num)]
            self.assertEqual(set(self.responses), set(wanted))
            # The start method MUST move the state to STARTED
            # (or EXITING, if errors occur)
            self.assertEqual(bus.state, bus.states.STARTED)
            # The start method MUST log its states.
            self.assertLog(['Bus STARTING', 'Bus STARTED'])
        finally:
            # Exit so the atexit handler doesn't complain.
            bus.exit()

    def test_stop(self):
        bus = wspbus.Bus()
        self.log(bus)

        self.responses = []
        num = 3
        for index in range(num):
            bus.subscribe('stop', self.get_listener('stop', index))

        bus.stop()

        # The stop method MUST call all 'stop' listeners.
        wanted = [msg % (i, 'stop', None) for i in range(num)]
        self.assertEqual(set(self.responses), set(wanted))
        # The stop method MUST move the state to STOPPED
        self.assertEqual(bus.state, bus.states.STOPPED)
        # The stop method MUST log its states.
        self.assertLog(['Bus STOPPING', 'Bus STOPPED'])

    def test_graceful(self):
        bus = wspbus.Bus()
        self.log(bus)

        self.responses = []
        num = 3
        for index in range(num):
            bus.subscribe('graceful', self.get_listener('graceful', index))

        bus.graceful()

        # The graceful method MUST call all 'graceful' listeners.
        wanted = [msg % (i, 'graceful', None) for i in range(num)]
        self.assertEqual(set(self.responses), set(wanted))
        # The graceful method MUST log its states.
        self.assertLog(['Bus graceful'])

    def test_exit(self):
        bus = wspbus.Bus()
        self.log(bus)

        self.responses = []
        num = 3
        for index in range(num):
            bus.subscribe('stop', self.get_listener('stop', index))
            bus.subscribe('exit', self.get_listener('exit', index))

        bus.exit()

        # The exit method MUST call all 'stop' listeners,
        # and then all 'exit' listeners.
        stop_lines = [msg % (i, 'stop', None) for i in range(num)]
        exit_lines = [msg % (i, 'exit', None) for i in range(num)]
        self.assertEqual(set(self.responses), set(stop_lines + exit_lines))
        # The exit method MUST move the state to EXITING
        self.assertEqual(bus.state, bus.states.EXITING)
        # The exit method MUST log its states.
        self.assertLog(['Bus STOPPING', 'Bus STOPPED',
                        'Bus EXITING', 'Bus EXITED'])

    def test_wait(self):
        bus = wspbus.Bus()

        def call_later(method_name):
            # Runs in a helper thread: pause briefly so the main thread is
            # already inside wait() before the bus method is invoked.
            time.sleep(0.2)
            getattr(bus, method_name)()

        states = bus.states
        scenarios = [
            ('start', [states.STARTED]),
            ('stop', [states.STOPPED]),
            ('start', [states.STARTING, states.STARTED]),
            ('exit', [states.EXITING]),
        ]
        for method_name, acceptable in scenarios:
            worker = threading.Thread(target=call_later, args=(method_name,))
            worker.start()
            bus.wait(acceptable)

            # The wait method MUST wait for the given state(s).
            if bus.state not in acceptable:
                self.fail("State %r not in %r" % (bus.state, acceptable))

    def test_block(self):
        bus = wspbus.Bus()
        self.log(bus)

        def non_daemon_threads():
            # All live threads for which get_daemon() is false.
            return [t for t in threading.enumerate() if not get_daemon(t)]

        def f():
            # Exits the bus shortly after the main thread starts blocking.
            time.sleep(0.2)
            bus.exit()

        def g():
            # Sleeps past the exit above, so it is still running then.
            time.sleep(0.4)

        threading.Thread(target=f).start()
        threading.Thread(target=g).start()
        # The two threads just started plus one more non-daemon thread.
        self.assertEqual(len(non_daemon_threads()), 3)

        bus.block()

        # The block method MUST wait for the EXITING state.
        self.assertEqual(bus.state, bus.states.EXITING)
        # The block method MUST wait for ALL non-main, non-daemon threads
        # to finish.
        self.assertEqual(len(non_daemon_threads()), 1)
        # The last message will mention an indeterminable thread name;
        # ignore it
        all_but_last = self._log_entries[:-1]
        self.assertEqual(all_but_last,
                         ['Bus STOPPING', 'Bus STOPPED',
                          'Bus EXITING', 'Bus EXITED',
                          'Waiting for child threads to terminate...'])

    def test_start_with_callback(self):
        bus = wspbus.Bus()
        self.log(bus)

        events = []

        def callback(*args, **kwargs):
            events.append(("f", args, kwargs))

        def on_start():
            events.append("g")

        try:
            bus.subscribe("start", on_start)
            bus.start_with_callback(callback, (1, 3, 5), {"foo": "bar"})
            # Give wait() time to run f()
            time.sleep(0.2)

            # The callback method MUST wait for the STARTED state.
            self.assertEqual(bus.state, bus.states.STARTED)
            # The callback method MUST run after all start methods.
            self.assertEqual(events,
                             ["g", ("f", (1, 3, 5), {"foo": "bar"})])
        finally:
            bus.exit()

    def test_log(self):
        bus = wspbus.Bus()
        self.log(bus)
        self.assertLog([])

        # Try a normal message.
        expected = []
        lines = ["O mah darlin'"] * 3 + ["Clementiiiiiiiine"]
        for text in lines:
            bus.log(text)
            expected.append(text)
            self.assertLog(expected)

        # Try an error message
        try:
            # Deliberately reference an undefined name to raise NameError.
            foo
        except NameError:
            bus.log("You are lost and gone forever", traceback=True)
            lastmsg = self._log_entries[-1]
            has_traceback = "Traceback" in lastmsg and "NameError" in lastmsg
            if not has_traceback:
                self.fail("Last log message %r did not contain "
                          "the expected traceback." % lastmsg)
        else:
            self.fail("NameError was not raised as expected.")
# Run every test in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()