source file: /home/buildslave/tahoe/edgy/build/src/allmydata/util/testutil.py
file stats: 100 lines, 70 executed: 70.0% covered
   1. import os, signal, time
   2. from random import randrange
   3. 
   4. from twisted.internet import reactor, defer, task
   5. from twisted.python import failure
   6. 
   7. 
   8. def flip_bit(good, which):
   9.     # flip the low-order bit of good[which]
  10.     if which == -1:
  11.         pieces = good[:which], good[-1:], ""
  12.     else:
  13.         pieces = good[:which], good[which:which+1], good[which+1:]
  14.     return pieces[0] + chr(ord(pieces[1]) ^ 0x01) + pieces[2]
  15. 
  16. def flip_one_bit(s):
  17.     # flip one random bit of the string s
  18.     i = randrange(0, len(s))
  19.     result = s[:i] + chr(ord(s[i])^(0x01<<randrange(0, 8))) + s[i+1:]
  20.     assert result != s, "Internal error -- flip_one_bit() produced the same string as its input: %s == %s" % (result, s)
  21.     return result
  22. 
  23. class SignalMixin:
  24.     # This class is necessary for any code which wants to use Processes
  25.     # outside the usual reactor.run() environment. It is copied from
  26.     # Twisted's twisted.test.test_process
  27.     sigchldHandler = None
  28. 
  29.     def setUpClass(self):
  30.         # make sure SIGCHLD handler is installed, as it should be on
  31.         # reactor.run(). problem is reactor may not have been run when this
  32.         # test runs.
  33.         if hasattr(reactor, "_handleSigchld") and hasattr(signal, "SIGCHLD"):
  34.             self.sigchldHandler = signal.signal(signal.SIGCHLD,
  35.                                                 reactor._handleSigchld)
  36. 
  37.     def tearDownClass(self):
  38.         if self.sigchldHandler:
  39.             signal.signal(signal.SIGCHLD, self.sigchldHandler)
  40. 
  41. class TimeoutError(Exception):
  42.     pass
  43. 
  44. class PollMixin:
  45. 
  46.     def poll(self, check_f, pollinterval=0.01, timeout=None):
  47.         # Return a Deferred, then call check_f periodically until it returns
  48.         # True, at which point the Deferred will fire.. If check_f raises an
  49.         # exception, the Deferred will errback. If the check_f does not
  50.         # indicate success within timeout= seconds, the Deferred will
  51.         # errback. If timeout=None, no timeout will be enforced.
  52.         cutoff = None
  53.         if timeout is not None:
  54.             cutoff = time.time() + timeout
  55.         stash = [] # ick. We have to pass the LoopingCall into itself
  56.         lc = task.LoopingCall(self._poll, check_f, stash, cutoff)
  57.         stash.append(lc)
  58.         d = lc.start(pollinterval)
  59.         return d
  60. 
  61.     def _poll(self, check_f, stash, cutoff):
  62.         if cutoff is not None and time.time() > cutoff:
  63.             raise TimeoutError()
  64.         lc = stash[0]
  65.         if check_f():
  66.             lc.stop()
  67. 
  68. class StallMixin:
  69.     def stall(self, res=None, delay=1):
  70.         d = defer.Deferred()
  71.         reactor.callLater(delay, d.callback, res)
  72.         return d
  73. 
  74. class ShouldFailMixin:
  75. 
  76.     def shouldFail(self, expected_failure, which, substring,
  77.                    callable, *args, **kwargs):
  78.         assert substring is None or isinstance(substring, str)
  79.         d = defer.maybeDeferred(callable, *args, **kwargs)
  80.         def done(res):
  81.             if isinstance(res, failure.Failure):
  82.                 res.trap(expected_failure)
  83.                 if substring:
  84.                     self.failUnless(substring in str(res),
  85.                                     "%s: substring '%s' not in '%s'"
  86.                                     % (which, substring, str(res)))
  87.                 # return the Failure for further analysis, but in a form that
  88.                 # doesn't make the Deferred chain think that we failed.
  89.                 return [res]
  90.             else:
  91.                 self.fail("%s was supposed to raise %s, not get '%s'" %
  92.                           (which, expected_failure, res))
  93.         d.addBoth(done)
  94.         return d
  95. 
  96. 
  97. class TestMixin(SignalMixin):
  98.     def setUp(self, repeatable=False):
  99.         """
 100.         @param repeatable: install the repeatable_randomness hacks to attempt
 101.             to without access to real randomness and real time.time from the
 102.             code under test
 103.         """
 104.         self.repeatable = repeatable
 105.         if self.repeatable:
 106.             import repeatable_random
 107.             repeatable_random.force_repeatability()
 108.         if hasattr(time, 'realtime'):
 109.             self.teststarttime = time.realtime()
 110.         else:
 111.             self.teststarttime = time.time()
 112. 
 113.     def tearDown(self):
 114.         if self.repeatable:
 115.             import repeatable_random
 116.             repeatable_random.restore_non_repeatability()
 117.         self.clean_pending(required_to_quiesce=True)
 118. 
 119.     def clean_pending(self, dummy=None, required_to_quiesce=True):
 120.         """
 121.         This handy method cleans all pending tasks from the reactor.
 122. 
 123.         When writing a unit test, consider the following question:
 124. 
 125.             Is the code that you are testing required to release control once it
 126.             has done its job, so that it is impossible for it to later come around
 127.             (with a delayed reactor task) and do anything further?
 128. 
 129.         If so, then trial will usefully test that for you -- if the code under
 130.         test leaves any pending tasks on the reactor then trial will fail it.
 131. 
 132.         On the other hand, some code is *not* required to release control -- some
 133.         code is allowed to continuously maintain control by rescheduling reactor
 134.         tasks in order to do ongoing work.  Trial will incorrectly require that
 135.         code to clean up all its tasks from the reactor.
 136. 
 137.         Most people think that such code should be amended to have an optional
 138.         "shutdown" operation that releases all control, but on the contrary it is
 139.         good design for some code to *not* have a shutdown operation, but instead
 140.         to have a "crash-only" design in which it recovers from crash on startup.
 141. 
 142.         If the code under test is of the "long-running" kind, which is *not*
 143.         required to shutdown cleanly in order to pass tests, then you can simply
 144.         call testutil.clean_pending() at the end of the unit test, and trial will
 145.         be satisfied.
 146.         """
 147.         pending = reactor.getDelayedCalls()
 148.         active = bool(pending)
 149.         for p in pending:
 150.             if p.active():
 151.                 p.cancel()
 152.             else:
 153.                 print "WEIRNESS! pending timed call not active+!"
 154.         if required_to_quiesce and active:
 155.             self.fail("Reactor was still active when it was required to be quiescent.")
 156. 
 157. try:
 158.     import win32file
 159.     import win32con
 160.     def make_readonly(path):
 161.         win32file.SetFileAttributes(path, win32con.FILE_ATTRIBUTE_READONLY)
 162.     def make_accessible(path):
 163.         win32file.SetFileAttributes(path, win32con.FILE_ATTRIBUTE_NORMAL)
 164. except ImportError:
 165.     import stat
 166.     def _make_readonly(path):
 167.         os.chmod(path, stat.S_IREAD)
 168.         os.chmod(os.path.dirname(path), stat.S_IREAD)
 169.     def _make_accessible(path):
 170.         os.chmod(os.path.dirname(path), stat.S_IWRITE | stat.S_IEXEC | stat.S_IREAD)
 171.         os.chmod(path, stat.S_IWRITE | stat.S_IEXEC | stat.S_IREAD)
 172.     make_readonly = _make_readonly
 173.     make_accessible = _make_accessible