source file: /home/buildslave/tahoe/edgy/build/src/allmydata/immutable/download.py
file stats: 841 lines, 811 executed: 96.4% covered
coverage versus previous test: 0 lines added, 0 lines removed
    1. import os, random, weakref, itertools, time
    2. from zope.interface import implements
    3. from twisted.internet import defer
    4. from twisted.internet.interfaces import IPushProducer, IConsumer
    5. from foolscap.api import DeadReferenceError, RemoteException, eventually
    6. 
    7. from allmydata.util import base32, deferredutil, hashutil, log, mathutil, idlib
    8. from allmydata.util.assertutil import _assert, precondition
    9. from allmydata import codec, hashtree, uri
   10. from allmydata.interfaces import IDownloadTarget, IDownloader, \
   11.      IFileURI, IVerifierURI, \
   12.      IDownloadStatus, IDownloadResults, IValidatedThingProxy, \
   13.      IStorageBroker, NotEnoughSharesError, NoSharesError, NoServersError, \
   14.      UnableToFetchCriticalDownloadDataError
   15. from allmydata.immutable import layout
   16. from allmydata.monitor import Monitor
   17. from pycryptopp.cipher.aes import AES
   18. 
   19. class IntegrityCheckReject(Exception):
   20.     pass
   21. 
   22. class BadURIExtensionHashValue(IntegrityCheckReject):
   23.     pass
   24. class BadURIExtension(IntegrityCheckReject):
   25.     pass
   26. class UnsupportedErasureCodec(BadURIExtension):
   27.     pass
   28. class BadCrypttextHashValue(IntegrityCheckReject):
   29.     pass
   30. class BadOrMissingHash(IntegrityCheckReject):
   31.     pass
   32. 
   33. class DownloadStopped(Exception):
   34.     pass
   35. 
   36. class DownloadResults:
   37.     implements(IDownloadResults)
   38. 
   39.     def __init__(self):
   40.         self.servers_used = set()
   41.         self.server_problems = {}
   42.         self.servermap = {}
   43.         self.timings = {}
   44.         self.file_size = None
   45. 
   46. class DecryptingTarget(log.PrefixingLogMixin):
   47.     implements(IDownloadTarget, IConsumer)
   48.     def __init__(self, target, key, _log_msg_id=None):
   49.         precondition(IDownloadTarget.providedBy(target), target)
   50.         self.target = target
   51.         self._decryptor = AES(key)
   52.         prefix = str(target)
   53.         log.PrefixingLogMixin.__init__(self, "allmydata.immutable.download", _log_msg_id, prefix=prefix)
   54.     # methods to satisfy the IConsumer interface
   55.     def registerProducer(self, producer, streaming):
   56.         if IConsumer.providedBy(self.target):
   57.             self.target.registerProducer(producer, streaming)
   58.     def unregisterProducer(self):
   59.         if IConsumer.providedBy(self.target):
   60.             self.target.unregisterProducer()
   61.     def write(self, ciphertext):
   62.         plaintext = self._decryptor.process(ciphertext)
   63.         self.target.write(plaintext)
   64.     def open(self, size):
   65.         self.target.open(size)
   66.     def close(self):
   67.         self.target.close()
   68.     def finish(self):
   69.         return self.target.finish()
   70.     # The following methods is just to pass through to the next target, and just because that
   71.     # target might be a repairer.DownUpConnector, and just because the current CHKUpload object
   72.     # expects to find the storage index in its Uploadable.
   73.     def set_storageindex(self, storageindex):
   74.         self.target.set_storageindex(storageindex)
   75.     def set_encodingparams(self, encodingparams):
   76.         self.target.set_encodingparams(encodingparams)
   77. 
   78. class ValidatedThingObtainer:
   79.     def __init__(self, validatedthingproxies, debugname, log_id):
   80.         self._validatedthingproxies = validatedthingproxies
   81.         self._debugname = debugname
   82.         self._log_id = log_id
   83. 
   84.     def _bad(self, f, validatedthingproxy):
   85.         failtype = f.trap(RemoteException, DeadReferenceError,
   86.                           IntegrityCheckReject, layout.LayoutInvalid,
   87.                           layout.ShareVersionIncompatible)
   88.         level = log.WEIRD
   89.         if f.check(DeadReferenceError):
   90.             level = log.UNUSUAL
   91.         elif f.check(RemoteException):
   92.             level = log.WEIRD
   93.         else:
   94.             level = log.SCARY
   95.         log.msg(parent=self._log_id, facility="tahoe.immutable.download",
   96.                 format="operation %(op)s from validatedthingproxy %(validatedthingproxy)s failed",
   97.                 op=self._debugname, validatedthingproxy=str(validatedthingproxy),
   98.                 failure=f, level=level, umid="JGXxBA")
   99.         if not self._validatedthingproxies:
  100.             raise UnableToFetchCriticalDownloadDataError("ran out of peers, last error was %s" % (f,))
  101.         # try again with a different one
  102.         d = self._try_the_next_one()
  103.         return d
  104. 
  105.     def _try_the_next_one(self):
  106.         vtp = self._validatedthingproxies.pop(0)
  107.         d = vtp.start() # start() obtains, validates, and callsback-with the thing or else errbacks
  108.         d.addErrback(self._bad, vtp)
  109.         return d
  110. 
  111.     def start(self):
  112.         return self._try_the_next_one()
  113. 
  114. class ValidatedCrypttextHashTreeProxy:
  115.     implements(IValidatedThingProxy)
  116.     """ I am a front-end for a remote crypttext hash tree using a local ReadBucketProxy -- I use
  117.     its get_crypttext_hashes() method and offer the Validated Thing protocol (i.e., I have a
  118.     start() method that fires with self once I get a valid one). """
  119.     def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments, fetch_failures=None):
  120.         # fetch_failures is for debugging -- see test_encode.py
  121.         self._readbucketproxy = readbucketproxy
  122.         self._num_segments = num_segments
  123.         self._fetch_failures = fetch_failures
  124.         self._crypttext_hash_tree = crypttext_hash_tree
  125. 
  126.     def _validate(self, proposal):
  127.         ct_hashes = dict(list(enumerate(proposal)))
  128.         try:
  129.             self._crypttext_hash_tree.set_hashes(ct_hashes)
  130.         except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
  131.             if self._fetch_failures is not None:
  132.                 self._fetch_failures["crypttext_hash_tree"] += 1
  133.             raise BadOrMissingHash(le)
  134.         # If we now have enough of the crypttext hash tree to integrity-check *any* segment of ciphertext, then we are done.
  135.         # TODO: It would have better alacrity if we downloaded only part of the crypttext hash tree at a time.
  136.         for segnum in range(self._num_segments):
  137.             if self._crypttext_hash_tree.needed_hashes(segnum):
  138.                 raise BadOrMissingHash("not enough hashes to validate segment number %d" % (segnum,))
  139.         return self
  140. 
  141.     def start(self):
  142.         d = self._readbucketproxy.get_crypttext_hashes()
  143.         d.addCallback(self._validate)
  144.         return d
  145. 
  146. class ValidatedExtendedURIProxy:
  147.     implements(IValidatedThingProxy)
  148.     """ I am a front-end for a remote UEB (using a local ReadBucketProxy), responsible for
  149.     retrieving and validating the elements from the UEB. """
  150. 
  151.     def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
  152.         # fetch_failures is for debugging -- see test_encode.py
  153.         self._fetch_failures = fetch_failures
  154.         self._readbucketproxy = readbucketproxy
  155.         precondition(IVerifierURI.providedBy(verifycap), verifycap)
  156.         self._verifycap = verifycap
  157. 
  158.         # required
  159.         self.segment_size = None
  160.         self.crypttext_root_hash = None
  161.         self.share_root_hash = None
  162. 
  163.         # computed
  164.         self.block_size = None
  165.         self.share_size = None
  166.         self.num_segments = None
  167.         self.tail_data_size = None
  168.         self.tail_segment_size = None
  169. 
  170.         # optional
  171.         self.crypttext_hash = None
  172. 
  173.     def __str__(self):
  174.         return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())
  175. 
  176.     def _check_integrity(self, data):
  177.         h = hashutil.uri_extension_hash(data)
  178.         if h != self._verifycap.uri_extension_hash:
  179.             msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
  180.                    (self._readbucketproxy, base32.b2a(self._verifycap.uri_extension_hash), base32.b2a(h)))
  181.             if self._fetch_failures is not None:
  182.                 self._fetch_failures["uri_extension"] += 1
  183.             raise BadURIExtensionHashValue(msg)
  184.         else:
  185.             return data
  186. 
  187.     def _parse_and_validate(self, data):
  188.         self.share_size = mathutil.div_ceil(self._verifycap.size, self._verifycap.needed_shares)
  189. 
  190.         d = uri.unpack_extension(data)
  191. 
  192.         # There are several kinds of things that can be found in a UEB.  First, things that we
  193.         # really need to learn from the UEB in order to do this download. Next: things which are
  194.         # optional but not redundant -- if they are present in the UEB they will get used. Next,
  195.         # things that are optional and redundant. These things are required to be consistent:
  196.         # they don't have to be in the UEB, but if they are in the UEB then they will be checked
  197.         # for consistency with the already-known facts, and if they are inconsistent then an
  198.         # exception will be raised. These things aren't actually used -- they are just tested
  199.         # for consistency and ignored. Finally: things which are deprecated -- they ought not be
  200.         # in the UEB at all, and if they are present then a warning will be logged but they are
  201.         # otherwise ignored.
  202. 
  203.        # First, things that we really need to learn from the UEB: segment_size,
  204.         # crypttext_root_hash, and share_root_hash.
  205.         self.segment_size = d['segment_size']
  206. 
  207.         self.block_size = mathutil.div_ceil(self.segment_size, self._verifycap.needed_shares)
  208.         self.num_segments = mathutil.div_ceil(self._verifycap.size, self.segment_size)
  209. 
  210.         self.tail_data_size = self._verifycap.size % self.segment_size
  211.         if not self.tail_data_size:
  212.             self.tail_data_size = self.segment_size
  213.         # padding for erasure code
  214.         self.tail_segment_size = mathutil.next_multiple(self.tail_data_size, self._verifycap.needed_shares)
  215. 
  216.         # Ciphertext hash tree root is mandatory, so that there is at most one ciphertext that
  217.         # matches this read-cap or verify-cap.  The integrity check on the shares is not
  218.         # sufficient to prevent the original encoder from creating some shares of file A and
  219.         # other shares of file B.
  220.         self.crypttext_root_hash = d['crypttext_root_hash']
  221. 
  222.         self.share_root_hash = d['share_root_hash']
  223. 
  224. 
  225.         # Next: things that are optional and not redundant: crypttext_hash
  226.         if d.has_key('crypttext_hash'):
  227.             self.crypttext_hash = d['crypttext_hash']
  228.             if len(self.crypttext_hash) != hashutil.CRYPTO_VAL_SIZE:
  229.                 raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))
  230. 
  231. 
  232.         # Next: things that are optional, redundant, and required to be consistent: codec_name,
  233.         # codec_params, tail_codec_params, num_segments, size, needed_shares, total_shares
  234.         if d.has_key('codec_name'):
  235.             if d['codec_name'] != "crs":
  236.                 raise UnsupportedErasureCodec(d['codec_name'])
  237. 
  238.         if d.has_key('codec_params'):
  239.             ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
  240.             if ucpss != self.segment_size:
  241.                 raise BadURIExtension("inconsistent erasure code params: ucpss: %s != "
  242.                                       "self.segment_size: %s" % (ucpss, self.segment_size))
  243.             if ucpns != self._verifycap.needed_shares:
  244.                 raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
  245.                                       "self._verifycap.needed_shares: %s" % (ucpns,
  246.                                                                              self._verifycap.needed_shares))
  247.             if ucpts != self._verifycap.total_shares:
  248.                 raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
  249.                                       "self._verifycap.total_shares: %s" % (ucpts,
  250.                                                                             self._verifycap.total_shares))
  251. 
  252.         if d.has_key('tail_codec_params'):
  253.             utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
  254.             if utcpss != self.tail_segment_size:
  255.                 raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
  256.                                       "self.tail_segment_size: %s, self._verifycap.size: %s, "
  257.                                       "self.segment_size: %s, self._verifycap.needed_shares: %s"
  258.                                       % (utcpss, self.tail_segment_size, self._verifycap.size,
  259.                                          self.segment_size, self._verifycap.needed_shares))
  260.             if utcpns != self._verifycap.needed_shares:
  261.                 raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
  262.                                       "self._verifycap.needed_shares: %s" % (utcpns,
  263.                                                                              self._verifycap.needed_shares))
  264.             if utcpts != self._verifycap.total_shares:
  265.                 raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
  266.                                       "self._verifycap.total_shares: %s" % (utcpts,
  267.                                                                             self._verifycap.total_shares))
  268. 
  269.         if d.has_key('num_segments'):
  270.             if d['num_segments'] != self.num_segments:
  271.                 raise BadURIExtension("inconsistent num_segments: size: %s, "
  272.                                       "segment_size: %s, computed_num_segments: %s, "
  273.                                       "ueb_num_segments: %s" % (self._verifycap.size,
  274.                                                                 self.segment_size,
  275.                                                                 self.num_segments, d['num_segments']))
  276. 
  277.         if d.has_key('size'):
  278.             if d['size'] != self._verifycap.size:
  279.                 raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
  280.                                       (self._verifycap.size, d['size']))
  281. 
  282.         if d.has_key('needed_shares'):
  283.             if d['needed_shares'] != self._verifycap.needed_shares:
  284.                 raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
  285.                                       "needed shares: %s" % (self._verifycap.total_shares,
  286.                                                              d['needed_shares']))
  287. 
  288.         if d.has_key('total_shares'):
  289.             if d['total_shares'] != self._verifycap.total_shares:
  290.                 raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
  291.                                       "total shares: %s" % (self._verifycap.total_shares,
  292.                                                             d['total_shares']))
  293. 
  294.         # Finally, things that are deprecated and ignored: plaintext_hash, plaintext_root_hash
  295.         if d.get('plaintext_hash'):
  296.             log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
  297.                     "and is no longer used.  Ignoring.  %s" % (self,))
  298.         if d.get('plaintext_root_hash'):
  299.             log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
  300.                     "reasons and is no longer used.  Ignoring.  %s" % (self,))
  301. 
  302.         return self
  303. 
  304.     def start(self):
  305.         """ Fetch the UEB from bucket, compare its hash to the hash from verifycap, then parse
  306.         it.  Returns a deferred which is called back with self once the fetch is successful, or
  307.         is erred back if it fails. """
  308.         d = self._readbucketproxy.get_uri_extension()
  309.         d.addCallback(self._check_integrity)
  310.         d.addCallback(self._parse_and_validate)
  311.         return d
  312. 
  313. class ValidatedReadBucketProxy(log.PrefixingLogMixin):
  314.     """I am a front-end for a remote storage bucket, responsible for retrieving and validating
  315.     data from that bucket.
  316. 
  317.     My get_block() method is used by BlockDownloaders.
  318.     """
  319. 
  320.     def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size):
  321.         """ share_hash_tree is required to have already been initialized with the root hash
  322.         (the number-0 hash), using the share_root_hash from the UEB """
  323.         precondition(share_hash_tree[0] is not None, share_hash_tree)
  324.         prefix = "%d-%s-%s" % (sharenum, bucket, base32.b2a_l(share_hash_tree[0][:8], 60))
  325.         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
  326.         self.sharenum = sharenum
  327.         self.bucket = bucket
  328.         self.share_hash_tree = share_hash_tree
  329.         self.num_blocks = num_blocks
  330.         self.block_size = block_size
  331.         self.share_size = share_size
  332.         self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)
  333. 
  334.     def get_block(self, blocknum):
  335.         # the first time we use this bucket, we need to fetch enough elements
  336.         # of the share hash tree to validate it from our share hash up to the
  337.         # hashroot.
  338.         if self.share_hash_tree.needed_hashes(self.sharenum):
  339.             d1 = self.bucket.get_share_hashes()
  340.         else:
  341.             d1 = defer.succeed([])
  342. 
  343.         # We might need to grab some elements of our block hash tree, to
  344.         # validate the requested block up to the share hash.
  345.         blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
  346.         # We don't need the root of the block hash tree, as that comes in the share tree.
  347.         blockhashesneeded.discard(0)
  348.         d2 = self.bucket.get_block_hashes(blockhashesneeded)
  349. 
  350.         if blocknum < self.num_blocks-1:
  351.             thisblocksize = self.block_size
  352.         else:
  353.             thisblocksize = self.share_size % self.block_size
  354.             if thisblocksize == 0:
  355.                 thisblocksize = self.block_size
  356.         d3 = self.bucket.get_block_data(blocknum, self.block_size, thisblocksize)
  357. 
  358.         dl = deferredutil.gatherResults([d1, d2, d3])
  359.         dl.addCallback(self._got_data, blocknum)
  360.         return dl
  361. 
  362.     def _got_data(self, results, blocknum):
  363.         precondition(blocknum < self.num_blocks, self, blocknum, self.num_blocks)
  364.         sharehashes, blockhashes, blockdata = results
  365.         try:
  366.             sharehashes = dict(sharehashes)
  367.         except ValueError, le:
  368.             le.args = tuple(le.args + (sharehashes,))
  369.             raise
  370.         blockhashes = dict(enumerate(blockhashes))
  371. 
  372.         candidate_share_hash = None # in case we log it in the except block below
  373.         blockhash = None # in case we log it in the except block below
  374. 
  375.         try:
  376.             if self.share_hash_tree.needed_hashes(self.sharenum):
  377.                 # This will raise exception if the values being passed do not match the root
  378.                 # node of self.share_hash_tree.
  379.                 try:
  380.                     self.share_hash_tree.set_hashes(sharehashes)
  381.                 except IndexError, le:
  382.                     # Weird -- sharehashes contained index numbers outside of the range that fit
  383.                     # into this hash tree.
  384.                     raise BadOrMissingHash(le)
  385. 
  386.             # To validate a block we need the root of the block hash tree, which is also one of
  387.             # the leafs of the share hash tree, and is called "the share hash".
  388.             if not self.block_hash_tree[0]: # empty -- no root node yet
  389.                 # Get the share hash from the share hash tree.
  390.                 share_hash = self.share_hash_tree.get_leaf(self.sharenum)
  391.                 if not share_hash:
  392.                     raise hashtree.NotEnoughHashesError # No root node in block_hash_tree and also the share hash wasn't sent by the server.
  393.                 self.block_hash_tree.set_hashes({0: share_hash})
  394. 
  395.             if self.block_hash_tree.needed_hashes(blocknum):
  396.                 self.block_hash_tree.set_hashes(blockhashes)
  397. 
  398.             blockhash = hashutil.block_hash(blockdata)
  399.             self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
  400.             #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
  401.             #        "%r .. %r: %s" %
  402.             #        (self.sharenum, blocknum, len(blockdata),
  403.             #         blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))
  404. 
  405.         except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
  406.             # log.WEIRD: indicates undetected disk/network error, or more
  407.             # likely a programming error
  408.             self.log("hash failure in block=%d, shnum=%d on %s" %
  409.                     (blocknum, self.sharenum, self.bucket))
  410.             if self.block_hash_tree.needed_hashes(blocknum):
  411.                 self.log(""" failure occurred when checking the block_hash_tree.
  412.                 This suggests that either the block data was bad, or that the
  413.                 block hashes we received along with it were bad.""")
  414.             else:
  415.                 self.log(""" the failure probably occurred when checking the
  416.                 share_hash_tree, which suggests that the share hashes we
  417.                 received from the remote peer were bad.""")
  418.             self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
  419.             self.log(" block length: %d" % len(blockdata))
  420.             self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
  421.             if len(blockdata) < 100:
  422.                 self.log(" block data: %r" % (blockdata,))
  423.             else:
  424.                 self.log(" block data start/end: %r .. %r" %
  425.                         (blockdata[:50], blockdata[-50:]))
  426.             self.log(" share hash tree:\n" + self.share_hash_tree.dump())
  427.             self.log(" block hash tree:\n" + self.block_hash_tree.dump())
  428.             lines = []
  429.             for i,h in sorted(sharehashes.items()):
  430.                 lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
  431.             self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
  432.             lines = []
  433.             for i,h in blockhashes.items():
  434.                 lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
  435.             log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
  436.             raise BadOrMissingHash(le)
  437. 
  438.         # If we made it here, the block is good. If the hash trees didn't
  439.         # like what they saw, they would have raised a BadHashError, causing
  440.         # our caller to see a Failure and thus ignore this block (as well as
  441.         # dropping this bucket).
  442.         return blockdata
  443. 
  444. 
  445. 
  446. class BlockDownloader(log.PrefixingLogMixin):
  447.     """I am responsible for downloading a single block (from a single bucket)
  448.     for a single segment.
  449. 
  450.     I am a child of the SegmentDownloader.
  451.     """
  452. 
  453.     def __init__(self, vbucket, blocknum, parent, results):
  454.         precondition(isinstance(vbucket, ValidatedReadBucketProxy), vbucket)
  455.         prefix = "%s-%d" % (vbucket, blocknum)
  456.         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
  457.         self.vbucket = vbucket
  458.         self.blocknum = blocknum
  459.         self.parent = parent
  460.         self.results = results
  461. 
  462.     def start(self, segnum):
  463.         self.log("get_block(segnum=%d)" % segnum)
  464.         started = time.time()
  465.         d = self.vbucket.get_block(segnum)
  466.         d.addCallbacks(self._hold_block, self._got_block_error,
  467.                        callbackArgs=(started,))
  468.         return d
  469. 
  470.     def _hold_block(self, data, started):
  471.         if self.results:
  472.             elapsed = time.time() - started
  473.             peerid = self.vbucket.bucket.get_peerid()
  474.             if peerid not in self.results.timings["fetch_per_server"]:
  475.                 self.results.timings["fetch_per_server"][peerid] = []
  476.             self.results.timings["fetch_per_server"][peerid].append(elapsed)
  477.         self.log("got block")
  478.         self.parent.hold_block(self.blocknum, data)
  479. 
  480.     def _got_block_error(self, f):
  481.         failtype = f.trap(RemoteException, DeadReferenceError,
  482.                           IntegrityCheckReject,
  483.                           layout.LayoutInvalid, layout.ShareVersionIncompatible)
  484.         if f.check(RemoteException, DeadReferenceError):
  485.             level = log.UNUSUAL
  486.         else:
  487.             level = log.WEIRD
  488.         self.log("failure to get block", level=level, umid="5Z4uHQ")
  489.         if self.results:
  490.             peerid = self.vbucket.bucket.get_peerid()
  491.             self.results.server_problems[peerid] = str(f)
  492.         self.parent.bucket_failed(self.vbucket)
  493. 
  494. class SegmentDownloader:
  495.     """I am responsible for downloading all the blocks for a single segment
  496.     of data.
  497. 
  498.     I am a child of the CiphertextDownloader.
  499.     """
  500. 
  501.     def __init__(self, parent, segmentnumber, needed_shares, results):
  502.         self.parent = parent
  503.         self.segmentnumber = segmentnumber
  504.         self.needed_blocks = needed_shares
  505.         self.blocks = {} # k: blocknum, v: data
  506.         self.results = results
  507.         self._log_number = self.parent.log("starting segment %d" %
  508.                                            segmentnumber)
  509. 
  510.     def log(self, *args, **kwargs):
  511.         if "parent" not in kwargs:
  512.             kwargs["parent"] = self._log_number
  513.         return self.parent.log(*args, **kwargs)
  514. 
  515.     def start(self):
  516.         return self._download()
  517. 
  518.     def _download(self):
  519.         d = self._try()
  520.         def _done(res):
  521.             if len(self.blocks) >= self.needed_blocks:
  522.                 # we only need self.needed_blocks blocks
  523.                 # we want to get the smallest blockids, because they are
  524.                 # more likely to be fast "primary blocks"
  525.                 blockids = sorted(self.blocks.keys())[:self.needed_blocks]
  526.                 blocks = []
  527.                 for blocknum in blockids:
  528.                     blocks.append(self.blocks[blocknum])
  529.                 return (blocks, blockids)
  530.             else:
  531.                 return self._download()
  532.         d.addCallback(_done)
  533.         return d
  534. 
  535.     def _try(self):
  536.         # fill our set of active buckets, maybe raising NotEnoughSharesError
  537.         active_buckets = self.parent._activate_enough_buckets()
  538.         # Now we have enough buckets, in self.parent.active_buckets.
  539. 
  540.         # in test cases, bd.start might mutate active_buckets right away, so
  541.         # we need to put off calling start() until we've iterated all the way
  542.         # through it.
  543.         downloaders = []
  544.         for blocknum, vbucket in active_buckets.iteritems():
  545.             assert isinstance(vbucket, ValidatedReadBucketProxy), vbucket
  546.             bd = BlockDownloader(vbucket, blocknum, self, self.results)
  547.             downloaders.append(bd)
  548.             if self.results:
  549.                 self.results.servers_used.add(vbucket.bucket.get_peerid())
  550.         l = [bd.start(self.segmentnumber) for bd in downloaders]
  551.         return defer.DeferredList(l, fireOnOneErrback=True)
  552. 
  553.     def hold_block(self, blocknum, data):
  554.         self.blocks[blocknum] = data
  555. 
  556.     def bucket_failed(self, vbucket):
  557.         self.parent.bucket_failed(vbucket)
  558. 
  559. class DownloadStatus:
  560.     implements(IDownloadStatus)
  561.     statusid_counter = itertools.count(0)
  562. 
  563.     def __init__(self):
  564.         self.storage_index = None
  565.         self.size = None
  566.         self.helper = False
  567.         self.status = "Not started"
  568.         self.progress = 0.0
  569.         self.paused = False
  570.         self.stopped = False
  571.         self.active = True
  572.         self.results = None
  573.         self.counter = self.statusid_counter.next()
  574.         self.started = time.time()
  575. 
  576.     def get_started(self):
  577.         return self.started
  578.     def get_storage_index(self):
  579.         return self.storage_index
  580.     def get_size(self):
  581.         return self.size
  582.     def using_helper(self):
  583.         return self.helper
  584.     def get_status(self):
  585.         status = self.status
  586.         if self.paused:
  587.             status += " (output paused)"
  588.         if self.stopped:
  589.             status += " (output stopped)"
  590.         return status
  591.     def get_progress(self):
  592.         return self.progress
  593.     def get_active(self):
  594.         return self.active
  595.     def get_results(self):
  596.         return self.results
  597.     def get_counter(self):
  598.         return self.counter
  599. 
  600.     def set_storage_index(self, si):
  601.         self.storage_index = si
  602.     def set_size(self, size):
  603.         self.size = size
  604.     def set_helper(self, helper):
  605.         self.helper = helper
  606.     def set_status(self, status):
  607.         self.status = status
  608.     def set_paused(self, paused):
  609.         self.paused = paused
  610.     def set_stopped(self, stopped):
  611.         self.stopped = stopped
  612.     def set_progress(self, value):
  613.         self.progress = value
  614.     def set_active(self, value):
  615.         self.active = value
  616.     def set_results(self, value):
  617.         self.results = value
  618. 
  619. class CiphertextDownloader(log.PrefixingLogMixin):
    """ I download shares, check their integrity, then decode them, check the
    integrity of the resulting ciphertext, and then write it to my target.
    Before I send any new request to a server, I always ask the 'monitor'
    object that was passed into my constructor whether this task has been
    cancelled (by invoking its raise_if_cancelled() method)."""
  625.     implements(IPushProducer)
  626.     _status = None
  627. 
  628.     def __init__(self, storage_broker, v, target, monitor):
  629. 
  630.         precondition(IStorageBroker.providedBy(storage_broker), storage_broker)
  631.         precondition(IVerifierURI.providedBy(v), v)
  632.         precondition(IDownloadTarget.providedBy(target), target)
  633. 
  634.         prefix=base32.b2a_l(v.storage_index[:8], 60)
  635.         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
  636.         self._storage_broker = storage_broker
  637. 
  638.         self._verifycap = v
  639.         self._storage_index = v.storage_index
  640.         self._uri_extension_hash = v.uri_extension_hash
  641. 
  642.         self._started = time.time()
  643.         self._status = s = DownloadStatus()
  644.         s.set_status("Starting")
  645.         s.set_storage_index(self._storage_index)
  646.         s.set_size(self._verifycap.size)
  647.         s.set_helper(False)
  648.         s.set_active(True)
  649. 
  650.         self._results = DownloadResults()
  651.         s.set_results(self._results)
  652.         self._results.file_size = self._verifycap.size
  653.         self._results.timings["servers_peer_selection"] = {}
  654.         self._results.timings["fetch_per_server"] = {}
  655.         self._results.timings["cumulative_fetch"] = 0.0
  656.         self._results.timings["cumulative_decode"] = 0.0
  657.         self._results.timings["cumulative_decrypt"] = 0.0
  658.         self._results.timings["paused"] = 0.0
  659. 
  660.         self._paused = False
  661.         self._stopped = False
  662.         if IConsumer.providedBy(target):
  663.             target.registerProducer(self, True)
  664.         self._target = target
  665.         self._target.set_storageindex(self._storage_index) # Repairer (uploader) needs the storageindex.
  666.         self._monitor = monitor
  667.         self._opened = False
  668. 
  669.         self.active_buckets = {} # k: shnum, v: bucket
  670.         self._share_buckets = [] # list of (sharenum, bucket) tuples
  671.         self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
  672. 
  673.         self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }
  674. 
  675.         self._ciphertext_hasher = hashutil.crypttext_hasher()
  676. 
  677.         self._bytes_done = 0
  678.         self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
  679. 
  680.         # _got_uri_extension() will create the following:
  681.         # self._crypttext_hash_tree
  682.         # self._share_hash_tree
  683.         # self._current_segnum = 0
  684.         # self._vup # ValidatedExtendedURIProxy
  685. 
  686.     def pauseProducing(self):
  687.         if self._paused:
  688.             return
  689.         self._paused = defer.Deferred()
  690.         self._paused_at = time.time()
  691.         if self._status:
  692.             self._status.set_paused(True)
  693. 
  694.     def resumeProducing(self):
  695.         if self._paused:
  696.             paused_for = time.time() - self._paused_at
  697.             self._results.timings['paused'] += paused_for
  698.             p = self._paused
  699.             self._paused = None
  700.             eventually(p.callback, None)
  701.             if self._status:
  702.                 self._status.set_paused(False)
  703. 
  704.     def stopProducing(self):
  705.         self.log("Download.stopProducing")
  706.         self._stopped = True
  707.         self.resumeProducing()
  708.         if self._status:
  709.             self._status.set_stopped(True)
  710.             self._status.set_active(False)
  711. 
  712.     def start(self):
  713.         self.log("starting download")
  714. 
  715.         # first step: who should we download from?
  716.         d = defer.maybeDeferred(self._get_all_shareholders)
  717.         d.addCallback(self._got_all_shareholders)
  718.         # now get the uri_extension block from somebody and integrity check it and parse and validate its contents
  719.         d.addCallback(self._obtain_uri_extension)
  720.         d.addCallback(self._get_crypttext_hash_tree)
  721.         # once we know that, we can download blocks from everybody
  722.         d.addCallback(self._download_all_segments)
  723.         def _finished(res):
  724.             if self._status:
  725.                 self._status.set_status("Finished")
  726.                 self._status.set_active(False)
  727.                 self._status.set_paused(False)
  728.             if IConsumer.providedBy(self._target):
  729.                 self._target.unregisterProducer()
  730.             return res
  731.         d.addBoth(_finished)
  732.         def _failed(why):
  733.             if self._status:
  734.                 self._status.set_status("Failed")
  735.                 self._status.set_active(False)
  736.             if why.check(DownloadStopped):
  737.                 # DownloadStopped just means the consumer aborted the download; not so scary.
  738.                 self.log("download stopped", level=log.UNUSUAL)
  739.             else:
  740.                 # This is really unusual, and deserves maximum forensics.
  741.                 self.log("download failed!", failure=why, level=log.SCARY, umid="lp1vaQ")
  742.             return why
  743.         d.addErrback(_failed)
  744.         d.addCallback(self._done)
  745.         return d
  746. 
  747.     def _get_all_shareholders(self):
  748.         dl = []
  749.         sb = self._storage_broker
  750.         servers = sb.get_servers_for_index(self._storage_index)
  751.         if not servers:
  752.             raise NoServersError("broker gave us no servers!")
  753.         for (peerid,ss) in servers:
  754.             self.log(format="sending DYHB to [%(peerid)s]",
  755.                      peerid=idlib.shortnodeid_b2a(peerid),
  756.                      level=log.NOISY, umid="rT03hg")
  757.             d = ss.callRemote("get_buckets", self._storage_index)
  758.             d.addCallbacks(self._got_response, self._got_error,
  759.                            callbackArgs=(peerid,))
  760.             dl.append(d)
  761.         self._responses_received = 0
  762.         self._queries_sent = len(dl)
  763.         if self._status:
  764.             self._status.set_status("Locating Shares (%d/%d)" %
  765.                                     (self._responses_received,
  766.                                      self._queries_sent))
  767.         return defer.DeferredList(dl)
  768. 
  769.     def _got_response(self, buckets, peerid):
  770.         self.log(format="got results from [%(peerid)s]: shnums %(shnums)s",
  771.                  peerid=idlib.shortnodeid_b2a(peerid),
  772.                  shnums=sorted(buckets.keys()),
  773.                  level=log.NOISY, umid="o4uwFg")
  774.         self._responses_received += 1
  775.         if self._results:
  776.             elapsed = time.time() - self._started
  777.             self._results.timings["servers_peer_selection"][peerid] = elapsed
  778.         if self._status:
  779.             self._status.set_status("Locating Shares (%d/%d)" %
  780.                                     (self._responses_received,
  781.                                      self._queries_sent))
  782.         for sharenum, bucket in buckets.iteritems():
  783.             b = layout.ReadBucketProxy(bucket, peerid, self._storage_index)
  784.             self.add_share_bucket(sharenum, b)
  785. 
  786.             if self._results:
  787.                 if peerid not in self._results.servermap:
  788.                     self._results.servermap[peerid] = set()
  789.                 self._results.servermap[peerid].add(sharenum)
  790. 
  791.     def add_share_bucket(self, sharenum, bucket):
  792.         # this is split out for the benefit of test_encode.py
  793.         self._share_buckets.append( (sharenum, bucket) )
  794. 
  795.     def _got_error(self, f):
  796.         level = log.WEIRD
  797.         if f.check(DeadReferenceError):
  798.             level = log.UNUSUAL
  799.         self.log("Error during get_buckets", failure=f, level=level,
  800.                          umid="3uuBUQ")
  801. 
  802.     def bucket_failed(self, vbucket):
  803.         shnum = vbucket.sharenum
  804.         del self.active_buckets[shnum]
  805.         s = self._share_vbuckets[shnum]
  806.         # s is a set of ValidatedReadBucketProxy instances
  807.         s.remove(vbucket)
  808.         # ... which might now be empty
  809.         if not s:
  810.             # there are no more buckets which can provide this share, so
  811.             # remove the key. This may prompt us to use a different share.
  812.             del self._share_vbuckets[shnum]
  813. 
  814.     def _got_all_shareholders(self, res):
  815.         if self._results:
  816.             now = time.time()
  817.             self._results.timings["peer_selection"] = now - self._started
  818. 
  819.         if len(self._share_buckets) < self._verifycap.needed_shares:
  820.             msg = "Failed to get enough shareholders: have %d, need %d" \
  821.                   % (len(self._share_buckets), self._verifycap.needed_shares)
  822.             if self._share_buckets:
  823.                 raise NotEnoughSharesError(msg)
  824.             else:
  825.                 raise NoSharesError(msg)
  826. 
  827.         #for s in self._share_vbuckets.values():
  828.         #    for vb in s:
  829.         #        assert isinstance(vb, ValidatedReadBucketProxy), \
  830.         #               "vb is %s but should be a ValidatedReadBucketProxy" % (vb,)
  831. 
  832.     def _obtain_uri_extension(self, ignored):
  833.         # all shareholders are supposed to have a copy of uri_extension, and
  834.         # all are supposed to be identical. We compute the hash of the data
  835.         # that comes back, and compare it against the version in our URI. If
  836.         # they don't match, ignore their data and try someone else.
  837.         if self._status:
  838.             self._status.set_status("Obtaining URI Extension")
  839. 
  840.         uri_extension_fetch_started = time.time()
  841. 
  842.         vups = []
  843.         for sharenum, bucket in self._share_buckets:
  844.             vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
  845.         vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
  846.         d = vto.start()
  847. 
  848.         def _got_uri_extension(vup):
  849.             precondition(isinstance(vup, ValidatedExtendedURIProxy), vup)
  850.             if self._results:
  851.                 elapsed = time.time() - uri_extension_fetch_started
  852.                 self._results.timings["uri_extension"] = elapsed
  853. 
  854.             self._vup = vup
  855.             self._codec = codec.CRSDecoder()
  856.             self._codec.set_params(self._vup.segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
  857.             self._tail_codec = codec.CRSDecoder()
  858.             self._tail_codec.set_params(self._vup.tail_segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
  859. 
  860.             self._current_segnum = 0
  861. 
  862.             self._share_hash_tree = hashtree.IncompleteHashTree(self._verifycap.total_shares)
  863.             self._share_hash_tree.set_hashes({0: vup.share_root_hash})
  864. 
  865.             self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
  866.             self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash})
  867. 
  868.             # Repairer (uploader) needs the encodingparams.
  869.             self._target.set_encodingparams((
  870.                 self._verifycap.needed_shares,
  871.                 self._verifycap.total_shares, # I don't think the target actually cares about "happy".
  872.                 self._verifycap.total_shares,
  873.                 self._vup.segment_size
  874.                 ))
  875.         d.addCallback(_got_uri_extension)
  876.         return d
  877. 
  878.     def _get_crypttext_hash_tree(self, res):
  879.         vchtps = []
  880.         for sharenum, bucket in self._share_buckets:
  881.             vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
  882.             vchtps.append(vchtp)
  883. 
  884.         _get_crypttext_hash_tree_started = time.time()
  885.         if self._status:
  886.             self._status.set_status("Retrieving crypttext hash tree")
  887. 
  888.         vto = ValidatedThingObtainer(vchtps , debugname="vchtps", log_id=self._parentmsgid)
  889.         d = vto.start()
  890. 
  891.         def _got_crypttext_hash_tree(res):
  892.             # Good -- the self._crypttext_hash_tree that we passed to vchtp is now populated
  893.             # with hashes.
  894.             if self._results:
  895.                 elapsed = time.time() - _get_crypttext_hash_tree_started
  896.                 self._results.timings["hashtrees"] = elapsed
  897.         d.addCallback(_got_crypttext_hash_tree)
  898.         return d
  899. 
  900.     def _activate_enough_buckets(self):
  901.         """either return a mapping from shnum to a ValidatedReadBucketProxy that can
  902.         provide data for that share, or raise NotEnoughSharesError"""
  903. 
  904.         while len(self.active_buckets) < self._verifycap.needed_shares:
  905.             # need some more
  906.             handled_shnums = set(self.active_buckets.keys())
  907.             available_shnums = set(self._share_vbuckets.keys())
  908.             potential_shnums = list(available_shnums - handled_shnums)
  909.             if len(potential_shnums) < (self._verifycap.needed_shares - len(self.active_buckets)):
  910.                 have = len(potential_shnums) + len(self.active_buckets)
  911.                 msg = "Unable to activate enough shares: have %d, need %d" \
  912.                       % (have, self._verifycap.needed_shares)
  913.                 if have:
  914.                     raise NotEnoughSharesError(msg)
  915.                 else:
  916.                     raise NoSharesError(msg)
  917.             # For the next share, choose a primary share if available, else a randomly chosen
  918.             # secondary share.
  919.             potential_shnums.sort()
  920.             if potential_shnums[0] < self._verifycap.needed_shares:
  921.                 shnum = potential_shnums[0]
  922.             else:
  923.                 shnum = random.choice(potential_shnums)
  924.             # and a random bucket that will provide it
  925.             validated_bucket = random.choice(list(self._share_vbuckets[shnum]))
  926.             self.active_buckets[shnum] = validated_bucket
  927.         return self.active_buckets
  928. 
  929. 
  930.     def _download_all_segments(self, res):
  931.         for sharenum, bucket in self._share_buckets:
  932.             vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
  933.             self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
  934. 
  935.         # after the above code, self._share_vbuckets contains enough
  936.         # buckets to complete the download, and some extra ones to
  937.         # tolerate some buckets dropping out or having
  938.         # errors. self._share_vbuckets is a dictionary that maps from
  939.         # shnum to a set of ValidatedBuckets, which themselves are
  940.         # wrappers around RIBucketReader references.
  941.         self.active_buckets = {} # k: shnum, v: ValidatedReadBucketProxy instance
  942. 
  943.         self._started_fetching = time.time()
  944. 
  945.         d = defer.succeed(None)
  946.         for segnum in range(self._vup.num_segments):
  947.             d.addCallback(self._download_segment, segnum)
  948.             # this pause, at the end of write, prevents pre-fetch from
  949.             # happening until the consumer is ready for more data.
  950.             d.addCallback(self._check_for_pause)
  951.         return d
  952. 
  953.     def _check_for_pause(self, res):
  954.         if self._paused:
  955.             d = defer.Deferred()
  956.             self._paused.addCallback(lambda ignored: d.callback(res))
  957.             return d
  958.         if self._stopped:
  959.             raise DownloadStopped("our Consumer called stopProducing()")
  960.         self._monitor.raise_if_cancelled()
  961.         return res
  962. 
  963.     def _download_segment(self, res, segnum):
  964.         if self._status:
  965.             self._status.set_status("Downloading segment %d of %d" %
  966.                                     (segnum+1, self._vup.num_segments))
  967.         self.log("downloading seg#%d of %d (%d%%)"
  968.                  % (segnum, self._vup.num_segments,
  969.                     100.0 * segnum / self._vup.num_segments))
  970.         # memory footprint: when the SegmentDownloader finishes pulling down
  971.         # all shares, we have 1*segment_size of usage.
  972.         segmentdler = SegmentDownloader(self, segnum, self._verifycap.needed_shares,
  973.                                         self._results)
  974.         started = time.time()
  975.         d = segmentdler.start()
  976.         def _finished_fetching(res):
  977.             elapsed = time.time() - started
  978.             self._results.timings["cumulative_fetch"] += elapsed
  979.             return res
  980.         if self._results:
  981.             d.addCallback(_finished_fetching)
  982.         # pause before using more memory
  983.         d.addCallback(self._check_for_pause)
  984.         # while the codec does its job, we hit 2*segment_size
  985.         def _started_decode(res):
  986.             self._started_decode = time.time()
  987.             return res
  988.         if self._results:
  989.             d.addCallback(_started_decode)
  990.         if segnum + 1 == self._vup.num_segments:
  991.             codec = self._tail_codec
  992.         else:
  993.             codec = self._codec
  994.         d.addCallback(lambda (shares, shareids): codec.decode(shares, shareids))
  995.         # once the codec is done, we drop back to 1*segment_size, because
  996.         # 'shares' goes out of scope. The memory usage is all in the
  997.         # plaintext now, spread out into a bunch of tiny buffers.
  998.         def _finished_decode(res):
  999.             elapsed = time.time() - self._started_decode
 1000.             self._results.timings["cumulative_decode"] += elapsed
 1001.             return res
 1002.         if self._results:
 1003.             d.addCallback(_finished_decode)
 1004. 
 1005.         # pause/check-for-stop just before writing, to honor stopProducing
 1006.         d.addCallback(self._check_for_pause)
 1007.         d.addCallback(self._got_segment)
 1008.         return d
 1009. 
 1010.     def _got_segment(self, buffers):
 1011.         precondition(self._crypttext_hash_tree)
 1012.         started_decrypt = time.time()
 1013.         self._status.set_progress(float(self._current_segnum)/self._verifycap.size)
 1014. 
 1015.         if self._current_segnum + 1 == self._vup.num_segments:
 1016.             # This is the last segment.
 1017.             # Trim off any padding added by the upload side.  We never send empty segments. If
 1018.             # the data was an exact multiple of the segment size, the last segment will be full.
 1019.             tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._verifycap.needed_shares)
 1020.             num_buffers_used = mathutil.div_ceil(self._vup.tail_data_size, tail_buf_size)
 1021.             # Remove buffers which don't contain any part of the tail.
 1022.             del buffers[num_buffers_used:]
 1023.             # Remove the past-the-tail-part of the last buffer.
 1024.             tail_in_last_buf = self._vup.tail_data_size % tail_buf_size
 1025.             if tail_in_last_buf == 0:
 1026.                 tail_in_last_buf = tail_buf_size
 1027.             buffers[-1] = buffers[-1][:tail_in_last_buf]
 1028. 
 1029.         # First compute the hash of this segment and check that it fits.
 1030.         ch = hashutil.crypttext_segment_hasher()
 1031.         for buffer in buffers:
 1032.             self._ciphertext_hasher.update(buffer)
 1033.             ch.update(buffer)
 1034.         self._crypttext_hash_tree.set_hashes(leaves={self._current_segnum: ch.digest()})
 1035. 
 1036.         # Then write this segment to the target.
 1037.         if not self._opened:
 1038.             self._opened = True
 1039.             self._target.open(self._verifycap.size)
 1040. 
 1041.         for buffer in buffers:
 1042.             self._target.write(buffer)
 1043.             self._bytes_done += len(buffer)
 1044. 
 1045.         self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
 1046.         self._current_segnum += 1
 1047. 
 1048.         if self._results:
 1049.             elapsed = time.time() - started_decrypt
 1050.             self._results.timings["cumulative_decrypt"] += elapsed
 1051. 
 1052.     def _done(self, res):
 1053.         self.log("download done")
 1054.         if self._results:
 1055.             now = time.time()
 1056.             self._results.timings["total"] = now - self._started
 1057.             self._results.timings["segments"] = now - self._started_fetching
 1058.         if self._vup.crypttext_hash:
 1059.             _assert(self._vup.crypttext_hash == self._ciphertext_hasher.digest(),
 1060.                     "bad crypttext_hash: computed=%s, expected=%s" %
 1061.                     (base32.b2a(self._ciphertext_hasher.digest()),
 1062.                      base32.b2a(self._vup.crypttext_hash)))
 1063.         _assert(self._bytes_done == self._verifycap.size, self._bytes_done, self._verifycap.size)
 1064.         self._status.set_progress(1)
 1065.         self._target.close()
 1066.         return self._target.finish()
    def get_download_status(self):
        """Return the DownloadStatus object created for this download in
        __init__ (the class-level default, before __init__ runs, is None)."""
        return self._status
 1069. 
 1070. 
 1071. class FileName:
 1072.     implements(IDownloadTarget)
 1073.     def __init__(self, filename):
 1074.         self._filename = filename
 1075.         self.f = None
 1076.     def open(self, size):
 1077.         self.f = open(self._filename, "wb")
 1078.         return self.f
 1079.     def write(self, data):
 1080.         self.f.write(data)
 1081.     def close(self):
 1082.         if self.f:
 1083.             self.f.close()
 1084.     def fail(self, why):
 1085.         if self.f:
 1086.             self.f.close()
 1087.             os.unlink(self._filename)
 1088.     def register_canceller(self, cb):
 1089.         pass # we won't use it
 1090.     def finish(self):
 1091.         pass
 1092.     # The following methods are just because the target might be a repairer.DownUpConnector,
 1093.     # and just because the current CHKUpload object expects to find the storage index and
 1094.     # encoding parameters in its Uploadable.
 1095.     def set_storageindex(self, storageindex):
 1096.         pass
 1097.     def set_encodingparams(self, encodingparams):
 1098.         pass
 1099. 
 1100. class Data:
 1101.     implements(IDownloadTarget)
 1102.     def __init__(self):
 1103.         self._data = []
 1104.     def open(self, size):
 1105.         pass
 1106.     def write(self, data):
 1107.         self._data.append(data)
 1108.     def close(self):
 1109.         self.data = "".join(self._data)
 1110.         del self._data
 1111.     def fail(self, why):
 1112.         del self._data
 1113.     def register_canceller(self, cb):
 1114.         pass # we won't use it
 1115.     def finish(self):
 1116.         return self.data
 1117.     # The following methods are just because the target might be a repairer.DownUpConnector,
 1118.     # and just because the current CHKUpload object expects to find the storage index and
 1119.     # encoding parameters in its Uploadable.
 1120.     def set_storageindex(self, storageindex):
 1121.         pass
 1122.     def set_encodingparams(self, encodingparams):
 1123.         pass
 1124. 
 1125. class FileHandle:
 1126.     """Use me to download data to a pre-defined filehandle-like object. I
 1127.     will use the target's write() method. I will *not* close the filehandle:
 1128.     I leave that up to the originator of the filehandle. The download process
 1129.     will return the filehandle when it completes.
 1130.     """
 1131.     implements(IDownloadTarget)
 1132.     def __init__(self, filehandle):
 1133.         self._filehandle = filehandle
 1134.     def open(self, size):
 1135.         pass
 1136.     def write(self, data):
 1137.         self._filehandle.write(data)
 1138.     def close(self):
 1139.         # the originator of the filehandle reserves the right to close it
 1140.         pass
 1141.     def fail(self, why):
 1142.         pass
 1143.     def register_canceller(self, cb):
 1144.         pass
 1145.     def finish(self):
 1146.         return self._filehandle
 1147.     # The following methods are just because the target might be a repairer.DownUpConnector,
 1148.     # and just because the current CHKUpload object expects to find the storage index and
 1149.     # encoding parameters in its Uploadable.
 1150.     def set_storageindex(self, storageindex):
 1151.         pass
 1152.     def set_encodingparams(self, encodingparams):
 1153.         pass
 1154. 
 1155. class ConsumerAdapter:
 1156.     implements(IDownloadTarget, IConsumer)
 1157.     def __init__(self, consumer):
 1158.         self._consumer = consumer
 1159. 
 1160.     def registerProducer(self, producer, streaming):
 1161.         self._consumer.registerProducer(producer, streaming)
 1162.     def unregisterProducer(self):
 1163.         self._consumer.unregisterProducer()
 1164. 
 1165.     def open(self, size):
 1166.         pass
 1167.     def write(self, data):
 1168.         self._consumer.write(data)
 1169.     def close(self):
 1170.         pass
 1171. 
 1172.     def fail(self, why):
 1173.         pass
 1174.     def register_canceller(self, cb):
 1175.         pass
 1176.     def finish(self):
 1177.         return self._consumer
 1178.     # The following methods are just because the target might be a repairer.DownUpConnector,
 1179.     # and just because the current CHKUpload object expects to find the storage index and
 1180.     # encoding parameters in its Uploadable.
 1181.     def set_storageindex(self, storageindex):
 1182.         pass
 1183.     def set_encodingparams(self, encodingparams):
 1184.         pass
 1185. 
 1186. 
 1187. class Downloader:
 1188.     """I am a service that allows file downloading.
 1189.     """
 1190.     # TODO: in fact, this service only downloads immutable files (URI:CHK:).
 1191.     # It is scheduled to go away, to be replaced by filenode.download()
 1192.     implements(IDownloader)
 1193. 
 1194.     def __init__(self, storage_broker, stats_provider):
 1195.         self.storage_broker = storage_broker
 1196.         self.stats_provider = stats_provider
 1197.         self._all_downloads = weakref.WeakKeyDictionary() # for debugging
 1198. 
 1199.     def download(self, u, t, _log_msg_id=None, monitor=None, history=None):
 1200.         u = IFileURI(u)
 1201.         t = IDownloadTarget(t)
 1202.         assert t.write
 1203.         assert t.close
 1204. 
 1205.         assert isinstance(u, uri.CHKFileURI)
 1206.         if self.stats_provider:
 1207.             # these counters are meant for network traffic, and don't
 1208.             # include LIT files
 1209.             self.stats_provider.count('downloader.files_downloaded', 1)
 1210.             self.stats_provider.count('downloader.bytes_downloaded', u.get_size())
 1211. 
 1212.         target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
 1213.         if not monitor:
 1214.             monitor=Monitor()
 1215.         dl = CiphertextDownloader(self.storage_broker,
 1216.                                   u.get_verify_cap(), target,
 1217.                                   monitor=monitor)
 1218.         self._all_downloads[dl] = None
 1219.         if history:
 1220.             history.add_download(dl.get_download_status())
 1221.         d = dl.start()
 1222.         return d
 1223. 
 1224.     # utility functions
 1225.     def download_to_data(self, uri, _log_msg_id=None, history=None):
 1226.         return self.download(uri, Data(), _log_msg_id=_log_msg_id, history=history)
 1227.     def download_to_filename(self, uri, filename, _log_msg_id=None):
 1228.         return self.download(uri, FileName(filename), _log_msg_id=_log_msg_id)
 1229.     def download_to_filehandle(self, uri, filehandle, _log_msg_id=None):
 1230.         return self.download(uri, FileHandle(filehandle), _log_msg_id=_log_msg_id)