source file: /home/buildslave/tahoe/edgy/build/src/allmydata/mutable/node.py
file stats: 320 lines, 319 executed: 99.7% covered
   1. 
   2. import weakref, random
   3. from twisted.application import service
   4. 
   5. from zope.interface import implements
   6. from twisted.internet import defer, reactor
   7. from twisted.python import log
   8. from foolscap.eventual import eventually
   9. from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \
  10.      ICheckable, ICheckerResults
  11. from allmydata.util import hashutil
  12. from allmydata.util.assertutil import precondition
  13. from allmydata.uri import WriteableSSKFileURI
  14. from allmydata.immutable.encode import NotEnoughSharesError
  15. from allmydata.immutable.checker import DeepCheckResults
  16. from pycryptopp.publickey import rsa
  17. from pycryptopp.cipher.aes import AES
  18. 
  19. from publish import Publish
  20. from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \
  21.      ResponseCache, UncoordinatedWriteError
  22. from servermap import ServerMap, ServermapUpdater
  23. from retrieve import Retrieve
  24. from checker import MutableChecker
  25. from repair import Repairer
  26. 
  27. 
  28. class BackoffAgent:
  29.     # these parameters are copied from foolscap.reconnector, which gets them
  30.     # from twisted.internet.protocol.ReconnectingClientFactory
  31.     initialDelay = 1.0
  32.     factor = 2.7182818284590451 # (math.e)
  33.     jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole
  34.     maxRetries = 4
  35. 
  36.     def __init__(self):
  37.         self._delay = self.initialDelay
  38.         self._count = 0
  39.     def delay(self, node, f):
  40.         self._count += 1
  41.         if self._count == 4:
  42.             return f
  43.         self._delay = self._delay * self.factor
  44.         self._delay = random.normalvariate(self._delay,
  45.                                            self._delay * self.jitter)
  46.         d = defer.Deferred()
  47.         reactor.callLater(self._delay, d.callback, None)
  48.         return d
  49. 
  50. # use client.create_mutable_file() to make one of these
  51. 
