source file: /home/buildslave/tahoe/edgy/build/src/allmydata/immutable/upload.py
file stats: 908 lines, 874 executed: 96.3% covered
   1. 
   2. import os, time, weakref, itertools
   3. from zope.interface import implements
   4. from twisted.python import failure
   5. from twisted.internet import defer
   6. from twisted.application import service
   7. from foolscap import Referenceable, Copyable, RemoteCopy
   8. from foolscap import eventual
   9. from foolscap.logging import log
  10. 
  11. from allmydata.util.hashutil import file_renewal_secret_hash, \
  12.      file_cancel_secret_hash, bucket_renewal_secret_hash, \
  13.      bucket_cancel_secret_hash, plaintext_hasher, \
  14.      storage_index_hash, plaintext_segment_hasher, convergence_hasher
  15. from allmydata import storage, hashtree, uri
  16. from allmydata.immutable import encode
  17. from allmydata.util import base32, idlib, mathutil
  18. from allmydata.util.assertutil import precondition
  19. from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
  20.      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus
  21. from pycryptopp.cipher.aes import AES
  22. 
  23. from cStringIO import StringIO
  24. 
  25. 
  26. KiB=1024
  27. MiB=1024*KiB
  28. GiB=1024*MiB
  29. TiB=1024*GiB
  30. PiB=1024*TiB
  31. 
  32. class HaveAllPeersError(Exception):
  33.     # we use this to jump out of the loop
  34.     pass
  35. 
  36. # this wants to live in storage, not here
  37. class TooFullError(Exception):
  38.     pass
  39. 
  40. class UploadResults(Copyable, RemoteCopy):
  41.     implements(IUploadResults)
  42.     # note: don't change this string, it needs to match the value used on the
  43.     # helper, and it does *not* need to match the fully-qualified
  44.     # package/module/class name
  45.     typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
  46.     copytype = typeToCopy
  47. 
  48.     def __init__(self):
  49.         self.timings = {} # dict of name to number of seconds
  50.         self.sharemap = {} # dict of shnum to placement string
  51.         self.servermap = {} # dict of peerid to set(shnums)
  52.         self.file_size = None
  53.         self.ciphertext_fetched = None # how much the helper fetched
  54.         self.uri = None
  55.         self.preexisting_shares = None # count of shares already present
  56.         self.pushed_shares = None # count of shares we pushed
  57. 
  58. 
