1 """Extensions to unittest for web frameworks.
2
3 Use the WebCase.getPage method to request a page from your HTTP server.
4
5 Framework Integration
6 =====================
7
8 If you have control over your server process, you can handle errors
9 in the server-side of the HTTP conversation a bit better. You must run
10 both the client (your WebCase tests) and the server in the same process
11 (but in separate threads, obviously).
12
13 When an error occurs in the framework, call server_error. It will print
14 the traceback to stdout, and keep any assertions you have from running
15 (the assumption is that, if the server errors, the page output will not
16 be of further significance to your tests).
17 """
18
19 import os
20 import pprint
21 import re
22 import socket
23 import sys
24 import time
25 import traceback
26 import types
27
28 from unittest import *
29 from unittest import _TextTestResult
30
31 from cherrypy._cpcompat import basestring, ntob, py3k, HTTPConnection, HTTPSConnection, unicodestr
32
33
34
36 """Return an IP address for a client connection given the server host.
37
38 If the server is listening on '0.0.0.0' (INADDR_ANY)
39 or '::' (IN6ADDR_ANY), this will return the proper localhost."""
40 if host == '0.0.0.0':
41
42 return "127.0.0.1"
43 if host == '::':
44
45 return "::1"
46 return host
47
48
50
52
53 if self.errors or self.failures:
54 if self.dots or self.showAll:
55 self.stream.writeln()
56 self.printErrorList('ERROR', self.errors)
57 self.printErrorList('FAIL', self.failures)
58
59
61 """A test runner class that displays results in textual form."""
62
65
66 - def run(self, test):
67 "Run the given test case or test suite."
68
69 result = self._makeResult()
70 test(result)
71 result.printErrors()
72 if not result.wasSuccessful():
73 self.stream.write("FAILED (")
74 failed, errored = list(map(len, (result.failures, result.errors)))
75 if failed:
76 self.stream.write("failures=%d" % failed)
77 if errored:
78 if failed: self.stream.write(", ")
79 self.stream.write("errors=%d" % errored)
80 self.stream.writeln(")")
81 return result
82
83
85
87 """Return a suite of all tests cases given a string specifier.
88
89 The name may resolve either to a module, a test case class, a
90 test method within a test case class, or a callable object which
91 returns a TestCase or TestSuite instance.
92
93 The method optionally resolves the names relative to a given module.
94 """
95 parts = name.split('.')
96 unused_parts = []
97 if module is None:
98 if not parts:
99 raise ValueError("incomplete test name: %s" % name)
100 else:
101 parts_copy = parts[:]
102 while parts_copy:
103 target = ".".join(parts_copy)
104 if target in sys.modules:
105 module = reload(sys.modules[target])
106 parts = unused_parts
107 break
108 else:
109 try:
110 module = __import__(target)
111 parts = unused_parts
112 break
113 except ImportError:
114 unused_parts.insert(0,parts_copy[-1])
115 del parts_copy[-1]
116 if not parts_copy:
117 raise
118 parts = parts[1:]
119 obj = module
120 for part in parts:
121 obj = getattr(obj, part)
122
123 if type(obj) == types.ModuleType:
124 return self.loadTestsFromModule(obj)
125 elif (((py3k and isinstance(obj, type))
126 or isinstance(obj, (type, types.ClassType)))
127 and issubclass(obj, TestCase)):
128 return self.loadTestsFromTestCase(obj)
129 elif type(obj) == types.UnboundMethodType:
130 if py3k:
131 return obj.__self__.__class__(obj.__name__)
132 else:
133 return obj.im_class(obj.__name__)
134 elif hasattr(obj, '__call__'):
135 test = obj()
136 if not isinstance(test, TestCase) and \
137 not isinstance(test, TestSuite):
138 raise ValueError("calling %s returned %s, "
139 "not a test" % (obj,test))
140 return test
141 else:
142 raise ValueError("do not know how to make test from: %s" % obj)
143
144
145 try:
146
147 if sys.platform[:4] == 'java':
149
150 return sys.stdin.read(1)
151 else:
152
153 import msvcrt
155 return msvcrt.getch()
156 except ImportError:
157
158 import tty, termios
160 fd = sys.stdin.fileno()
161 old_settings = termios.tcgetattr(fd)
162 try:
163 tty.setraw(sys.stdin.fileno())
164 ch = sys.stdin.read(1)
165 finally:
166 termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
167 return ch
168
169
171 HOST = "127.0.0.1"
172 PORT = 8000
173 HTTP_CONN = HTTPConnection
174 PROTOCOL = "HTTP/1.1"
175
176 scheme = "http"
177 url = None
178
179 status = None
180 headers = None
181 body = None
182
183 encoding = 'utf-8'
184
185 time = None
186
198
200 """Make our HTTP_CONN persistent (or not).
201
202 If the 'on' argument is True (the default), then self.HTTP_CONN
203 will be set to an instance of HTTPConnection (or HTTPS
204 if self.scheme is "https"). This will then persist across requests.
205
206 We only allow for a single open connection, so if you call this
207 and we currently have an open connection, it will be closed.
208 """
209 try:
210 self.HTTP_CONN.close()
211 except (TypeError, AttributeError):
212 pass
213
214 if on:
215 self.HTTP_CONN = self.get_conn(auto_open=auto_open)
216 else:
217 if self.scheme == "https":
218 self.HTTP_CONN = HTTPSConnection
219 else:
220 self.HTTP_CONN = HTTPConnection
221
223 return hasattr(self.HTTP_CONN, "__class__")
226 persistent = property(_get_persistent, _set_persistent)
227
229 """Return an IP address for a client connection.
230
231 If the server is listening on '0.0.0.0' (INADDR_ANY)
232 or '::' (IN6ADDR_ANY), this will return the proper localhost."""
233 return interface(self.HOST)
234
235 - def getPage(self, url, headers=None, method="GET", body=None, protocol=None):
236 """Open the url with debugging support. Return status, headers, body."""
237 ServerError.on = False
238
239 if isinstance(url, unicodestr):
240 url = url.encode('utf-8')
241 if isinstance(body, unicodestr):
242 body = body.encode('utf-8')
243
244 self.url = url
245 self.time = None
246 start = time.time()
247 result = openURL(url, headers, method, body, self.HOST, self.PORT,
248 self.HTTP_CONN, protocol or self.PROTOCOL)
249 self.time = time.time() - start
250 self.status, self.headers, self.body = result
251
252
253 self.cookies = [('Cookie', v) for k, v in self.headers
254 if k.lower() == 'set-cookie']
255
256 if ServerError.on:
257 raise ServerError()
258 return result
259
260 interactive = True
261 console_height = 30
262
264 print("")
265 print(" ERROR: %s" % msg)
266
267 if not self.interactive:
268 raise self.failureException(msg)
269
270 p = " Show: [B]ody [H]eaders [S]tatus [U]RL; [I]gnore, [R]aise, or sys.e[X]it >> "
271 sys.stdout.write(p)
272 sys.stdout.flush()
273 while True:
274 i = getchar().upper()
275 if not isinstance(i, type("")):
276 i = i.decode('ascii')
277 if i not in "BHSUIRX":
278 continue
279 print(i.upper())
280 if i == "B":
281 for x, line in enumerate(self.body.splitlines()):
282 if (x + 1) % self.console_height == 0:
283
284 sys.stdout.write("<-- More -->\r")
285 m = getchar().lower()
286
287 sys.stdout.write(" \r")
288 if m == "q":
289 break
290 print(line)
291 elif i == "H":
292 pprint.pprint(self.headers)
293 elif i == "S":
294 print(self.status)
295 elif i == "U":
296 print(self.url)
297 elif i == "I":
298
299 return
300 elif i == "R":
301 raise self.failureException(msg)
302 elif i == "X":
303 self.exit()
304 sys.stdout.write(p)
305 sys.stdout.flush()
306
309
338
340 """Fail if (key, [value]) not in self.headers."""
341 lowkey = key.lower()
342 for k, v in self.headers:
343 if k.lower() == lowkey:
344 if value is None or str(value) == v:
345 return v
346
347 if msg is None:
348 if value is None:
349 msg = '%r not in headers' % key
350 else:
351 msg = '%r:%r not in headers' % (key, value)
352 self._handlewebError(msg)
353
355 """Fail if the header does not contain the specified value"""
356 actual_value = self.assertHeader(key, msg=msg)
357 header_values = map(str.strip, actual_value.split(','))
358 if value in header_values:
359 return value
360
361 if msg is None:
362 msg = "%r not in %r" % (value, header_values)
363 self._handlewebError(msg)
364
366 """Fail if key in self.headers."""
367 lowkey = key.lower()
368 matches = [k for k, v in self.headers if k.lower() == lowkey]
369 if matches:
370 if msg is None:
371 msg = '%r in headers' % key
372 self._handlewebError(msg)
373
374 - def assertBody(self, value, msg=None):
375 """Fail if value != self.body."""
376 if isinstance(value, unicodestr):
377 value = value.encode(self.encoding)
378 if value != self.body:
379 if msg is None:
380 msg = 'expected body:\n%r\n\nactual body:\n%r' % (value, self.body)
381 self._handlewebError(msg)
382
383 - def assertInBody(self, value, msg=None):
384 """Fail if value not in self.body."""
385 if isinstance(value, unicodestr):
386 value = value.encode(self.encoding)
387 if value not in self.body:
388 if msg is None:
389 msg = '%r not in body: %s' % (value, self.body)
390 self._handlewebError(msg)
391
392 - def assertNotInBody(self, value, msg=None):
393 """Fail if value in self.body."""
394 if isinstance(value, unicodestr):
395 value = value.encode(self.encoding)
396 if value in self.body:
397 if msg is None:
398 msg = '%r found in body' % value
399 self._handlewebError(msg)
400
401 - def assertMatchesBody(self, pattern, msg=None, flags=0):
402 """Fail if value (a regex pattern) is not in self.body."""
403 if isinstance(pattern, unicodestr):
404 pattern = pattern.encode(self.encoding)
405 if re.search(pattern, self.body, flags) is None:
406 if msg is None:
407 msg = 'No match for %r in body' % pattern
408 self._handlewebError(msg)
409
410
411 methods_with_bodies = ("POST", "PUT")
412
414 """Return request headers, with required headers added (if missing)."""
415 if headers is None:
416 headers = []
417
418
419
420 found = False
421 for k, v in headers:
422 if k.lower() == 'host':
423 found = True
424 break
425 if not found:
426 if port == 80:
427 headers.append(("Host", host))
428 else:
429 headers.append(("Host", "%s:%s" % (host, port)))
430
431 if method in methods_with_bodies:
432
433 found = False
434 for k, v in headers:
435 if k.lower() == 'content-type':
436 found = True
437 break
438 if not found:
439 headers.append(("Content-Type", "application/x-www-form-urlencoded"))
440 headers.append(("Content-Length", str(len(body or ""))))
441
442 return headers
443
444
446 """Return status, headers, body the way we like from a response."""
447 if py3k:
448 h = response.getheaders()
449 else:
450 h = []
451 key, value = None, None
452 for line in response.msg.headers:
453 if line:
454 if line[0] in " \t":
455 value += line.strip()
456 else:
457 if key and value:
458 h.append((key, value))
459 key, value = line.split(":", 1)
460 key = key.strip()
461 value = value.strip()
462 if key and value:
463 h.append((key, value))
464
465 return "%s %s" % (response.status, response.reason), h, response.read()
466
467
468 -def openURL(url, headers=None, method="GET", body=None,
469 host="127.0.0.1", port=8000, http_conn=HTTPConnection,
470 protocol="HTTP/1.1"):
471 """Open the given HTTP resource and return status, headers, and body."""
472
473 headers = cleanHeaders(headers, method, body, host, port)
474
475
476
477 for trial in range(10):
478 try:
479
480 if hasattr(http_conn, "host"):
481 conn = http_conn
482 else:
483 conn = http_conn(interface(host), port)
484
485 conn._http_vsn_str = protocol
486 conn._http_vsn = int("".join([x for x in protocol if x.isdigit()]))
487
488
489 if sys.version_info < (2, 4):
490 def putheader(self, header, value):
491 if header == 'Accept-Encoding' and value == 'identity':
492 return
493 self.__class__.putheader(self, header, value)
494 import new
495 conn.putheader = new.instancemethod(putheader, conn, conn.__class__)
496 conn.putrequest(method.upper(), url, skip_host=True)
497 elif not py3k:
498 conn.putrequest(method.upper(), url, skip_host=True,
499 skip_accept_encoding=True)
500 else:
501 import http.client
502
503 def putrequest(self, method, url):
504 if self._HTTPConnection__response and self._HTTPConnection__response.isclosed():
505 self._HTTPConnection__response = None
506
507 if self._HTTPConnection__state == http.client._CS_IDLE:
508 self._HTTPConnection__state = http.client._CS_REQ_STARTED
509 else:
510 raise http.client.CannotSendRequest()
511
512 self._method = method
513 if not url:
514 url = ntob('/')
515 request = ntob(' ').join((method.encode("ASCII"), url,
516 self._http_vsn_str.encode("ASCII")))
517 self._output(request)
518 import types
519 conn.putrequest = types.MethodType(putrequest, conn)
520
521 conn.putrequest(method.upper(), url)
522
523 for key, value in headers:
524 conn.putheader(key, ntob(value, "Latin-1"))
525 conn.endheaders()
526
527 if body is not None:
528 conn.send(body)
529
530
531 response = conn.getresponse()
532
533 s, h, b = shb(response)
534
535 if not hasattr(http_conn, "host"):
536
537 conn.close()
538
539 return s, h, b
540 except socket.error:
541 time.sleep(0.5)
542 if trial == 9:
543 raise
544
545
546
547
548 ignored_exceptions = []
549
550
551
552
553 ignore_all = False
554
557
558
560 """Server debug hook. Return True if exception handled, False if ignored.
561
562 You probably want to wrap this, so you can still handle an error using
563 your framework when it's ignored.
564 """
565 if exc is None:
566 exc = sys.exc_info()
567
568 if ignore_all or exc[0] in ignored_exceptions:
569 return False
570 else:
571 ServerError.on = True
572 print("")
573 print("".join(traceback.format_exception(*exc)))
574 return True
575