class MutableFileNode:
    """A handle on one SSK mutable file.

    An instance is set up either by create() (new file: generate a keypair,
    derive the write/read keys and URI from it, publish initial contents) or
    by init_from_uri() (existing file: take the keys from the URI; the
    pubkey and the k/N encoding parameters are learned later from shares).

    The public operations that talk to storage servers (download_best_version,
    overwrite, modify, get_servermap, download_version, upload) are queued
    through _do_serialized() so that they run one at a time on this node.
    """
    implements(IMutableFileNode, ICheckable)
    SIGNATURE_KEY_SIZE = 2048 # RSA key size in bits, for rsa.generate()/keypair_generator
    DEFAULT_ENCODING = (3, 10) # (required_shares, total_shares)
    checker_class = MutableChecker # instantiated by check(); overridable

    def __init__(self, client):
        # client provides the renewal/cancel secrets and the notify_*
        # status hooks used below. Keys and the URI are filled in later by
        # init_from_uri() or create().
        self._client = client
        self._pubkey = None # filled in upon first read
        self._privkey = None # filled in if we're mutable
        # we keep track of the last encoding parameters that we use. These
        # are updated upon retrieve, and used by publish. If we publish
        # without ever reading (i.e. overwrite()), then we use these values.
        (self._required_shares, self._total_shares) = self.DEFAULT_ENCODING
        self._sharemap = {} # known shares, shnum-to-[nodeids]
        self._cache = ResponseCache()

        # all users of this MutableFileNode go through the serializer. This
        # takes advantage of the fact that Deferreds discard the callbacks
        # that they're done with, so we can keep using the same Deferred
        # forever without consuming more and more memory.
        self._serializer = defer.succeed(None)

    def __repr__(self):
        # _uri only exists once init_from_uri() or create() has run
        if hasattr(self, '_uri'):
            return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
        else:
            return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)

    def init_from_uri(self, myuri):
        """Initialize this node from an existing mutable-file URI.

        myuri is adapted with IMutableFileURI. The writekey is only recorded
        when the URI is writeable, so get_writekey()/get_write_enabler() raise
        AttributeError on a node initialized from a read-only URI.
        Returns self, so the call can be chained.
        """
        # we have the URI, but we have not yet retrieved the public
        # verification key, nor things like 'k' or 'N'. If and when someone
        # wants to get our contents, we'll pull from shares and fill those
        # in.
        self._uri = IMutableFileURI(myuri)
        if not self._uri.is_readonly():
            self._writekey = self._uri.writekey
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        self._fingerprint = self._uri.fingerprint
        # the following values are learned during Retrieval
        #  self._pubkey
        #  self._required_shares
        #  self._total_shares
        # and these are needed for Publish. They are filled in by Retrieval
        # if possible, otherwise by the first peer that Publish talks to.
        self._privkey = None
        self._encprivkey = None
        return self

    def create(self, initial_contents, keypair_generator=None):
        """Call this when the filenode is first created.

        This generates the keypair (via keypair_generator(SIGNATURE_KEY_SIZE)
        if one is given, otherwise rsa.generate), derives the writekey,
        encrypted privkey, fingerprint, URI, readkey and storage index from
        it, and then publishes initial_contents with no prior servermap.

        Returns a Deferred that fires with the result of that initial
        publish. NOTE(review): this method does not itself make the Deferred
        fire with the MutableFileNode instance; callers that need the node
        presumably add that themselves -- confirm against callers.
        """

        d = defer.maybeDeferred(self._generate_pubprivkeys, keypair_generator)
        d.addCallback(self._generated)
        d.addCallback(lambda res: self._upload(initial_contents, None))
        return d

    def _generated(self, (pubkey, privkey) ):
        # Receives the (pubkey, privkey) pair from _generate_pubprivkeys
        # (Python 2 tuple-parameter syntax) and derives everything else:
        # writekey = hash(serialized privkey), encprivkey = privkey encrypted
        # under writekey, fingerprint = hash(serialized pubkey), then the
        # writeable URI and the readkey/storage index it provides. Encoding
        # parameters are reset to DEFAULT_ENCODING.
        self._pubkey, self._privkey = pubkey, privkey
        pubkey_s = self._pubkey.serialize()
        privkey_s = self._privkey.serialize()
        self._writekey = hashutil.ssk_writekey_hash(privkey_s)
        self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
        self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
        self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        self._required_shares, self._total_shares = self.DEFAULT_ENCODING

    def _generate_pubprivkeys(self, keypair_generator):
        # Returns (verifying key, signing key). A supplied keypair_generator
        # must use the same (pub, priv) order, since _generated() unpacks it
        # that way.
        if keypair_generator:
            return keypair_generator(self.SIGNATURE_KEY_SIZE)
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2 secs
            signer = rsa.generate(self.SIGNATURE_KEY_SIZE)
            verifier = signer.get_verifying_key()
            return verifier, signer

    def _encrypt_privkey(self, writekey, privkey):
        # Encrypts the serialized private key with AES keyed by writekey.
        enc = AES(writekey)
        crypttext = enc.process(privkey)
        return crypttext

    def _decrypt_privkey(self, enc_privkey):
        # Undoes _encrypt_privkey by applying the same AES(writekey).process()
        # transform. NOTE(review): this relies on pycryptopp's AES being a
        # self-inverse stream mode (presumably CTR) -- confirm. Needs
        # _writekey, which read-only nodes do not have.
        enc = AES(self._writekey)
        privkey = enc.process(enc_privkey)
        return privkey

    # Setters for values discovered outside this class (presumably by the
    # servermap/retrieve/publish code); each one just stores the value.
    def _populate_pubkey(self, pubkey):
        self._pubkey = pubkey
    def _populate_required_shares(self, required_shares):
        self._required_shares = required_shares
    def _populate_total_shares(self, total_shares):
        self._total_shares = total_shares

    def _populate_privkey(self, privkey):
        self._privkey = privkey
    def _populate_encprivkey(self, encprivkey):
        self._encprivkey = encprivkey


    # Per-peer secrets. peerid must be a 20-byte node id (asserted). The
    # write enabler is derived from the writekey, so it is only available on
    # writeable nodes; renewal/cancel secrets are derived from the client's
    # secret, then this file's storage index, then the peerid.
    def get_write_enabler(self, peerid):
        assert len(peerid) == 20
        return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
    def get_renewal_secret(self, peerid):
        assert len(peerid) == 20
        crs = self._client.get_renewal_secret()
        frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
        return hashutil.bucket_renewal_secret_hash(frs, peerid)
    def get_cancel_secret(self, peerid):
        assert len(peerid) == 20
        ccs = self._client.get_cancel_secret()
        fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
        return hashutil.bucket_cancel_secret_hash(fcs, peerid)

    # Plain accessors. get_writekey() raises AttributeError on a node that
    # was initialized from a read-only URI (see init_from_uri).
    def get_writekey(self):
        return self._writekey
    def get_readkey(self):
        return self._readkey
    def get_storage_index(self):
        return self._storage_index
    def get_privkey(self):
        return self._privkey
    def get_encprivkey(self):
        return self._encprivkey
    def get_pubkey(self):
        return self._pubkey

    def get_required_shares(self):
        return self._required_shares
    def get_total_shares(self):
        return self._total_shares

    ####################################
    # IFilesystemNode

    def get_uri(self):
        return self._uri.to_string()
    def get_size(self):
        return "?" # TODO: this is likely to cause problems, not being an int
    def get_readonly(self):
        # Returns self if already read-only; otherwise a new node built from
        # the read-only form of our URI (same client, fresh cache/serializer).
        if self.is_readonly():
            return self
        ro = MutableFileNode(self._client)
        ro.init_from_uri(self._uri.get_readonly())
        return ro

    def get_readonly_uri(self):
        return self._uri.get_readonly().to_string()

    def is_mutable(self):
        return self._uri.is_mutable()
    def is_readonly(self):
        return self._uri.is_readonly()

    # Identity is (class, URI): nodes with equal URIs hash and compare equal.
    # __cmp__ orders by type(), then __class__ (in Python 2 every instance of
    # an old-style class like this one has the same type(), hence the second
    # check), then by URI.
    def __hash__(self):
        return hash((self.__class__, self._uri))
    def __cmp__(self, them):
        if cmp(type(self), type(them)):
            return cmp(type(self), type(them))
        if cmp(self.__class__, them.__class__):
            return cmp(self.__class__, them.__class__)
        return cmp(self._uri, them._uri)

    def get_verifier(self):
        return IMutableFileURI(self._uri).get_verifier()

    def _do_serialized(self, cb, *args, **kwargs):
        """Queue cb(*args, **kwargs) behind every earlier serialized call on
        this node, and return a new Deferred that fires with its result (or
        errbacks with its Failure).
        """
        # note: to avoid deadlock, this callable is *not* allowed to invoke
        # other serialized methods within this (or any other)
        # MutableFileNode. The callable should be a bound method of this same
        # MFN instance.
        d = defer.Deferred()
        self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
        # we need to put off d.callback until this Deferred is finished being
        # processed. Otherwise the caller's subsequent activities (like,
        # doing other things with this node) can cause reentrancy problems in
        # the Deferred code itself
        # addBoth also hands a Failure to d (d.callback(Failure) makes d
        # errback) and replaces the shared chain's result with this lambda's
        # return value (presumably None from eventually()), so one failed
        # operation does not stall later queued ones.
        self._serializer.addBoth(lambda res: eventually(d.callback, res))
        # add a log.err just in case something really weird happens, because
        # self._serializer stays around forever, therefore we won't see the
        # usual Unhandled Error in Deferred that would give us a hint.
        self._serializer.addErrback(log.err)
        return d

    #################################
    # ICheckable

    def check(self, verify=False, repair=False):
        # Not serialized: delegates to a fresh checker_class instance.
        checker = self.checker_class(self)
        return checker.check(verify, repair)

    def deep_check(self, verify=False, repair=False):
        # A mutable file has no children, so the "deep" check is just our
        # own check result wrapped in a DeepCheckResults.
        d = self.check(verify, repair)
        def _done(r):
            dr = DeepCheckResults(self.get_storage_index())
            dr.add_check(r)
            return dr
        d.addCallback(_done)
        return d

    #################################
    # IRepairable

    def repair(self, checker_results, force=False):
        # NOTE(review): the ICheckerResults adaptation lives inside an assert,
        # so it is skipped entirely under python -O.
        assert ICheckerResults(checker_results)
        r = Repairer(self, checker_results)
        d = r.start(force)
        return d


    #################################
    # IMutableFileNode

    # allow the use of IDownloadTarget
    def download(self, target):
        # fake it. TODO: make this cleaner.
        # Fetches the whole best version into memory, writes it to target in
        # one open/write/close sequence, and returns target.finish().
        d = self.download_best_version()
        def _done(data):
            target.open(len(data))
            target.write(data)
            target.close()
            return target.finish()
        d.addCallback(_done)
        return d


    # new API

    def download_best_version(self):
        return self._do_serialized(self._download_best_version)
    def _download_best_version(self):
        servermap = ServerMap()
        d = self._try_once_to_download_best_version(servermap, MODE_READ)
        def _maybe_retry(f):
            f.trap(NotEnoughSharesError)
            # the download is worth retrying once. Make sure to use the
            # old servermap, since it is what remembers the bad shares,
            # but use MODE_WRITE to make it look for even more shares.
            # TODO: consider allowing this to retry multiple times.. this
            # approach will let us tolerate about 8 bad shares, I think.
            return self._try_once_to_download_best_version(servermap,
                                                           MODE_WRITE)
        d.addErrback(_maybe_retry)
        return d
    def _try_once_to_download_best_version(self, servermap, mode):
        d = self._update_servermap(servermap, mode)
        d.addCallback(self._once_updated_download_best_version, servermap)
        return d
    def _once_updated_download_best_version(self, ignored, servermap):
        # Raises UnrecoverableFileError when the map has no recoverable
        # version; otherwise retrieves that version (unserialized: we are
        # already inside a serialized call).
        goal = servermap.best_recoverable_version()
        if not goal:
            raise UnrecoverableFileError("no recoverable versions")
        return self._try_once_to_download_version(servermap, goal)

    def get_size_of_best_version(self):
        # Not serialized itself; it goes through get_servermap(), which is.
        d = self.get_servermap(MODE_READ)
        def _got_servermap(smap):
            ver = smap.best_recoverable_version()
            if not ver:
                raise UnrecoverableFileError("no recoverable version")
            return smap.size_of_version(ver)
        d.addCallback(_got_servermap)
        return d

    def overwrite(self, new_contents):
        return self._do_serialized(self._overwrite, new_contents)
    def _overwrite(self, new_contents):
        # Builds a fresh servermap in MODE_WRITE, then publishes new_contents
        # against it without retrieving the old contents.
        servermap = ServerMap()
        d = self._update_servermap(servermap, mode=MODE_WRITE)
        d.addCallback(lambda ignored: self._upload(new_contents, servermap))
        return d


    def modify(self, modifier, backoffer=None):
        """I use a modifier callback to apply a change to the mutable file.
        I implement the following pseudocode::

         obtain_mutable_filenode_lock()
         while True:
           update_servermap(MODE_WRITE)
           old = retrieve_best_version()
           new = modifier(old)
           if new is None or new == old: break
           try:
             publish(new)
           except UncoordinatedWriteError, e:
             backoffer(self, e) # may give up instead, by returning e
             continue
           break
         release_mutable_filenode_lock()

        The idea is that your modifier function can apply a delta of some
        sort, and it will be re-run as necessary until it succeeds. The
        modifier is called with a single argument, the old contents, and
        must inspect them to see whether its delta has already been
        applied: if so it should return the contents unmodified, or None.
        Either of those means "no change", and nothing is published.
        Otherwise it must return the new contents as a string.

        Note that the modifier is required to run synchronously (its return
        value is used directly, not waited on), and must not invoke any
        methods on this MutableFileNode instance.

        The backoff-er is a callable that is responsible for inserting a
        random delay between subsequent attempts, to help competing updates
        from colliding forever. It is also allowed to give up after a while.
        The backoffer is given two arguments: this MutableFileNode, and the
        Failure object that contains the UncoordinatedWriteError. It should
        return a Deferred that will fire when the next attempt should be
        made, or return the Failure if the loop should give up. If
        backoffer=None, a default one is provided which will perform
        exponential backoff, and give up after 4 tries. Note that the
        backoffer should not invoke any methods on this MutableFileNode
        instance, and it needs to be highly conscious of deadlock issues.
        """
        return self._do_serialized(self._modify, modifier, backoffer)
    def _modify(self, modifier, backoffer):
        # One ServerMap is shared by every retry of this modify() call.
        servermap = ServerMap()
        if backoffer is None:
            backoffer = BackoffAgent().delay
        return self._modify_and_retry(servermap, modifier, backoffer)
    def _modify_and_retry(self, servermap, modifier, backoffer):
        # Only UncoordinatedWriteError is retried (trap re-raises anything
        # else). If the backoffer returns the Failure rather than a Deferred,
        # maybeDeferred turns that into a failed d2, the retry callback is
        # skipped, and the error propagates to the caller.
        d = self._modify_once(servermap, modifier)
        def _retry(f):
            f.trap(UncoordinatedWriteError)
            d2 = defer.maybeDeferred(backoffer, self, f)
            d2.addCallback(lambda ignored:
                           self._modify_and_retry(servermap, modifier,
                                                  backoffer))
            return d2
        d.addErrback(_retry)
        return d
    def _modify_once(self, servermap, modifier):
        # update map (MODE_WRITE) -> retrieve best version -> modifier ->
        # publish. The Deferred fires with None when there is nothing to
        # publish, otherwise with the _upload() result.
        d = self._update_servermap(servermap, MODE_WRITE)
        d.addCallback(self._once_updated_download_best_version, servermap)
        def _apply(old_contents):
            new_contents = modifier(old_contents)
            if new_contents is None or new_contents == old_contents:
                # no changes need to be made
                return
            precondition(isinstance(new_contents, str),
                         "Modifier function must return a string or None")
            return self._upload(new_contents, servermap)
        d.addCallback(_apply)
        return d

    def get_servermap(self, mode):
        # Serialized; fires with the updated ServerMap (callers such as
        # get_size_of_best_version use it as one).
        return self._do_serialized(self._get_servermap, mode)
    def _get_servermap(self, mode):
        servermap = ServerMap()
        return self._update_servermap(servermap, mode)
    def _update_servermap(self, servermap, mode):
        # Reports the updater's status object to the client before starting.
        u = ServermapUpdater(self, servermap, mode)
        self._client.notify_mapupdate(u.get_status())
        return u.update()

    def download_version(self, servermap, version, fetch_privkey=False):
        return self._do_serialized(self._try_once_to_download_version,
                                   servermap, version, fetch_privkey)
    def _try_once_to_download_version(self, servermap, version,
                                      fetch_privkey=False):
        # fetch_privkey is passed straight through to Retrieve; the retrieve
        # status is reported to the client before the download starts.
        r = Retrieve(self, servermap, version, fetch_privkey)
        self._client.notify_retrieve(r.get_status())
        return r.download()

    def upload(self, new_contents, servermap):
        return self._do_serialized(self._upload, new_contents, servermap)
    def _upload(self, new_contents, servermap):
        # Requires _pubkey to be known already (set by create() or via
        # _populate_pubkey). The content length is reported alongside the
        # publish status because the status object cannot know it yet.
        assert self._pubkey, "update_servermap must be called before publish"
        p = Publish(self, servermap)
        self._client.notify_publish(p.get_status(), len(new_contents))
        return p.publish(new_contents)
 427. 
 428. 
 429. 
 430. 
 431. class MutableWatcher(service.MultiService):
 432.     MAX_MAPUPDATE_STATUSES = 20
 433.     MAX_PUBLISH_STATUSES = 20
 434.     MAX_RETRIEVE_STATUSES = 20
 435.     name = "mutable-watcher"
 436. 
 437.     def __init__(self, stats_provider=None):
 438.         service.MultiService.__init__(self)
 439.         self.stats_provider = stats_provider
 440.         self._all_mapupdate_status = weakref.WeakKeyDictionary()
 441.         self._recent_mapupdate_status = []
 442.         self._all_publish_status = weakref.WeakKeyDictionary()
 443.         self._recent_publish_status = []
 444.         self._all_retrieve_status = weakref.WeakKeyDictionary()
 445.         self._recent_retrieve_status = []
 446. 
 447. 
 448.     def notify_mapupdate(self, p):
 449.         self._all_mapupdate_status[p] = None
 450.         self._recent_mapupdate_status.append(p)
 451.         while len(self._recent_mapupdate_status) > self.MAX_MAPUPDATE_STATUSES:
 452.             self._recent_mapupdate_status.pop(0)
 453. 
 454.     def notify_publish(self, p, size):
 455.         self._all_publish_status[p] = None
 456.         self._recent_publish_status.append(p)
 457.         if self.stats_provider:
 458.             self.stats_provider.count('mutable.files_published', 1)
 459.             # We must be told bytes_published as an argument, since the
 460.             # publish_status does not yet know how much data it will be asked
 461.             # to send. When we move to MDMF we'll need to find a better way
 462.             # to handle this.
 463.             self.stats_provider.count('mutable.bytes_published', size)
 464.         while len(self._recent_publish_status) > self.MAX_PUBLISH_STATUSES:
 465.             self._recent_publish_status.pop(0)
 466. 
 467.     def notify_retrieve(self, r):
 468.         self._all_retrieve_status[r] = None
 469.         self._recent_retrieve_status.append(r)
 470.         if self.stats_provider:
 471.             self.stats_provider.count('mutable.files_retrieved', 1)
 472.             self.stats_provider.count('mutable.bytes_retrieved', r.get_size())
 473.         while len(self._recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES:
 474.             self._recent_retrieve_status.pop(0)
 475. 
 476. 
 477.     def list_all_mapupdate_statuses(self):
 478.         return self._all_mapupdate_status.keys()
 479.     def list_all_publish_statuses(self):
 480.         return self._all_publish_status.keys()
 481.     def list_all_retrieve_statuses(self):
 482.         return self._all_retrieve_status.keys()