# Number of bytes reserved for the uri_extension when asking for share space
# (passed to storage.allocated_size and storage.WriteBucketProxy below).
#
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.
# NOTE(review): the 846-byte and 419-byte figures above disagree; confirm
# which one reflects the current uri_extension before lowering the value.
  67. 
  68. class PeerTracker:
  69.     def __init__(self, peerid, storage_server,
  70.                  sharesize, blocksize, num_segments, num_share_hashes,
  71.                  storage_index,
  72.                  bucket_renewal_secret, bucket_cancel_secret):
  73.         precondition(isinstance(peerid, str), peerid)
  74.         precondition(len(peerid) == 20, peerid)
  75.         self.peerid = peerid
  76.         self._storageserver = storage_server # to an RIStorageServer
  77.         self.buckets = {} # k: shareid, v: IRemoteBucketWriter
  78.         self.sharesize = sharesize
  79.         as = storage.allocated_size(sharesize,
  80.                                     num_segments,
  81.                                     num_share_hashes,
  82.                                     EXTENSION_SIZE)
  83.         self.allocated_size = as
  84. 
  85.         self.blocksize = blocksize
  86.         self.num_segments = num_segments
  87.         self.num_share_hashes = num_share_hashes
  88.         self.storage_index = storage_index
  89. 
  90.         self.renew_secret = bucket_renewal_secret
  91.         self.cancel_secret = bucket_cancel_secret
  92. 
  93.     def __repr__(self):
  94.         return ("<PeerTracker for peer %s and SI %s>"
  95.                 % (idlib.shortnodeid_b2a(self.peerid),
  96.                    storage.si_b2a(self.storage_index)[:5]))
  97. 
  98.     def query(self, sharenums):
  99.         d = self._storageserver.callRemote("allocate_buckets",
 100.                                            self.storage_index,
 101.                                            self.renew_secret,
 102.                                            self.cancel_secret,
 103.                                            sharenums,
 104.                                            self.allocated_size,
 105.                                            canary=Referenceable())
 106.         d.addCallback(self._got_reply)
 107.         return d
 108. 
 109.     def _got_reply(self, (alreadygot, buckets)):
 110.         #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
 111.         b = {}
 112.         for sharenum, rref in buckets.iteritems():
 113.             bp = storage.WriteBucketProxy(rref, self.sharesize,
 114.                                           self.blocksize,
 115.                                           self.num_segments,
 116.                                           self.num_share_hashes,
 117.                                           EXTENSION_SIZE,
 118.                                           self.peerid)
 119.             b[sharenum] = bp
 120.         self.buckets.update(b)
 121.         return (alreadygot, set(b.keys()))
 122. 
 123. class Tahoe2PeerSelector:
 124. 
 125.     def __init__(self, upload_id, logparent=None, upload_status=None):
 126.         self.upload_id = upload_id
 127.         self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
 128.         self.error_count = 0
 129.         self.num_peers_contacted = 0
 130.         self.last_failure_msg = None
 131.         self._status = IUploadStatus(upload_status)
 132.         self._log_parent = log.msg("%s starting" % self, parent=logparent)
 133. 
 134.     def __repr__(self):
 135.         return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
 136. 
 137.     def get_shareholders(self, client,
 138.                          storage_index, share_size, block_size,
 139.                          num_segments, total_shares, shares_of_happiness):
 140.         """
 141.         @return: (used_peers, already_peers), where used_peers is a set of
 142.                  PeerTracker instances that have agreed to hold some shares
 143.                  for us (the shnum is stashed inside the PeerTracker),
 144.                  and already_peers is a dict mapping shnum to a peer
 145.                  which claims to already have the share.
 146.         """
 147. 
 148.         if self._status:
 149.             self._status.set_status("Contacting Peers..")
 150. 
 151.         self.total_shares = total_shares
 152.         self.shares_of_happiness = shares_of_happiness
 153. 
 154.         self.homeless_shares = range(total_shares)
 155.         # self.uncontacted_peers = list() # peers we haven't asked yet
 156.         self.contacted_peers = [] # peers worth asking again
 157.         self.contacted_peers2 = [] # peers that we have asked again
 158.         self._started_second_pass = False
 159.         self.use_peers = set() # PeerTrackers that have shares assigned to them
 160.         self.preexisting_shares = {} # sharenum -> peerid holding the share
 161. 
 162.         peers = client.get_permuted_peers("storage", storage_index)
 163.         if not peers:
 164.             raise encode.NotEnoughSharesError("client gave us zero peers")
 165. 
 166.         # figure out how much space to ask for
 167. 
 168.         # this needed_hashes computation should mirror
 169.         # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
 170.         # (instead of a HashTree) because we don't require actual hashing
 171.         # just to count the levels.
 172.         ht = hashtree.IncompleteHashTree(total_shares)
 173.         num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
 174. 
 175.         # decide upon the renewal/cancel secrets, to include them in the
 176.         # allocat_buckets query.
 177.         client_renewal_secret = client.get_renewal_secret()
 178.         client_cancel_secret = client.get_cancel_secret()
 179. 
 180.         file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
 181.                                                        storage_index)
 182.         file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
 183.                                                      storage_index)
 184. 
 185.         trackers = [ PeerTracker(peerid, conn,
 186.                                  share_size, block_size,
 187.                                  num_segments, num_share_hashes,
 188.                                  storage_index,
 189.                                  bucket_renewal_secret_hash(file_renewal_secret,
 190.                                                             peerid),
 191.                                  bucket_cancel_secret_hash(file_cancel_secret,
 192.                                                            peerid),
 193.                                  )
 194.                      for (peerid, conn) in peers ]
 195.         self.uncontacted_peers = trackers
 196. 
 197.         d = defer.maybeDeferred(self._loop)
 198.         return d
 199. 
 200.     def _loop(self):
 201.         if not self.homeless_shares:
 202.             # all done
 203.             msg = ("placed all %d shares, "
 204.                    "sent %d queries to %d peers, "
 205.                    "%d queries placed some shares, %d placed none, "
 206.                    "got %d errors" %
 207.                    (self.total_shares,
 208.                     self.query_count, self.num_peers_contacted,
 209.                     self.good_query_count, self.bad_query_count,
 210.                     self.error_count))
 211.             log.msg("peer selection successful for %s: %s" % (self, msg),
 212.                     parent=self._log_parent)
 213.             return (self.use_peers, self.preexisting_shares)
 214. 
 215.         if self.uncontacted_peers:
 216.             peer = self.uncontacted_peers.pop(0)
 217.             # TODO: don't pre-convert all peerids to PeerTrackers
 218.             assert isinstance(peer, PeerTracker)
 219. 
 220.             shares_to_ask = set([self.homeless_shares.pop(0)])
 221.             self.query_count += 1
 222.             self.num_peers_contacted += 1
 223.             if self._status:
 224.                 self._status.set_status("Contacting Peers [%s] (first query),"
 225.                                         " %d shares left.."
 226.                                         % (idlib.shortnodeid_b2a(peer.peerid),
 227.                                            len(self.homeless_shares)))
 228.             d = peer.query(shares_to_ask)
 229.             d.addBoth(self._got_response, peer, shares_to_ask,
 230.                       self.contacted_peers)
 231.             return d
 232.         elif self.contacted_peers:
 233.             # ask a peer that we've already asked.
 234.             if not self._started_second_pass:
 235.                 log.msg("starting second pass", parent=self._log_parent,
 236.                         level=log.NOISY)
 237.                 self._started_second_pass = True
 238.             num_shares = mathutil.div_ceil(len(self.homeless_shares),
 239.                                            len(self.contacted_peers))
 240.             peer = self.contacted_peers.pop(0)
 241.             shares_to_ask = set(self.homeless_shares[:num_shares])
 242.             self.homeless_shares[:num_shares] = []
 243.             self.query_count += 1
 244.             if self._status:
 245.                 self._status.set_status("Contacting Peers [%s] (second query),"
 246.                                         " %d shares left.."
 247.                                         % (idlib.shortnodeid_b2a(peer.peerid),
 248.                                            len(self.homeless_shares)))
 249.             d = peer.query(shares_to_ask)
 250.             d.addBoth(self._got_response, peer, shares_to_ask,
 251.                       self.contacted_peers2)
 252.             return d
 253.         elif self.contacted_peers2:
 254.             # we've finished the second-or-later pass. Move all the remaining
 255.             # peers back into self.contacted_peers for the next pass.
 256.             self.contacted_peers.extend(self.contacted_peers2)
 257.             self.contacted_peers[:] = []
 258.             return self._loop()
 259.         else:
 260.             # no more peers. If we haven't placed enough shares, we fail.
 261.             placed_shares = self.total_shares - len(self.homeless_shares)
 262.             if placed_shares < self.shares_of_happiness:
 263.                 msg = ("placed %d shares out of %d total (%d homeless), "
 264.                        "sent %d queries to %d peers, "
 265.                        "%d queries placed some shares, %d placed none, "
 266.                        "got %d errors" %
 267.                        (self.total_shares - len(self.homeless_shares),
 268.                         self.total_shares, len(self.homeless_shares),
 269.                         self.query_count, self.num_peers_contacted,
 270.                         self.good_query_count, self.bad_query_count,
 271.                         self.error_count))
 272.                 msg = "peer selection failed for %s: %s" % (self, msg)
 273.                 if self.last_failure_msg:
 274.                     msg += " (%s)" % (self.last_failure_msg,)
 275.                 log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
 276.                 raise encode.NotEnoughSharesError(msg)
 277.             else:
 278.                 # we placed enough to be happy, so we're done
 279.                 if self._status:
 280.                     self._status.set_status("Placed all shares")
 281.                 return self.use_peers
 282. 
 283.     def _got_response(self, res, peer, shares_to_ask, put_peer_here):
 284.         if isinstance(res, failure.Failure):
 285.             # This is unusual, and probably indicates a bug or a network
 286.             # problem.
 287.             log.msg("%s got error during peer selection: %s" % (peer, res),
 288.                     level=log.UNUSUAL, parent=self._log_parent)
 289.             self.error_count += 1
 290.             self.homeless_shares = list(shares_to_ask) + self.homeless_shares
 291.             if (self.uncontacted_peers
 292.                 or self.contacted_peers
 293.                 or self.contacted_peers2):
 294.                 # there is still hope, so just loop
 295.                 pass
 296.             else:
 297.                 # No more peers, so this upload might fail (it depends upon
 298.                 # whether we've hit shares_of_happiness or not). Log the last
 299.                 # failure we got: if a coding error causes all peers to fail
 300.                 # in the same way, this allows the common failure to be seen
 301.                 # by the uploader and should help with debugging
 302.                 msg = ("last failure (from %s) was: %s" % (peer, res))
 303.                 self.last_failure_msg = msg
 304.         else:
 305.             (alreadygot, allocated) = res
 306.             log.msg("response from peer %s: alreadygot=%s, allocated=%s"
 307.                     % (idlib.shortnodeid_b2a(peer.peerid),
 308.                        tuple(sorted(alreadygot)), tuple(sorted(allocated))),
 309.                     level=log.NOISY, parent=self._log_parent)
 310.             progress = False
 311.             for s in alreadygot:
 312.                 self.preexisting_shares[s] = peer.peerid
 313.                 if s in self.homeless_shares:
 314.                     self.homeless_shares.remove(s)
 315.                     progress = True
 316. 
 317.             # the PeerTracker will remember which shares were allocated on
 318.             # that peer. We just have to remember to use them.
 319.             if allocated:
 320.                 self.use_peers.add(peer)
 321.                 progress = True
 322. 
 323.             not_yet_present = set(shares_to_ask) - set(alreadygot)
 324.             still_homeless = not_yet_present - set(allocated)
 325. 
 326.             if progress:
 327.                 # they accepted or already had at least one share, so
 328.                 # progress has been made
 329.                 self.good_query_count += 1
 330.             else:
 331.                 self.bad_query_count += 1
 332. 
 333.             if still_homeless:
 334.                 # In networks with lots of space, this is very unusual and
 335.                 # probably indicates an error. In networks with peers that
 336.                 # are full, it is merely unusual. In networks that are very
 337.                 # full, it is common, and many uploads will fail. In most
 338.                 # cases, this is obviously not fatal, and we'll just use some
 339.                 # other peers.
 340. 
 341.                 # some shares are still homeless, keep trying to find them a
 342.                 # home. The ones that were rejected get first priority.
 343.                 self.homeless_shares = (list(still_homeless)
 344.                                         + self.homeless_shares)
 345.                 # Since they were unable to accept all of our requests, so it
 346.                 # is safe to assume that asking them again won't help.
 347.             else:
 348.                 # if they *were* able to accept everything, they might be
 349.                 # willing to accept even more.
 350.                 put_peer_here.append(peer)
 351. 
 352.         # now loop
 353.         return self._loop()
 354. 
 355. 
 356. class EncryptAnUploadable:
 357.     """This is a wrapper that takes an IUploadable and provides
 358.     IEncryptedUploadable."""
 359.     implements(IEncryptedUploadable)
 360.     CHUNKSIZE = 50*1024
 361. 
 362.     def __init__(self, original, log_parent=None):
 363.         self.original = IUploadable(original)
 364.         self._log_number = log_parent
 365.         self._encryptor = None
 366.         self._plaintext_hasher = plaintext_hasher()
 367.         self._plaintext_segment_hasher = None
 368.         self._plaintext_segment_hashes = []
 369.         self._encoding_parameters = None
 370.         self._file_size = None
 371.         self._ciphertext_bytes_read = 0
 372.         self._status = None
 373. 
 374.     def set_upload_status(self, upload_status):
 375.         self._status = IUploadStatus(upload_status)
 376.         self.original.set_upload_status(upload_status)
 377. 
 378.     def log(self, *args, **kwargs):
 379.         if "facility" not in kwargs:
 380.             kwargs["facility"] = "upload.encryption"
 381.         if "parent" not in kwargs:
 382.             kwargs["parent"] = self._log_number
 383.         return log.msg(*args, **kwargs)
 384. 
 385.     def get_size(self):
 386.         if self._file_size is not None:
 387.             return defer.succeed(self._file_size)
 388.         d = self.original.get_size()
 389.         def _got_size(size):
 390.             self._file_size = size
 391.             if self._status:
 392.                 self._status.set_size(size)
 393.             return size
 394.         d.addCallback(_got_size)
 395.         return d
 396. 
 397.     def get_all_encoding_parameters(self):
 398.         if self._encoding_parameters is not None:
 399.             return defer.succeed(self._encoding_parameters)
 400.         d = self.original.get_all_encoding_parameters()
 401.         def _got(encoding_parameters):
 402.             (k, happy, n, segsize) = encoding_parameters
 403.             self._segment_size = segsize # used by segment hashers
 404.             self._encoding_parameters = encoding_parameters
 405.             self.log("my encoding parameters: %s" % (encoding_parameters,),
 406.                      level=log.NOISY)
 407.             return encoding_parameters
 408.         d.addCallback(_got)
 409.         return d
 410. 
 411.     def _get_encryptor(self):
 412.         if self._encryptor:
 413.             return defer.succeed(self._encryptor)
 414. 
 415.         d = self.original.get_encryption_key()
 416.         def _got(key):
 417.             e = AES(key)
 418.             self._encryptor = e
 419. 
 420.             storage_index = storage_index_hash(key)
 421.             assert isinstance(storage_index, str)
 422.             # There's no point to having the SI be longer than the key, so we
 423.             # specify that it is truncated to the same 128 bits as the AES key.
 424.             assert len(storage_index) == 16  # SHA-256 truncated to 128b
 425.             self._storage_index = storage_index
 426.             if self._status:
 427.                 self._status.set_storage_index(storage_index)
 428.             return e
 429.         d.addCallback(_got)
 430.         return d
 431. 
 432.     def get_storage_index(self):
 433.         d = self._get_encryptor()
 434.         d.addCallback(lambda res: self._storage_index)
 435.         return d
 436. 
 437.     def _get_segment_hasher(self):
 438.         p = self._plaintext_segment_hasher
 439.         if p:
 440.             left = self._segment_size - self._plaintext_segment_hashed_bytes
 441.             return p, left
 442.         p = plaintext_segment_hasher()
 443.         self._plaintext_segment_hasher = p
 444.         self._plaintext_segment_hashed_bytes = 0
 445.         return p, self._segment_size
 446. 
 447.     def _update_segment_hash(self, chunk):
 448.         offset = 0
 449.         while offset < len(chunk):
 450.             p, segment_left = self._get_segment_hasher()
 451.             chunk_left = len(chunk) - offset
 452.             this_segment = min(chunk_left, segment_left)
 453.             p.update(chunk[offset:offset+this_segment])
 454.             self._plaintext_segment_hashed_bytes += this_segment
 455. 
 456.             if self._plaintext_segment_hashed_bytes == self._segment_size:
 457.                 # we've filled this segment
 458.                 self._plaintext_segment_hashes.append(p.digest())
 459.                 self._plaintext_segment_hasher = None
 460.                 self.log("closed hash [%d]: %dB" %
 461.                          (len(self._plaintext_segment_hashes)-1,
 462.                           self._plaintext_segment_hashed_bytes),
 463.                          level=log.NOISY)
 464.                 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
 465.                          segnum=len(self._plaintext_segment_hashes)-1,
 466.                          hash=base32.b2a(p.digest()),
 467.                          level=log.NOISY)
 468. 
 469.             offset += this_segment
 470. 
 471. 
 472.     def read_encrypted(self, length, hash_only):
 473.         # make sure our parameters have been set up first
 474.         d = self.get_all_encoding_parameters()
 475.         # and size
 476.         d.addCallback(lambda ignored: self.get_size())
 477.         d.addCallback(lambda ignored: self._get_encryptor())
 478.         # then fetch and encrypt the plaintext. The unusual structure here
 479.         # (passing a Deferred *into* a function) is needed to avoid
 480.         # overflowing the stack: Deferreds don't optimize out tail recursion.
 481.         # We also pass in a list, to which _read_encrypted will append
 482.         # ciphertext.
 483.         ciphertext = []
 484.         d2 = defer.Deferred()
 485.         d.addCallback(lambda ignored:
 486.                       self._read_encrypted(length, ciphertext, hash_only, d2))
 487.         d.addCallback(lambda ignored: d2)
 488.         return d
 489. 
 490.     def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
 491.         if not remaining:
 492.             fire_when_done.callback(ciphertext)
 493.             return None
 494.         # tolerate large length= values without consuming a lot of RAM by
 495.         # reading just a chunk (say 50kB) at a time. This only really matters
 496.         # when hash_only==True (i.e. resuming an interrupted upload), since
 497.         # that's the case where we will be skipping over a lot of data.
 498.         size = min(remaining, self.CHUNKSIZE)
 499.         remaining = remaining - size
 500.         # read a chunk of plaintext..
 501.         d = defer.maybeDeferred(self.original.read, size)
 502.         # N.B.: if read() is synchronous, then since everything else is
 503.         # actually synchronous too, we'd blow the stack unless we stall for a
 504.         # tick. Once you accept a Deferred from IUploadable.read(), you must
 505.         # be prepared to have it fire immediately too.
 506.         d.addCallback(eventual.fireEventually)
 507.         def _good(plaintext):
 508.             # and encrypt it..
 509.             # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
 510.             ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
 511.             ciphertext.extend(ct)
 512.             self._read_encrypted(remaining, ciphertext, hash_only,
 513.                                  fire_when_done)
 514.         def _err(why):
 515.             fire_when_done.errback(why)
 516.         d.addCallback(_good)
 517.         d.addErrback(_err)
 518.         return None
 519. 
 520.     def _hash_and_encrypt_plaintext(self, data, hash_only):
 521.         assert isinstance(data, (tuple, list)), type(data)
 522.         data = list(data)
 523.         cryptdata = []
 524.         # we use data.pop(0) instead of 'for chunk in data' to save
 525.         # memory: each chunk is destroyed as soon as we're done with it.
 526.         bytes_processed = 0
 527.         while data:
 528.             chunk = data.pop(0)
 529.             self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
 530.                      level=log.NOISY)
 531.             bytes_processed += len(chunk)
 532.             self._plaintext_hasher.update(chunk)
 533.             self._update_segment_hash(chunk)
 534.             # TODO: we have to encrypt the data (even if hash_only==True)
 535.             # because pycryptopp's AES-CTR implementation doesn't offer a
 536.             # way to change the counter value. Once pycryptopp acquires
 537.             # this ability, change this to simply update the counter
 538.             # before each call to (hash_only==False) _encryptor.process()
 539.             ciphertext = self._encryptor.process(chunk)
 540.             if hash_only:
 541.                 self.log("  skipping encryption", level=log.NOISY)
 542.             else:
 543.                 cryptdata.append(ciphertext)
 544.             del ciphertext
 545.             del chunk
 546.         self._ciphertext_bytes_read += bytes_processed
 547.         if self._status:
 548.             progress = float(self._ciphertext_bytes_read) / self._file_size
 549.             self._status.set_progress(1, progress)
 550.         return cryptdata
 551. 
 552. 
 553.     def get_plaintext_hashtree_leaves(self, first, last, num_segments):
 554.         if len(self._plaintext_segment_hashes) < num_segments:
 555.             # close out the last one
 556.             assert len(self._plaintext_segment_hashes) == num_segments-1
 557.             p, segment_left = self._get_segment_hasher()
 558.             self._plaintext_segment_hashes.append(p.digest())
 559.             del self._plaintext_segment_hasher
 560.             self.log("closing plaintext leaf hasher, hashed %d bytes" %
 561.                      self._plaintext_segment_hashed_bytes,
 562.                      level=log.NOISY)
 563.             self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
 564.                      segnum=len(self._plaintext_segment_hashes)-1,
 565.                      hash=base32.b2a(p.digest()),
 566.                      level=log.NOISY)
 567.         assert len(self._plaintext_segment_hashes) == num_segments
 568.         return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
 569. 
 570.     def get_plaintext_hash(self):
 571.         h = self._plaintext_hasher.digest()
 572.         return defer.succeed(h)
 573. 
 574.     def close(self):
 575.         return self.original.close()
 576. 
 577. class UploadStatus:
 578.     implements(IUploadStatus)
 579.     statusid_counter = itertools.count(0)
 580. 
 581.     def __init__(self):
 582.         self.storage_index = None
 583.         self.size = None
 584.         self.helper = False
 585.         self.status = "Not started"
 586.         self.progress = [0.0, 0.0, 0.0]
 587.         self.active = True
 588.         self.results = None
 589.         self.counter = self.statusid_counter.next()
 590.         self.started = time.time()
 591. 
 592.     def get_started(self):
 593.         return self.started
 594.     def get_storage_index(self):
 595.         return self.storage_index
 596.     def get_size(self):
 597.         return self.size
 598.     def using_helper(self):
 599.         return self.helper
 600.     def get_status(self):
 601.         return self.status
 602.     def get_progress(self):
 603.         return tuple(self.progress)
 604.     def get_active(self):
 605.         return self.active
 606.     def get_results(self):
 607.         return self.results
 608.     def get_counter(self):
 609.         return self.counter
 610. 
 611.     def set_storage_index(self, si):
 612.         self.storage_index = si
 613.     def set_size(self, size):
 614.         self.size = size
 615.     def set_helper(self, helper):
 616.         self.helper = helper
 617.     def set_status(self, status):
 618.         self.status = status
 619.     def set_progress(self, which, value):
 620.         # [0]: chk, [1]: ciphertext, [2]: encode+push
 621.         self.progress[which] = value
 622.     def set_active(self, value):
 623.         self.active = value
 624.     def set_results(self, value):
 625.         self.results = value
 626. 
 627. class CHKUploader:
 628.     peer_selector_class = Tahoe2PeerSelector
 629. 
 630.     def __init__(self, client):
 631.         self._client = client
 632.         self._log_number = self._client.log("CHKUploader starting")
 633.         self._encoder = None
 634.         self._results = UploadResults()
 635.         self._storage_index = None
 636.         self._upload_status = UploadStatus()
 637.         self._upload_status.set_helper(False)
 638.         self._upload_status.set_active(True)
 639.         self._upload_status.set_results(self._results)
 640. 
 641.     def log(self, *args, **kwargs):
 642.         if "parent" not in kwargs:
 643.             kwargs["parent"] = self._log_number
 644.         if "facility" not in kwargs:
 645.             kwargs["facility"] = "tahoe.upload"
 646.         return self._client.log(*args, **kwargs)
 647. 
 648.     def start(self, uploadable):
 649.         """Start uploading the file.
 650. 
 651.         This method returns a Deferred that will fire with the URI (a
 652.         string)."""
 653. 
 654.         self._started = time.time()
 655.         uploadable = IUploadable(uploadable)
 656.         self.log("starting upload of %s" % uploadable)
 657. 
 658.         eu = EncryptAnUploadable(uploadable, self._log_number)
 659.         eu.set_upload_status(self._upload_status)
 660.         d = self.start_encrypted(eu)
 661.         def _uploaded(res):
 662.             d1 = uploadable.get_encryption_key()
 663.             d1.addCallback(lambda key: self._compute_uri(res, key))
 664.             return d1
 665.         d.addCallback(_uploaded)
 666.         def _done(res):
 667.             self._upload_status.set_active(False)
 668.             return res
 669.         d.addBoth(_done)
 670.         return d
 671. 
 672.     def abort(self):
 673.         """Call this is the upload must be abandoned before it completes.
 674.         This will tell the shareholders to delete their partial shares. I
 675.         return a Deferred that fires when these messages have been acked."""
 676.         if not self._encoder:
 677.             # how did you call abort() before calling start() ?
 678.             return defer.succeed(None)
 679.         return self._encoder.abort()
 680. 
 681.     def start_encrypted(self, encrypted):
 682.         eu = IEncryptedUploadable(encrypted)
 683. 
 684.         started = time.time()
 685.         self._encoder = e = encode.Encoder(self._log_number,
 686.                                            self._upload_status)
 687.         d = e.set_encrypted_uploadable(eu)
 688.         d.addCallback(self.locate_all_shareholders, started)
 689.         d.addCallback(self.set_shareholders, e)
 690.         d.addCallback(lambda res: e.start())
 691.         d.addCallback(self._encrypted_done)
 692.         # this fires with the uri_extension_hash and other data
 693.         return d
 694. 
 695.     def locate_all_shareholders(self, encoder, started):
 696.         peer_selection_started = now = time.time()
 697.         self._storage_index_elapsed = now - started
 698.         storage_index = encoder.get_param("storage_index")
 699.         self._storage_index = storage_index
 700.         upload_id = storage.si_b2a(storage_index)[:5]
 701.         self.log("using storage index %s" % upload_id)
 702.         peer_selector = self.peer_selector_class(upload_id, self._log_number,
 703.                                                  self._upload_status)
 704. 
 705.         share_size = encoder.get_param("share_size")
 706.         block_size = encoder.get_param("block_size")
 707.         num_segments = encoder.get_param("num_segments")
 708.         k,desired,n = encoder.get_param("share_counts")
 709. 
 710.         self._peer_selection_started = time.time()
 711.         d = peer_selector.get_shareholders(self._client, storage_index,
 712.                                            share_size, block_size,
 713.                                            num_segments, n, desired)
 714.         def _done(res):
 715.             self._peer_selection_elapsed = time.time() - peer_selection_started
 716.             return res
 717.         d.addCallback(_done)
 718.         return d
 719. 
 720.     def set_shareholders(self, (used_peers, already_peers), encoder):
 721.         """
 722.         @param used_peers: a sequence of PeerTracker objects
 723.         @paran already_peers: a dict mapping sharenum to a peerid that
 724.                               claims to already have this share
 725.         """
 726.         self.log("_send_shares, used_peers is %s" % (used_peers,))
 727.         # record already-present shares in self._results
 728.         for (shnum, peerid) in already_peers.items():
 729.             peerid_s = idlib.shortnodeid_b2a(peerid)
 730.             self._results.sharemap[shnum] = "Found on [%s]" % peerid_s
 731.             if peerid not in self._results.servermap:
 732.                 self._results.servermap[peerid] = set()
 733.             self._results.servermap[peerid].add(shnum)
 734.         self._results.preexisting_shares = len(already_peers)
 735. 
 736.         self._sharemap = {}
 737.         for peer in used_peers:
 738.             assert isinstance(peer, PeerTracker)
 739.         buckets = {}
 740.         for peer in used_peers:
 741.             buckets.update(peer.buckets)
 742.             for shnum in peer.buckets:
 743.                 self._sharemap[shnum] = peer
 744.         assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
 745.         encoder.set_shareholders(buckets)
 746. 
 747.     def _encrypted_done(self, res):
 748.         r = self._results
 749.         for shnum in self._encoder.get_shares_placed():
 750.             peer_tracker = self._sharemap[shnum]
 751.             peerid = peer_tracker.peerid
 752.             peerid_s = idlib.shortnodeid_b2a(peerid)
 753.             r.sharemap[shnum] = "Placed on [%s]" % peerid_s
 754.             if peerid not in r.servermap:
 755.                 r.servermap[peerid] = set()
 756.             r.servermap[peerid].add(shnum)
 757.         r.pushed_shares = len(self._encoder.get_shares_placed())
 758.         now = time.time()
 759.         r.file_size = self._encoder.file_size
 760.         r.timings["total"] = now - self._started
 761.         r.timings["storage_index"] = self._storage_index_elapsed
 762.         r.timings["peer_selection"] = self._peer_selection_elapsed
 763.         r.timings.update(self._encoder.get_times())
 764.         r.uri_extension_data = self._encoder.get_uri_extension_data()
 765.         return res
 766. 
 767.     def _compute_uri(self, (uri_extension_hash,
 768.                             needed_shares, total_shares, size),
 769.                      key):
 770.         u = uri.CHKFileURI(key=key,
 771.                            uri_extension_hash=uri_extension_hash,
 772.                            needed_shares=needed_shares,
 773.                            total_shares=total_shares,
 774.                            size=size,
 775.                            )
 776.         r = self._results
 777.         r.uri = u.to_string()
 778.         return r
 779. 
 780.     def get_upload_status(self):
 781.         return self._upload_status
 782. 
 783. def read_this_many_bytes(uploadable, size, prepend_data=[]):
 784.     if size == 0:
 785.         return defer.succeed([])
 786.     d = uploadable.read(size)
 787.     def _got(data):
 788.         assert isinstance(data, list)
 789.         bytes = sum([len(piece) for piece in data])
 790.         assert bytes > 0
 791.         assert bytes <= size
 792.         remaining = size - bytes
 793.         if remaining:
 794.             return read_this_many_bytes(uploadable, remaining,
 795.                                         prepend_data + data)
 796.         return prepend_data + data
 797.     d.addCallback(_got)
 798.     return d
 799. 
 800. class LiteralUploader:
 801. 
 802.     def __init__(self, client):
 803.         self._client = client
 804.         self._results = UploadResults()
 805.         self._status = s = UploadStatus()
 806.         s.set_storage_index(None)
 807.         s.set_helper(False)
 808.         s.set_progress(0, 1.0)
 809.         s.set_active(False)
 810.         s.set_results(self._results)
 811. 
 812.     def start(self, uploadable):
 813.         uploadable = IUploadable(uploadable)
 814.         d = uploadable.get_size()
 815.         def _got_size(size):
 816.             self._size = size
 817.             self._status.set_size(size)
 818.             self._results.file_size = size
 819.             return read_this_many_bytes(uploadable, size)
 820.         d.addCallback(_got_size)
 821.         d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
 822.         d.addCallback(lambda u: u.to_string())
 823.         d.addCallback(self._build_results)
 824.         return d
 825. 
 826.     def _build_results(self, uri):
 827.         self._results.uri = uri
 828.         self._status.set_status("Done")
 829.         self._status.set_progress(1, 1.0)
 830.         self._status.set_progress(2, 1.0)
 831.         return self._results
 832. 
 833.     def close(self):
 834.         pass
 835. 
 836.     def get_upload_status(self):
 837.         return self._status
 838. 
 839. class RemoteEncryptedUploadable(Referenceable):
 840.     implements(RIEncryptedUploadable)
 841. 
 842.     def __init__(self, encrypted_uploadable, upload_status):
 843.         self._eu = IEncryptedUploadable(encrypted_uploadable)
 844.         self._offset = 0
 845.         self._bytes_sent = 0
 846.         self._status = IUploadStatus(upload_status)
 847.         # we are responsible for updating the status string while we run, and
 848.         # for setting the ciphertext-fetch progress.
 849.         self._size = None
 850. 
 851.     def get_size(self):
 852.         if self._size is not None:
 853.             return defer.succeed(self._size)
 854.         d = self._eu.get_size()
 855.         def _got_size(size):
 856.             self._size = size
 857.             return size
 858.         d.addCallback(_got_size)
 859.         return d
 860. 
 861.     def remote_get_size(self):
 862.         return self.get_size()
 863.     def remote_get_all_encoding_parameters(self):
 864.         return self._eu.get_all_encoding_parameters()
 865. 
 866.     def _read_encrypted(self, length, hash_only):
 867.         d = self._eu.read_encrypted(length, hash_only)
 868.         def _read(strings):
 869.             if hash_only:
 870.                 self._offset += length
 871.             else:
 872.                 size = sum([len(data) for data in strings])
 873.                 self._offset += size
 874.             return strings
 875.         d.addCallback(_read)
 876.         return d
 877. 
 878.     def remote_read_encrypted(self, offset, length):
 879.         # we don't support seek backwards, but we allow skipping forwards
 880.         precondition(offset >= 0, offset)
 881.         precondition(length >= 0, length)
 882.         lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
 883.                      level=log.NOISY)
 884.         precondition(offset >= self._offset, offset, self._offset)
 885.         if offset > self._offset:
 886.             # read the data from disk anyways, to build up the hash tree
 887.             skip = offset - self._offset
 888.             log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
 889.                     (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
 890.             d = self._read_encrypted(skip, hash_only=True)
 891.         else:
 892.             d = defer.succeed(None)
 893. 
 894.         def _at_correct_offset(res):
 895.             assert offset == self._offset, "%d != %d" % (offset, self._offset)
 896.             return self._read_encrypted(length, hash_only=False)
 897.         d.addCallback(_at_correct_offset)
 898. 
 899.         def _read(strings):
 900.             size = sum([len(data) for data in strings])
 901.             self._bytes_sent += size
 902.             return strings
 903.         d.addCallback(_read)
 904.         return d
 905. 
 906.     def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
 907.         log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
 908.                 (first, last-1, num_segments),
 909.                 level=log.NOISY)
 910.         d = self._eu.get_plaintext_hashtree_leaves(first, last, num_segments)
 911.         d.addCallback(list)
 912.         return d
 913.     def remote_get_plaintext_hash(self):
 914.         return self._eu.get_plaintext_hash()
 915.     def remote_close(self):
 916.         return self._eu.close()
 917. 
 918. 
 919. class AssistedUploader:
 920. 
 921.     def __init__(self, helper):
 922.         self._helper = helper
 923.         self._log_number = log.msg("AssistedUploader starting")
 924.         self._storage_index = None
 925.         self._upload_status = s = UploadStatus()
 926.         s.set_helper(True)
 927.         s.set_active(True)
 928. 
 929.     def log(self, *args, **kwargs):
 930.         if "parent" not in kwargs:
 931.             kwargs["parent"] = self._log_number
 932.         return log.msg(*args, **kwargs)
 933. 
 934.     def start(self, uploadable):
 935.         self._started = time.time()
 936.         u = IUploadable(uploadable)
 937.         eu = EncryptAnUploadable(u, self._log_number)
 938.         eu.set_upload_status(self._upload_status)
 939.         self._encuploadable = eu
 940.         d = eu.get_size()
 941.         d.addCallback(self._got_size)
 942.         d.addCallback(lambda res: eu.get_all_encoding_parameters())
 943.         d.addCallback(self._got_all_encoding_parameters)
 944.         # when we get the encryption key, that will also compute the storage
 945.         # index, so this only takes one pass.
 946.         # TODO: I'm not sure it's cool to switch back and forth between
 947.         # the Uploadable and the IEncryptedUploadable that wraps it.
 948.         d.addCallback(lambda res: u.get_encryption_key())
 949.         d.addCallback(self._got_encryption_key)
 950.         d.addCallback(lambda res: eu.get_storage_index())
 951.         d.addCallback(self._got_storage_index)
 952.         d.addCallback(self._contact_helper)
 953.         d.addCallback(self._build_readcap)
 954.         def _done(res):
 955.             self._upload_status.set_active(False)
 956.             return res
 957.         d.addBoth(_done)
 958.         return d
 959. 
 960.     def _got_size(self, size):
 961.         self._size = size
 962.         self._upload_status.set_size(size)
 963. 
 964.     def _got_all_encoding_parameters(self, params):
 965.         k, happy, n, segment_size = params
 966.         # stash these for URI generation later
 967.         self._needed_shares = k
 968.         self._total_shares = n
 969.         self._segment_size = segment_size
 970. 
 971.     def _got_encryption_key(self, key):
 972.         self._key = key
 973. 
 974.     def _got_storage_index(self, storage_index):
 975.         self._storage_index = storage_index
 976. 
 977. 
 978.     def _contact_helper(self, res):
 979.         now = self._time_contacting_helper_start = time.time()
 980.         self._storage_index_elapsed = now - self._started
 981.         self.log(format="contacting helper for SI %(si)s..",
 982.                  si=storage.si_b2a(self._storage_index))
 983.         self._upload_status.set_status("Contacting Helper")
 984.         d = self._helper.callRemote("upload_chk", self._storage_index)
 985.         d.addCallback(self._contacted_helper)
 986.         return d
 987. 
 988.     def _contacted_helper(self, (upload_results, upload_helper)):
 989.         now = time.time()
 990.         elapsed = now - self._time_contacting_helper_start
 991.         self._elapsed_time_contacting_helper = elapsed
 992.         if upload_helper:
 993.             self.log("helper says we need to upload")
 994.             self._upload_status.set_status("Uploading Ciphertext")
 995.             # we need to upload the file
 996.             reu = RemoteEncryptedUploadable(self._encuploadable,
 997.                                             self._upload_status)
 998.             # let it pre-compute the size for progress purposes
 999.             d = reu.get_size()
1000.             d.addCallback(lambda ignored:
1001.                           upload_helper.callRemote("upload", reu))
1002.             # this Deferred will fire with the upload results
1003.             return d
1004.         self.log("helper says file is already uploaded")
1005.         self._upload_status.set_progress(1, 1.0)
1006.         self._upload_status.set_results(upload_results)
1007.         return upload_results
1008. 
1009.     def _build_readcap(self, upload_results):
1010.         self.log("upload finished, building readcap")
1011.         self._upload_status.set_status("Building Readcap")
1012.         r = upload_results
1013.         assert r.uri_extension_data["needed_shares"] == self._needed_shares
1014.         assert r.uri_extension_data["total_shares"] == self._total_shares
1015.         assert r.uri_extension_data["segment_size"] == self._segment_size
1016.         assert r.uri_extension_data["size"] == self._size
1017.         u = uri.CHKFileURI(key=self._key,
1018.                            uri_extension_hash=r.uri_extension_hash,
1019.                            needed_shares=self._needed_shares,
1020.                            total_shares=self._total_shares,
1021.                            size=self._size,
1022.                            )
1023.         r.uri = u.to_string()
1024.         now = time.time()
1025.         r.file_size = self._size
1026.         r.timings["storage_index"] = self._storage_index_elapsed
1027.         r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
1028.         if "total" in r.timings:
1029.             r.timings["helper_total"] = r.timings["total"]
1030.         r.timings["total"] = now - self._started
1031.         self._upload_status.set_status("Done")
1032.         self._upload_status.set_results(r)
1033.         return r
1034. 
1035.     def get_upload_status(self):
1036.         return self._upload_status
1037. 
class BaseUploadable:
    """Common encoding-parameter handling for uploadables. Instance-level
    settings (max_segment_size, encoding_param_*) take precedence over the
    default_* values whenever they are set to something true."""
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        """Remember the IUploadStatus provider to report progress to."""
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        """Override the default_* attributes from a dict which may contain
        the keys "k", "happy", "n" and "max_segment_size". Every key must
        be a str and every value an int."""
        assert isinstance(default_params, dict)
        for name, value in default_params.items():
            precondition(isinstance(name, str), name, value)
            precondition(isinstance(value, int), name, value)
        attr_for_key = (
            ("k", "default_encoding_param_k"),
            ("happy", "default_encoding_param_happy"),
            ("n", "default_encoding_param_n"),
            ("max_segment_size", "default_max_segment_size"),
            )
        for key, attr in attr_for_key:
            if key in default_params:
                setattr(self, attr, default_params[key])

    def get_all_encoding_parameters(self):
        """Return a Deferred that fires with (k, happy, n, segsize). The
        tuple is computed once and then reused."""
        cached = self._all_encoding_parameters
        if cached:
            return defer.succeed(cached)

        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n
        max_segsize = self.max_segment_size or self.default_max_segment_size

        def _compute(file_size):
            # for small files, shrink the segment size to avoid wasting
            # space; the result must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(min(max_segsize, file_size), k)
            self._all_encoding_parameters = (k, happy, n, segsize)
            return self._all_encoding_parameters

        d = self.get_size()
        d.addCallback(_compute)
        return d
1089. 
class FileHandle(BaseUploadable):
    """An IUploadable that reads its data from an open, seekable
    filehandle."""
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self.convergence = convergence
        self._key = None
        self._size = None

    def _get_encryption_key_convergent(self):
        """Return a Deferred that fires with the 16-byte convergent key,
        obtained by hashing the whole file. The key is remembered, so the
        file is only hashed once."""
        if self._key is not None:
            return defer.succeed(self._key)

        def _hash_whole_file(params):
            k, happy, n, segsize = params
            hasher = convergence_hasher(k, n, segsize, self.convergence)
            fh = self._filehandle
            BLOCKSIZE = 64*1024
            so_far = 0
            fh.seek(0)
            chunk = fh.read(BLOCKSIZE)
            while chunk:
                hasher.update(chunk)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                so_far += len(chunk)
                if self._status:
                    self._status.set_progress(0, float(so_far)/self._size)
                chunk = fh.read(BLOCKSIZE)
            # leave the filehandle rewound for whoever reads it next
            fh.seek(0)
            self._key = hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key

        # get_size() sets self._size as a side-effect
        d = self.get_size()
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        d.addCallback(_hash_whole_file)
        return d

    def _get_encryption_key_random(self):
        """Return a Deferred that fires with a random 16-byte key, created
        on first use and then reused."""
        key = self._key
        if key is None:
            key = self._key = os.urandom(16)
        return defer.succeed(key)

    def get_encryption_key(self):
        """Return a Deferred that fires with the encryption key: random when
        convergence is None, convergent otherwise."""
        if self.convergence is None:
            return self._get_encryption_key_random()
        return self._get_encryption_key_convergent()

    def get_size(self):
        """Return a Deferred that fires with the file size, measured by
        seeking to the end of the filehandle on first use. The filehandle
        is rewound to the start afterwards."""
        if self._size is None:
            fh = self._filehandle
            fh.seek(0, 2) # whence=2: relative to the end of the file
            self._size = fh.tell()
            fh.seek(0)
        return defer.succeed(self._size)

    def read(self, length):
        """Return a Deferred that fires with a one-element list holding
        whatever a single filehandle.read(length) returned."""
        piece = self._filehandle.read(length)
        return defer.succeed([piece])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass
1167. 
class FileName(FileHandle):
    """A FileHandle that opens the named file itself, and therefore also
    closes it."""

    def __init__(self, filename, convergence):
        """
        Upload the data from the filename.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        # checked before the file is opened
        assert (convergence is None
                or isinstance(convergence, str)), (convergence,
                                                   type(convergence))
        fh = open(filename, "rb")
        FileHandle.__init__(self, fh, convergence=convergence)

    def close(self):
        """Close the file that __init__ opened."""
        FileHandle.close(self)
        fh = self._filehandle
        fh.close()
1181. 
class Data(FileHandle):
    """A FileHandle over an in-memory string, wrapped in a StringIO."""

    def __init__(self, data, convergence):
        """
        Upload the data from the data argument.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert (convergence is None
                or isinstance(convergence, str)), (convergence,
                                                   type(convergence))
        buf = StringIO(data)
        FileHandle.__init__(self, buf, convergence=convergence)
1192. 
class Uploader(service.MultiService):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    name = "uploader"
    uploader_class = CHKUploader
    URI_LIT_SIZE_THRESHOLD = 55
    MAX_UPLOAD_STATUSES = 10

    def __init__(self, helper_furl=None, stats_provider=None):
        """
        @param helper_furl: FURL of an upload helper to connect to when the
                            service starts, or None for no helper
        @param stats_provider: optional object whose count() method is
                               called for each upload
        """
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._helper = None
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        self._all_upload_statuses = weakref.WeakKeyDictionary()
        self._recent_upload_statuses = []
        service.MultiService.__init__(self)

    def startService(self):
        """Start the service and, if a helper FURL was given, start
        connecting to it."""
        service.MultiService.startService(self)
        furl = self._helper_furl
        if furl:
            self.parent.tub.connectTo(furl, self._got_helper)

    def _got_helper(self, helper):
        """Remember the connected helper and ask to be told when the
        connection is lost."""
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)

    def _lost_helper(self):
        """Forget the helper."""
        self._helper = None

    def get_helper_info(self):
        """Return a tuple of (helper_furl_or_None, connected_bool)."""
        connected = bool(self._helper)
        return (self._helper_furl, connected)

    def upload(self, uploadable):
        """Upload 'uploadable' with a LiteralUploader (size at or below
        URI_LIT_SIZE_THRESHOLD), an AssistedUploader (a helper is connected)
        or uploader_class. Returns a Deferred that fires with whatever the
        chosen uploader's start() fires with. The uploadable is closed once
        that Deferred has fired, whether it succeeded or failed."""
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)

        def _pick_uploader_and_start(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            stats = self.stats_provider
            if stats:
                stats.count('uploader.files_uploaded', 1)
                stats.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader(self.parent)
            elif self._helper:
                uploader = AssistedUploader(self._helper)
            else:
                uploader = self.uploader_class(self.parent)
            self._add_upload(uploader)
            return uploader.start(uploadable)

        def _close_uploadable(res):
            uploadable.close()
            return res

        d = uploadable.get_size()
        d.addCallback(_pick_uploader_and_start)
        d.addBoth(_close_uploadable)
        return d

    def _add_upload(self, uploader):
        """Register the uploader and its status object, keeping only the
        MAX_UPLOAD_STATUSES most recent statuses in the 'recent' list."""
        status = uploader.get_upload_status()
        self._all_uploads[uploader] = None
        self._all_upload_statuses[status] = None
        recent = self._recent_upload_statuses
        recent.append(status)
        # drop the oldest entries, modifying the list in place
        excess = len(recent) - self.MAX_UPLOAD_STATUSES
        if excess > 0:
            del recent[:excess]

    def list_all_upload_statuses(self):
        """Generate every upload status object that is still alive."""
        for status in self._all_upload_statuses:
            yield status