source file: /home/buildslave/tahoe/edgy/build/src/allmydata/immutable/download.py
file stats: 841 lines, 811 executed: 96.4% covered
coverage versus previous test: 0 lines added, 0 lines removed
1. import os, random, weakref, itertools, time
2. from zope.interface import implements
3. from twisted.internet import defer
4. from twisted.internet.interfaces import IPushProducer, IConsumer
5. from foolscap.api import DeadReferenceError, RemoteException, eventually
6.
7. from allmydata.util import base32, deferredutil, hashutil, log, mathutil, idlib
8. from allmydata.util.assertutil import _assert, precondition
9. from allmydata import codec, hashtree, uri
10. from allmydata.interfaces import IDownloadTarget, IDownloader, \
11. IFileURI, IVerifierURI, \
12. IDownloadStatus, IDownloadResults, IValidatedThingProxy, \
13. IStorageBroker, NotEnoughSharesError, NoSharesError, NoServersError, \
14. UnableToFetchCriticalDownloadDataError
15. from allmydata.immutable import layout
16. from allmydata.monitor import Monitor
17. from pycryptopp.cipher.aes import AES
18.
19. class IntegrityCheckReject(Exception):
20. pass
21.
22. class BadURIExtensionHashValue(IntegrityCheckReject):
23. pass
24. class BadURIExtension(IntegrityCheckReject):
25. pass
26. class UnsupportedErasureCodec(BadURIExtension):
27. pass
28. class BadCrypttextHashValue(IntegrityCheckReject):
29. pass
30. class BadOrMissingHash(IntegrityCheckReject):
31. pass
32.
33. class DownloadStopped(Exception):
34. pass
35.
36. class DownloadResults:
37. implements(IDownloadResults)
38.
39. def __init__(self):
40. self.servers_used = set()
41. self.server_problems = {}
42. self.servermap = {}
43. self.timings = {}
44. self.file_size = None
45.
46. class DecryptingTarget(log.PrefixingLogMixin):
47. implements(IDownloadTarget, IConsumer)
48. def __init__(self, target, key, _log_msg_id=None):
49. precondition(IDownloadTarget.providedBy(target), target)
50. self.target = target
51. self._decryptor = AES(key)
52. prefix = str(target)
53. log.PrefixingLogMixin.__init__(self, "allmydata.immutable.download", _log_msg_id, prefix=prefix)
54. # methods to satisfy the IConsumer interface
55. def registerProducer(self, producer, streaming):
56. if IConsumer.providedBy(self.target):
57. self.target.registerProducer(producer, streaming)
58. def unregisterProducer(self):
59. if IConsumer.providedBy(self.target):
60. self.target.unregisterProducer()
61. def write(self, ciphertext):
62. plaintext = self._decryptor.process(ciphertext)
63. self.target.write(plaintext)
64. def open(self, size):
65. self.target.open(size)
66. def close(self):
67. self.target.close()
68. def finish(self):
69. return self.target.finish()
70. # The following methods is just to pass through to the next target, and just because that
71. # target might be a repairer.DownUpConnector, and just because the current CHKUpload object
72. # expects to find the storage index in its Uploadable.
73. def set_storageindex(self, storageindex):
74. self.target.set_storageindex(storageindex)
75. def set_encodingparams(self, encodingparams):
76. self.target.set_encodingparams(encodingparams)
77.
78. class ValidatedThingObtainer:
79. def __init__(self, validatedthingproxies, debugname, log_id):
80. self._validatedthingproxies = validatedthingproxies
81. self._debugname = debugname
82. self._log_id = log_id
83.
84. def _bad(self, f, validatedthingproxy):
85. failtype = f.trap(RemoteException, DeadReferenceError,
86. IntegrityCheckReject, layout.LayoutInvalid,
87. layout.ShareVersionIncompatible)
88. level = log.WEIRD
89. if f.check(DeadReferenceError):
90. level = log.UNUSUAL
91. elif f.check(RemoteException):
92. level = log.WEIRD
93. else:
94. level = log.SCARY
95. log.msg(parent=self._log_id, facility="tahoe.immutable.download",
96. format="operation %(op)s from validatedthingproxy %(validatedthingproxy)s failed",
97. op=self._debugname, validatedthingproxy=str(validatedthingproxy),
98. failure=f, level=level, umid="JGXxBA")
99. if not self._validatedthingproxies:
100. raise UnableToFetchCriticalDownloadDataError("ran out of peers, last error was %s" % (f,))
101. # try again with a different one
102. d = self._try_the_next_one()
103. return d
104.
105. def _try_the_next_one(self):
106. vtp = self._validatedthingproxies.pop(0)
107. d = vtp.start() # start() obtains, validates, and callsback-with the thing or else errbacks
108. d.addErrback(self._bad, vtp)
109. return d
110.
111. def start(self):
112. return self._try_the_next_one()
113.
114. class ValidatedCrypttextHashTreeProxy:
115. implements(IValidatedThingProxy)
116. """ I am a front-end for a remote crypttext hash tree using a local ReadBucketProxy -- I use
117. its get_crypttext_hashes() method and offer the Validated Thing protocol (i.e., I have a
118. start() method that fires with self once I get a valid one). """
119. def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments, fetch_failures=None):
120. # fetch_failures is for debugging -- see test_encode.py
121. self._readbucketproxy = readbucketproxy
122. self._num_segments = num_segments
123. self._fetch_failures = fetch_failures
124. self._crypttext_hash_tree = crypttext_hash_tree
125.
126. def _validate(self, proposal):
127. ct_hashes = dict(list(enumerate(proposal)))
128. try:
129. self._crypttext_hash_tree.set_hashes(ct_hashes)
130. except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
131. if self._fetch_failures is not None:
132. self._fetch_failures["crypttext_hash_tree"] += 1
133. raise BadOrMissingHash(le)
134. # If we now have enough of the crypttext hash tree to integrity-check *any* segment of ciphertext, then we are done.
135. # TODO: It would have better alacrity if we downloaded only part of the crypttext hash tree at a time.
136. for segnum in range(self._num_segments):
137. if self._crypttext_hash_tree.needed_hashes(segnum):
138. raise BadOrMissingHash("not enough hashes to validate segment number %d" % (segnum,))
139. return self
140.
141. def start(self):
142. d = self._readbucketproxy.get_crypttext_hashes()
143. d.addCallback(self._validate)
144. return d
145.
146. class ValidatedExtendedURIProxy:
147. implements(IValidatedThingProxy)
148. """ I am a front-end for a remote UEB (using a local ReadBucketProxy), responsible for
149. retrieving and validating the elements from the UEB. """
150.
151. def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
152. # fetch_failures is for debugging -- see test_encode.py
153. self._fetch_failures = fetch_failures
154. self._readbucketproxy = readbucketproxy
155. precondition(IVerifierURI.providedBy(verifycap), verifycap)
156. self._verifycap = verifycap
157.
158. # required
159. self.segment_size = None
160. self.crypttext_root_hash = None
161. self.share_root_hash = None
162.
163. # computed
164. self.block_size = None
165. self.share_size = None
166. self.num_segments = None
167. self.tail_data_size = None
168. self.tail_segment_size = None
169.
170. # optional
171. self.crypttext_hash = None
172.
173. def __str__(self):
174. return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())
175.
176. def _check_integrity(self, data):
177. h = hashutil.uri_extension_hash(data)
178. if h != self._verifycap.uri_extension_hash:
179. msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
180. (self._readbucketproxy, base32.b2a(self._verifycap.uri_extension_hash), base32.b2a(h)))
181. if self._fetch_failures is not None:
182. self._fetch_failures["uri_extension"] += 1
183. raise BadURIExtensionHashValue(msg)
184. else:
185. return data
186.
187. def _parse_and_validate(self, data):
188. self.share_size = mathutil.div_ceil(self._verifycap.size, self._verifycap.needed_shares)
189.
190. d = uri.unpack_extension(data)
191.
192. # There are several kinds of things that can be found in a UEB. First, things that we
193. # really need to learn from the UEB in order to do this download. Next: things which are
194. # optional but not redundant -- if they are present in the UEB they will get used. Next,
195. # things that are optional and redundant. These things are required to be consistent:
196. # they don't have to be in the UEB, but if they are in the UEB then they will be checked
197. # for consistency with the already-known facts, and if they are inconsistent then an
198. # exception will be raised. These things aren't actually used -- they are just tested
199. # for consistency and ignored. Finally: things which are deprecated -- they ought not be
200. # in the UEB at all, and if they are present then a warning will be logged but they are
201. # otherwise ignored.
202.
203. # First, things that we really need to learn from the UEB: segment_size,
204. # crypttext_root_hash, and share_root_hash.
205. self.segment_size = d['segment_size']
206.
207. self.block_size = mathutil.div_ceil(self.segment_size, self._verifycap.needed_shares)
208. self.num_segments = mathutil.div_ceil(self._verifycap.size, self.segment_size)
209.
210. self.tail_data_size = self._verifycap.size % self.segment_size
211. if not self.tail_data_size:
212. self.tail_data_size = self.segment_size
213. # padding for erasure code
214. self.tail_segment_size = mathutil.next_multiple(self.tail_data_size, self._verifycap.needed_shares)
215.
216. # Ciphertext hash tree root is mandatory, so that there is at most one ciphertext that
217. # matches this read-cap or verify-cap. The integrity check on the shares is not
218. # sufficient to prevent the original encoder from creating some shares of file A and
219. # other shares of file B.
220. self.crypttext_root_hash = d['crypttext_root_hash']
221.
222. self.share_root_hash = d['share_root_hash']
223.
224.
225. # Next: things that are optional and not redundant: crypttext_hash
226. if d.has_key('crypttext_hash'):
227. self.crypttext_hash = d['crypttext_hash']
228. if len(self.crypttext_hash) != hashutil.CRYPTO_VAL_SIZE:
229. raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))
230.
231.
232. # Next: things that are optional, redundant, and required to be consistent: codec_name,
233. # codec_params, tail_codec_params, num_segments, size, needed_shares, total_shares
234. if d.has_key('codec_name'):
235. if d['codec_name'] != "crs":
236. raise UnsupportedErasureCodec(d['codec_name'])
237.
238. if d.has_key('codec_params'):
239. ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
240. if ucpss != self.segment_size:
241. raise BadURIExtension("inconsistent erasure code params: ucpss: %s != "
242. "self.segment_size: %s" % (ucpss, self.segment_size))
243. if ucpns != self._verifycap.needed_shares:
244. raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
245. "self._verifycap.needed_shares: %s" % (ucpns,
246. self._verifycap.needed_shares))
247. if ucpts != self._verifycap.total_shares:
248. raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
249. "self._verifycap.total_shares: %s" % (ucpts,
250. self._verifycap.total_shares))
251.
252. if d.has_key('tail_codec_params'):
253. utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
254. if utcpss != self.tail_segment_size:
255. raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
256. "self.tail_segment_size: %s, self._verifycap.size: %s, "
257. "self.segment_size: %s, self._verifycap.needed_shares: %s"
258. % (utcpss, self.tail_segment_size, self._verifycap.size,
259. self.segment_size, self._verifycap.needed_shares))
260. if utcpns != self._verifycap.needed_shares:
261. raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
262. "self._verifycap.needed_shares: %s" % (utcpns,
263. self._verifycap.needed_shares))
264. if utcpts != self._verifycap.total_shares:
265. raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
266. "self._verifycap.total_shares: %s" % (utcpts,
267. self._verifycap.total_shares))
268.
269. if d.has_key('num_segments'):
270. if d['num_segments'] != self.num_segments:
271. raise BadURIExtension("inconsistent num_segments: size: %s, "
272. "segment_size: %s, computed_num_segments: %s, "
273. "ueb_num_segments: %s" % (self._verifycap.size,
274. self.segment_size,
275. self.num_segments, d['num_segments']))
276.
277. if d.has_key('size'):
278. if d['size'] != self._verifycap.size:
279. raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
280. (self._verifycap.size, d['size']))
281.
282. if d.has_key('needed_shares'):
283. if d['needed_shares'] != self._verifycap.needed_shares:
284. raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
285. "needed shares: %s" % (self._verifycap.total_shares,
286. d['needed_shares']))
287.
288. if d.has_key('total_shares'):
289. if d['total_shares'] != self._verifycap.total_shares:
290. raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
291. "total shares: %s" % (self._verifycap.total_shares,
292. d['total_shares']))
293.
294. # Finally, things that are deprecated and ignored: plaintext_hash, plaintext_root_hash
295. if d.get('plaintext_hash'):
296. log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
297. "and is no longer used. Ignoring. %s" % (self,))
298. if d.get('plaintext_root_hash'):
299. log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
300. "reasons and is no longer used. Ignoring. %s" % (self,))
301.
302. return self
303.
304. def start(self):
305. """ Fetch the UEB from bucket, compare its hash to the hash from verifycap, then parse
306. it. Returns a deferred which is called back with self once the fetch is successful, or
307. is erred back if it fails. """
308. d = self._readbucketproxy.get_uri_extension()
309. d.addCallback(self._check_integrity)
310. d.addCallback(self._parse_and_validate)
311. return d
312.
313. class ValidatedReadBucketProxy(log.PrefixingLogMixin):
314. """I am a front-end for a remote storage bucket, responsible for retrieving and validating
315. data from that bucket.
316.
317. My get_block() method is used by BlockDownloaders.
318. """
319.
320. def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size):
321. """ share_hash_tree is required to have already been initialized with the root hash
322. (the number-0 hash), using the share_root_hash from the UEB """
323. precondition(share_hash_tree[0] is not None, share_hash_tree)
324. prefix = "%d-%s-%s" % (sharenum, bucket, base32.b2a_l(share_hash_tree[0][:8], 60))
325. log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
326. self.sharenum = sharenum
327. self.bucket = bucket
328. self.share_hash_tree = share_hash_tree
329. self.num_blocks = num_blocks
330. self.block_size = block_size
331. self.share_size = share_size
332. self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)
333.
334. def get_block(self, blocknum):
335. # the first time we use this bucket, we need to fetch enough elements
336. # of the share hash tree to validate it from our share hash up to the
337. # hashroot.
338. if self.share_hash_tree.needed_hashes(self.sharenum):
339. d1 = self.bucket.get_share_hashes()
340. else:
341. d1 = defer.succeed([])
342.
343. # We might need to grab some elements of our block hash tree, to
344. # validate the requested block up to the share hash.
345. blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
346. # We don't need the root of the block hash tree, as that comes in the share tree.
347. blockhashesneeded.discard(0)
348. d2 = self.bucket.get_block_hashes(blockhashesneeded)
349.
350. if blocknum < self.num_blocks-1:
351. thisblocksize = self.block_size
352. else:
353. thisblocksize = self.share_size % self.block_size
354. if thisblocksize == 0:
355. thisblocksize = self.block_size
356. d3 = self.bucket.get_block_data(blocknum, self.block_size, thisblocksize)
357.
358. dl = deferredutil.gatherResults([d1, d2, d3])
359. dl.addCallback(self._got_data, blocknum)
360. return dl
361.
362. def _got_data(self, results, blocknum):
363. precondition(blocknum < self.num_blocks, self, blocknum, self.num_blocks)
364. sharehashes, blockhashes, blockdata = results
365. try:
366. sharehashes = dict(sharehashes)
367. except ValueError, le:
368. le.args = tuple(le.args + (sharehashes,))
369. raise
370. blockhashes = dict(enumerate(blockhashes))
371.
372. candidate_share_hash = None # in case we log it in the except block below
373. blockhash = None # in case we log it in the except block below
374.
375. try:
376. if self.share_hash_tree.needed_hashes(self.sharenum):
377. # This will raise exception if the values being passed do not match the root
378. # node of self.share_hash_tree.
379. try:
380. self.share_hash_tree.set_hashes(sharehashes)
381. except IndexError, le:
382. # Weird -- sharehashes contained index numbers outside of the range that fit
383. # into this hash tree.
384. raise BadOrMissingHash(le)
385.
386. # To validate a block we need the root of the block hash tree, which is also one of
387. # the leafs of the share hash tree, and is called "the share hash".
388. if not self.block_hash_tree[0]: # empty -- no root node yet
389. # Get the share hash from the share hash tree.
390. share_hash = self.share_hash_tree.get_leaf(self.sharenum)
391. if not share_hash:
392. raise hashtree.NotEnoughHashesError # No root node in block_hash_tree and also the share hash wasn't sent by the server.
393. self.block_hash_tree.set_hashes({0: share_hash})
394.
395. if self.block_hash_tree.needed_hashes(blocknum):
396. self.block_hash_tree.set_hashes(blockhashes)
397.
398. blockhash = hashutil.block_hash(blockdata)
399. self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
400. #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
401. # "%r .. %r: %s" %
402. # (self.sharenum, blocknum, len(blockdata),
403. # blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))
404.
405. except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
406. # log.WEIRD: indicates undetected disk/network error, or more
407. # likely a programming error
408. self.log("hash failure in block=%d, shnum=%d on %s" %
409. (blocknum, self.sharenum, self.bucket))
410. if self.block_hash_tree.needed_hashes(blocknum):
411. self.log(""" failure occurred when checking the block_hash_tree.
412. This suggests that either the block data was bad, or that the
413. block hashes we received along with it were bad.""")
414. else:
415. self.log(""" the failure probably occurred when checking the
416. share_hash_tree, which suggests that the share hashes we
417. received from the remote peer were bad.""")
418. self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
419. self.log(" block length: %d" % len(blockdata))
420. self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
421. if len(blockdata) < 100:
422. self.log(" block data: %r" % (blockdata,))
423. else:
424. self.log(" block data start/end: %r .. %r" %
425. (blockdata[:50], blockdata[-50:]))
426. self.log(" share hash tree:\n" + self.share_hash_tree.dump())
427. self.log(" block hash tree:\n" + self.block_hash_tree.dump())
428. lines = []
429. for i,h in sorted(sharehashes.items()):
430. lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
431. self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
432. lines = []
433. for i,h in blockhashes.items():
434. lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
435. log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
436. raise BadOrMissingHash(le)
437.
438. # If we made it here, the block is good. If the hash trees didn't
439. # like what they saw, they would have raised a BadHashError, causing
440. # our caller to see a Failure and thus ignore this block (as well as
441. # dropping this bucket).
442. return blockdata
443.
444.
445.
446. class BlockDownloader(log.PrefixingLogMixin):
447. """I am responsible for downloading a single block (from a single bucket)
448. for a single segment.
449.
450. I am a child of the SegmentDownloader.
451. """
452.
453. def __init__(self, vbucket, blocknum, parent, results):
454. precondition(isinstance(vbucket, ValidatedReadBucketProxy), vbucket)
455. prefix = "%s-%d" % (vbucket, blocknum)
456. log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
457. self.vbucket = vbucket
458. self.blocknum = blocknum
459. self.parent = parent
460. self.results = results
461.
462. def start(self, segnum):
463. self.log("get_block(segnum=%d)" % segnum)
464. started = time.time()
465. d = self.vbucket.get_block(segnum)
466. d.addCallbacks(self._hold_block, self._got_block_error,
467. callbackArgs=(started,))
468. return d
469.
470. def _hold_block(self, data, started):
471. if self.results:
472. elapsed = time.time() - started
473. peerid = self.vbucket.bucket.get_peerid()
474. if peerid not in self.results.timings["fetch_per_server"]:
475. self.results.timings["fetch_per_server"][peerid] = []
476. self.results.timings["fetch_per_server"][peerid].append(elapsed)
477. self.log("got block")
478. self.parent.hold_block(self.blocknum, data)
479.
480. def _got_block_error(self, f):
481. failtype = f.trap(RemoteException, DeadReferenceError,
482. IntegrityCheckReject,
483. layout.LayoutInvalid, layout.ShareVersionIncompatible)
484. if f.check(RemoteException, DeadReferenceError):
485. level = log.UNUSUAL
486. else:
487. level = log.WEIRD
488. self.log("failure to get block", level=level, umid="5Z4uHQ")
489. if self.results:
490. peerid = self.vbucket.bucket.get_peerid()
491. self.results.server_problems[peerid] = str(f)
492. self.parent.bucket_failed(self.vbucket)
493.
494. class SegmentDownloader:
495. """I am responsible for downloading all the blocks for a single segment
496. of data.
497.
498. I am a child of the CiphertextDownloader.
499. """
500.
501. def __init__(self, parent, segmentnumber, needed_shares, results):
502. self.parent = parent
503. self.segmentnumber = segmentnumber
504. self.needed_blocks = needed_shares
505. self.blocks = {} # k: blocknum, v: data
506. self.results = results
507. self._log_number = self.parent.log("starting segment %d" %
508. segmentnumber)
509.
510. def log(self, *args, **kwargs):
511. if "parent" not in kwargs:
512. kwargs["parent"] = self._log_number
513. return self.parent.log(*args, **kwargs)
514.
515. def start(self):
516. return self._download()
517.
518. def _download(self):
519. d = self._try()
520. def _done(res):
521. if len(self.blocks) >= self.needed_blocks:
522. # we only need self.needed_blocks blocks
523. # we want to get the smallest blockids, because they are
524. # more likely to be fast "primary blocks"
525. blockids = sorted(self.blocks.keys())[:self.needed_blocks]
526. blocks = []
527. for blocknum in blockids:
528. blocks.append(self.blocks[blocknum])
529. return (blocks, blockids)
530. else:
531. return self._download()
532. d.addCallback(_done)
533. return d
534.
535. def _try(self):
536. # fill our set of active buckets, maybe raising NotEnoughSharesError
537. active_buckets = self.parent._activate_enough_buckets()
538. # Now we have enough buckets, in self.parent.active_buckets.
539.
540. # in test cases, bd.start might mutate active_buckets right away, so
541. # we need to put off calling start() until we've iterated all the way
542. # through it.
543. downloaders = []
544. for blocknum, vbucket in active_buckets.iteritems():
545. assert isinstance(vbucket, ValidatedReadBucketProxy), vbucket
546. bd = BlockDownloader(vbucket, blocknum, self, self.results)
547. downloaders.append(bd)
548. if self.results:
549. self.results.servers_used.add(vbucket.bucket.get_peerid())
550. l = [bd.start(self.segmentnumber) for bd in downloaders]
551. return defer.DeferredList(l, fireOnOneErrback=True)
552.
553. def hold_block(self, blocknum, data):
554. self.blocks[blocknum] = data
555.
556. def bucket_failed(self, vbucket):
557. self.parent.bucket_failed(vbucket)
558.
559. class DownloadStatus:
560. implements(IDownloadStatus)
561. statusid_counter = itertools.count(0)
562.
563. def __init__(self):
564. self.storage_index = None
565. self.size = None
566. self.helper = False
567. self.status = "Not started"
568. self.progress = 0.0
569. self.paused = False
570. self.stopped = False
571. self.active = True
572. self.results = None
573. self.counter = self.statusid_counter.next()
574. self.started = time.time()
575.
576. def get_started(self):
577. return self.started
578. def get_storage_index(self):
579. return self.storage_index
580. def get_size(self):
581. return self.size
582. def using_helper(self):
583. return self.helper
584. def get_status(self):
585. status = self.status
586. if self.paused:
587. status += " (output paused)"
588. if self.stopped:
589. status += " (output stopped)"
590. return status
591. def get_progress(self):
592. return self.progress
593. def get_active(self):
594. return self.active
595. def get_results(self):
596. return self.results
597. def get_counter(self):
598. return self.counter
599.
600. def set_storage_index(self, si):
601. self.storage_index = si
602. def set_size(self, size):
603. self.size = size
604. def set_helper(self, helper):
605. self.helper = helper
606. def set_status(self, status):
607. self.status = status
608. def set_paused(self, paused):
609. self.paused = paused
610. def set_stopped(self, stopped):
611. self.stopped = stopped
612. def set_progress(self, value):
613. self.progress = value
614. def set_active(self, value):
615. self.active = value
616. def set_results(self, value):
617. self.results = value
618.
619. class CiphertextDownloader(log.PrefixingLogMixin):
620. """ I download shares, check their integrity, then decode them, check the
621. integrity of the resulting ciphertext, then and write it to my target.
622. Before I send any new request to a server, I always ask the 'monitor'
623. object that was passed into my constructor whether this task has been
624. cancelled (by invoking its raise_if_cancelled() method)."""
625. implements(IPushProducer)
626. _status = None
627.
628. def __init__(self, storage_broker, v, target, monitor):
629.
630. precondition(IStorageBroker.providedBy(storage_broker), storage_broker)
631. precondition(IVerifierURI.providedBy(v), v)
632. precondition(IDownloadTarget.providedBy(target), target)
633.
634. prefix=base32.b2a_l(v.storage_index[:8], 60)
635. log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
636. self._storage_broker = storage_broker
637.
638. self._verifycap = v
639. self._storage_index = v.storage_index
640. self._uri_extension_hash = v.uri_extension_hash
641.
642. self._started = time.time()
643. self._status = s = DownloadStatus()
644. s.set_status("Starting")
645. s.set_storage_index(self._storage_index)
646. s.set_size(self._verifycap.size)
647. s.set_helper(False)
648. s.set_active(True)
649.
650. self._results = DownloadResults()
651. s.set_results(self._results)
652. self._results.file_size = self._verifycap.size
653. self._results.timings["servers_peer_selection"] = {}
654. self._results.timings["fetch_per_server"] = {}
655. self._results.timings["cumulative_fetch"] = 0.0
656. self._results.timings["cumulative_decode"] = 0.0
657. self._results.timings["cumulative_decrypt"] = 0.0
658. self._results.timings["paused"] = 0.0
659.
660. self._paused = False
661. self._stopped = False
662. if IConsumer.providedBy(target):
663. target.registerProducer(self, True)
664. self._target = target
665. self._target.set_storageindex(self._storage_index) # Repairer (uploader) needs the storageindex.
666. self._monitor = monitor
667. self._opened = False
668.
669. self.active_buckets = {} # k: shnum, v: bucket
670. self._share_buckets = [] # list of (sharenum, bucket) tuples
671. self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
672.
673. self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }
674.
675. self._ciphertext_hasher = hashutil.crypttext_hasher()
676.
677. self._bytes_done = 0
678. self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
679.
680. # _got_uri_extension() will create the following:
681. # self._crypttext_hash_tree
682. # self._share_hash_tree
683. # self._current_segnum = 0
684. # self._vup # ValidatedExtendedURIProxy
685.
686. def pauseProducing(self):
687. if self._paused:
688. return
689. self._paused = defer.Deferred()
690. self._paused_at = time.time()
691. if self._status:
692. self._status.set_paused(True)
693.
694. def resumeProducing(self):
695. if self._paused:
696. paused_for = time.time() - self._paused_at
697. self._results.timings['paused'] += paused_for
698. p = self._paused
699. self._paused = None
700. eventually(p.callback, None)
701. if self._status:
702. self._status.set_paused(False)
703.
704. def stopProducing(self):
705. self.log("Download.stopProducing")
706. self._stopped = True
707. self.resumeProducing()
708. if self._status:
709. self._status.set_stopped(True)
710. self._status.set_active(False)
711.
712. def start(self):
713. self.log("starting download")
714.
715. # first step: who should we download from?
716. d = defer.maybeDeferred(self._get_all_shareholders)
717. d.addCallback(self._got_all_shareholders)
718. # now get the uri_extension block from somebody and integrity check it and parse and validate its contents
719. d.addCallback(self._obtain_uri_extension)
720. d.addCallback(self._get_crypttext_hash_tree)
721. # once we know that, we can download blocks from everybody
722. d.addCallback(self._download_all_segments)
723. def _finished(res):
724. if self._status:
725. self._status.set_status("Finished")
726. self._status.set_active(False)
727. self._status.set_paused(False)
728. if IConsumer.providedBy(self._target):
729. self._target.unregisterProducer()
730. return res
731. d.addBoth(_finished)
732. def _failed(why):
733. if self._status:
734. self._status.set_status("Failed")
735. self._status.set_active(False)
736. if why.check(DownloadStopped):
737. # DownloadStopped just means the consumer aborted the download; not so scary.
738. self.log("download stopped", level=log.UNUSUAL)
739. else:
740. # This is really unusual, and deserves maximum forensics.
741. self.log("download failed!", failure=why, level=log.SCARY, umid="lp1vaQ")
742. return why
743. d.addErrback(_failed)
744. d.addCallback(self._done)
745. return d
746.
747. def _get_all_shareholders(self):
748. dl = []
749. sb = self._storage_broker
750. servers = sb.get_servers_for_index(self._storage_index)
751. if not servers:
752. raise NoServersError("broker gave us no servers!")
753. for (peerid,ss) in servers:
754. self.log(format="sending DYHB to [%(peerid)s]",
755. peerid=idlib.shortnodeid_b2a(peerid),
756. level=log.NOISY, umid="rT03hg")
757. d = ss.callRemote("get_buckets", self._storage_index)
758. d.addCallbacks(self._got_response, self._got_error,
759. callbackArgs=(peerid,))
760. dl.append(d)
761. self._responses_received = 0
762. self._queries_sent = len(dl)
763. if self._status:
764. self._status.set_status("Locating Shares (%d/%d)" %
765. (self._responses_received,
766. self._queries_sent))
767. return defer.DeferredList(dl)
768.
769. def _got_response(self, buckets, peerid):
770. self.log(format="got results from [%(peerid)s]: shnums %(shnums)s",
771. peerid=idlib.shortnodeid_b2a(peerid),
772. shnums=sorted(buckets.keys()),
773. level=log.NOISY, umid="o4uwFg")
774. self._responses_received += 1
775. if self._results:
776. elapsed = time.time() - self._started
777. self._results.timings["servers_peer_selection"][peerid] = elapsed
778. if self._status:
779. self._status.set_status("Locating Shares (%d/%d)" %
780. (self._responses_received,
781. self._queries_sent))
782. for sharenum, bucket in buckets.iteritems():
783. b = layout.ReadBucketProxy(bucket, peerid, self._storage_index)
784. self.add_share_bucket(sharenum, b)
785.
786. if self._results:
787. if peerid not in self._results.servermap:
788. self._results.servermap[peerid] = set()
789. self._results.servermap[peerid].add(sharenum)
790.
791. def add_share_bucket(self, sharenum, bucket):
792. # this is split out for the benefit of test_encode.py
793. self._share_buckets.append( (sharenum, bucket) )
794.
795. def _got_error(self, f):
796. level = log.WEIRD
797. if f.check(DeadReferenceError):
798. level = log.UNUSUAL
799. self.log("Error during get_buckets", failure=f, level=level,
800. umid="3uuBUQ")
801.
802. def bucket_failed(self, vbucket):
803. shnum = vbucket.sharenum
804. del self.active_buckets[shnum]
805. s = self._share_vbuckets[shnum]
806. # s is a set of ValidatedReadBucketProxy instances
807. s.remove(vbucket)
808. # ... which might now be empty
809. if not s:
810. # there are no more buckets which can provide this share, so
811. # remove the key. This may prompt us to use a different share.
812. del self._share_vbuckets[shnum]
813.
814. def _got_all_shareholders(self, res):
815. if self._results:
816. now = time.time()
817. self._results.timings["peer_selection"] = now - self._started
818.
819. if len(self._share_buckets) < self._verifycap.needed_shares:
820. msg = "Failed to get enough shareholders: have %d, need %d" \
821. % (len(self._share_buckets), self._verifycap.needed_shares)
822. if self._share_buckets:
823. raise NotEnoughSharesError(msg)
824. else:
825. raise NoSharesError(msg)
826.
827. #for s in self._share_vbuckets.values():
828. # for vb in s:
829. # assert isinstance(vb, ValidatedReadBucketProxy), \
830. # "vb is %s but should be a ValidatedReadBucketProxy" % (vb,)
831.
832. def _obtain_uri_extension(self, ignored):
833. # all shareholders are supposed to have a copy of uri_extension, and
834. # all are supposed to be identical. We compute the hash of the data
835. # that comes back, and compare it against the version in our URI. If
836. # they don't match, ignore their data and try someone else.
837. if self._status:
838. self._status.set_status("Obtaining URI Extension")
839.
840. uri_extension_fetch_started = time.time()
841.
842. vups = []
843. for sharenum, bucket in self._share_buckets:
844. vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
845. vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
846. d = vto.start()
847.
848. def _got_uri_extension(vup):
849. precondition(isinstance(vup, ValidatedExtendedURIProxy), vup)
850. if self._results:
851. elapsed = time.time() - uri_extension_fetch_started
852. self._results.timings["uri_extension"] = elapsed
853.
854. self._vup = vup
855. self._codec = codec.CRSDecoder()
856. self._codec.set_params(self._vup.segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
857. self._tail_codec = codec.CRSDecoder()
858. self._tail_codec.set_params(self._vup.tail_segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
859.
860. self._current_segnum = 0
861.
862. self._share_hash_tree = hashtree.IncompleteHashTree(self._verifycap.total_shares)
863. self._share_hash_tree.set_hashes({0: vup.share_root_hash})
864.
865. self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
866. self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash})
867.
868. # Repairer (uploader) needs the encodingparams.
869. self._target.set_encodingparams((
870. self._verifycap.needed_shares,
871. self._verifycap.total_shares, # I don't think the target actually cares about "happy".
872. self._verifycap.total_shares,
873. self._vup.segment_size
874. ))
875. d.addCallback(_got_uri_extension)
876. return d
877.
878. def _get_crypttext_hash_tree(self, res):
879. vchtps = []
880. for sharenum, bucket in self._share_buckets:
881. vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
882. vchtps.append(vchtp)
883.
884. _get_crypttext_hash_tree_started = time.time()
885. if self._status:
886. self._status.set_status("Retrieving crypttext hash tree")
887.
888. vto = ValidatedThingObtainer(vchtps , debugname="vchtps", log_id=self._parentmsgid)
889. d = vto.start()
890.
891. def _got_crypttext_hash_tree(res):
892. # Good -- the self._crypttext_hash_tree that we passed to vchtp is now populated
893. # with hashes.
894. if self._results:
895. elapsed = time.time() - _get_crypttext_hash_tree_started
896. self._results.timings["hashtrees"] = elapsed
897. d.addCallback(_got_crypttext_hash_tree)
898. return d
899.
900. def _activate_enough_buckets(self):
901. """either return a mapping from shnum to a ValidatedReadBucketProxy that can
902. provide data for that share, or raise NotEnoughSharesError"""
903.
904. while len(self.active_buckets) < self._verifycap.needed_shares:
905. # need some more
906. handled_shnums = set(self.active_buckets.keys())
907. available_shnums = set(self._share_vbuckets.keys())
908. potential_shnums = list(available_shnums - handled_shnums)
909. if len(potential_shnums) < (self._verifycap.needed_shares - len(self.active_buckets)):
910. have = len(potential_shnums) + len(self.active_buckets)
911. msg = "Unable to activate enough shares: have %d, need %d" \
912. % (have, self._verifycap.needed_shares)
913. if have:
914. raise NotEnoughSharesError(msg)
915. else:
916. raise NoSharesError(msg)
917. # For the next share, choose a primary share if available, else a randomly chosen
918. # secondary share.
919. potential_shnums.sort()
920. if potential_shnums[0] < self._verifycap.needed_shares:
921. shnum = potential_shnums[0]
922. else:
923. shnum = random.choice(potential_shnums)
924. # and a random bucket that will provide it
925. validated_bucket = random.choice(list(self._share_vbuckets[shnum]))
926. self.active_buckets[shnum] = validated_bucket
927. return self.active_buckets
928.
929.
930. def _download_all_segments(self, res):
931. for sharenum, bucket in self._share_buckets:
932. vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
933. self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
934.
935. # after the above code, self._share_vbuckets contains enough
936. # buckets to complete the download, and some extra ones to
937. # tolerate some buckets dropping out or having
938. # errors. self._share_vbuckets is a dictionary that maps from
939. # shnum to a set of ValidatedBuckets, which themselves are
940. # wrappers around RIBucketReader references.
941. self.active_buckets = {} # k: shnum, v: ValidatedReadBucketProxy instance
942.
943. self._started_fetching = time.time()
944.
945. d = defer.succeed(None)
946. for segnum in range(self._vup.num_segments):
947. d.addCallback(self._download_segment, segnum)
948. # this pause, at the end of write, prevents pre-fetch from
949. # happening until the consumer is ready for more data.
950. d.addCallback(self._check_for_pause)
951. return d
952.
953. def _check_for_pause(self, res):
954. if self._paused:
955. d = defer.Deferred()
956. self._paused.addCallback(lambda ignored: d.callback(res))
957. return d
958. if self._stopped:
959. raise DownloadStopped("our Consumer called stopProducing()")
960. self._monitor.raise_if_cancelled()
961. return res
962.
963. def _download_segment(self, res, segnum):
964. if self._status:
965. self._status.set_status("Downloading segment %d of %d" %
966. (segnum+1, self._vup.num_segments))
967. self.log("downloading seg#%d of %d (%d%%)"
968. % (segnum, self._vup.num_segments,
969. 100.0 * segnum / self._vup.num_segments))
970. # memory footprint: when the SegmentDownloader finishes pulling down
971. # all shares, we have 1*segment_size of usage.
972. segmentdler = SegmentDownloader(self, segnum, self._verifycap.needed_shares,
973. self._results)
974. started = time.time()
975. d = segmentdler.start()
976. def _finished_fetching(res):
977. elapsed = time.time() - started
978. self._results.timings["cumulative_fetch"] += elapsed
979. return res
980. if self._results:
981. d.addCallback(_finished_fetching)
982. # pause before using more memory
983. d.addCallback(self._check_for_pause)
984. # while the codec does its job, we hit 2*segment_size
985. def _started_decode(res):
986. self._started_decode = time.time()
987. return res
988. if self._results:
989. d.addCallback(_started_decode)
990. if segnum + 1 == self._vup.num_segments:
991. codec = self._tail_codec
992. else:
993. codec = self._codec
994. d.addCallback(lambda (shares, shareids): codec.decode(shares, shareids))
995. # once the codec is done, we drop back to 1*segment_size, because
996. # 'shares' goes out of scope. The memory usage is all in the
997. # plaintext now, spread out into a bunch of tiny buffers.
998. def _finished_decode(res):
999. elapsed = time.time() - self._started_decode
1000. self._results.timings["cumulative_decode"] += elapsed
1001. return res
1002. if self._results:
1003. d.addCallback(_finished_decode)
1004.
1005. # pause/check-for-stop just before writing, to honor stopProducing
1006. d.addCallback(self._check_for_pause)
1007. d.addCallback(self._got_segment)
1008. return d
1009.
1010. def _got_segment(self, buffers):
1011. precondition(self._crypttext_hash_tree)
1012. started_decrypt = time.time()
1013. self._status.set_progress(float(self._current_segnum)/self._verifycap.size)
1014.
1015. if self._current_segnum + 1 == self._vup.num_segments:
1016. # This is the last segment.
1017. # Trim off any padding added by the upload side. We never send empty segments. If
1018. # the data was an exact multiple of the segment size, the last segment will be full.
1019. tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._verifycap.needed_shares)
1020. num_buffers_used = mathutil.div_ceil(self._vup.tail_data_size, tail_buf_size)
1021. # Remove buffers which don't contain any part of the tail.
1022. del buffers[num_buffers_used:]
1023. # Remove the past-the-tail-part of the last buffer.
1024. tail_in_last_buf = self._vup.tail_data_size % tail_buf_size
1025. if tail_in_last_buf == 0:
1026. tail_in_last_buf = tail_buf_size
1027. buffers[-1] = buffers[-1][:tail_in_last_buf]
1028.
1029. # First compute the hash of this segment and check that it fits.
1030. ch = hashutil.crypttext_segment_hasher()
1031. for buffer in buffers:
1032. self._ciphertext_hasher.update(buffer)
1033. ch.update(buffer)
1034. self._crypttext_hash_tree.set_hashes(leaves={self._current_segnum: ch.digest()})
1035.
1036. # Then write this segment to the target.
1037. if not self._opened:
1038. self._opened = True
1039. self._target.open(self._verifycap.size)
1040.
1041. for buffer in buffers:
1042. self._target.write(buffer)
1043. self._bytes_done += len(buffer)
1044.
1045. self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
1046. self._current_segnum += 1
1047.
1048. if self._results:
1049. elapsed = time.time() - started_decrypt
1050. self._results.timings["cumulative_decrypt"] += elapsed
1051.
1052. def _done(self, res):
1053. self.log("download done")
1054. if self._results:
1055. now = time.time()
1056. self._results.timings["total"] = now - self._started
1057. self._results.timings["segments"] = now - self._started_fetching
1058. if self._vup.crypttext_hash:
1059. _assert(self._vup.crypttext_hash == self._ciphertext_hasher.digest(),
1060. "bad crypttext_hash: computed=%s, expected=%s" %
1061. (base32.b2a(self._ciphertext_hasher.digest()),
1062. base32.b2a(self._vup.crypttext_hash)))
1063. _assert(self._bytes_done == self._verifycap.size, self._bytes_done, self._verifycap.size)
1064. self._status.set_progress(1)
1065. self._target.close()
1066. return self._target.finish()
1067. def get_download_status(self):
1068. return self._status
1069.
1070.
1071. class FileName:
1072. implements(IDownloadTarget)
1073. def __init__(self, filename):
1074. self._filename = filename
1075. self.f = None
1076. def open(self, size):
1077. self.f = open(self._filename, "wb")
1078. return self.f
1079. def write(self, data):
1080. self.f.write(data)
1081. def close(self):
1082. if self.f:
1083. self.f.close()
1084. def fail(self, why):
1085. if self.f:
1086. self.f.close()
1087. os.unlink(self._filename)
1088. def register_canceller(self, cb):
1089. pass # we won't use it
1090. def finish(self):
1091. pass
1092. # The following methods are just because the target might be a repairer.DownUpConnector,
1093. # and just because the current CHKUpload object expects to find the storage index and
1094. # encoding parameters in its Uploadable.
1095. def set_storageindex(self, storageindex):
1096. pass
1097. def set_encodingparams(self, encodingparams):
1098. pass
1099.
1100. class Data:
1101. implements(IDownloadTarget)
1102. def __init__(self):
1103. self._data = []
1104. def open(self, size):
1105. pass
1106. def write(self, data):
1107. self._data.append(data)
1108. def close(self):
1109. self.data = "".join(self._data)
1110. del self._data
1111. def fail(self, why):
1112. del self._data
1113. def register_canceller(self, cb):
1114. pass # we won't use it
1115. def finish(self):
1116. return self.data
1117. # The following methods are just because the target might be a repairer.DownUpConnector,
1118. # and just because the current CHKUpload object expects to find the storage index and
1119. # encoding parameters in its Uploadable.
1120. def set_storageindex(self, storageindex):
1121. pass
1122. def set_encodingparams(self, encodingparams):
1123. pass
1124.
1125. class FileHandle:
1126. """Use me to download data to a pre-defined filehandle-like object. I
1127. will use the target's write() method. I will *not* close the filehandle:
1128. I leave that up to the originator of the filehandle. The download process
1129. will return the filehandle when it completes.
1130. """
1131. implements(IDownloadTarget)
1132. def __init__(self, filehandle):
1133. self._filehandle = filehandle
1134. def open(self, size):
1135. pass
1136. def write(self, data):
1137. self._filehandle.write(data)
1138. def close(self):
1139. # the originator of the filehandle reserves the right to close it
1140. pass
1141. def fail(self, why):
1142. pass
1143. def register_canceller(self, cb):
1144. pass
1145. def finish(self):
1146. return self._filehandle
1147. # The following methods are just because the target might be a repairer.DownUpConnector,
1148. # and just because the current CHKUpload object expects to find the storage index and
1149. # encoding parameters in its Uploadable.
1150. def set_storageindex(self, storageindex):
1151. pass
1152. def set_encodingparams(self, encodingparams):
1153. pass
1154.
1155. class ConsumerAdapter:
1156. implements(IDownloadTarget, IConsumer)
1157. def __init__(self, consumer):
1158. self._consumer = consumer
1159.
1160. def registerProducer(self, producer, streaming):
1161. self._consumer.registerProducer(producer, streaming)
1162. def unregisterProducer(self):
1163. self._consumer.unregisterProducer()
1164.
1165. def open(self, size):
1166. pass
1167. def write(self, data):
1168. self._consumer.write(data)
1169. def close(self):
1170. pass
1171.
1172. def fail(self, why):
1173. pass
1174. def register_canceller(self, cb):
1175. pass
1176. def finish(self):
1177. return self._consumer
1178. # The following methods are just because the target might be a repairer.DownUpConnector,
1179. # and just because the current CHKUpload object expects to find the storage index and
1180. # encoding parameters in its Uploadable.
1181. def set_storageindex(self, storageindex):
1182. pass
1183. def set_encodingparams(self, encodingparams):
1184. pass
1185.
1186.
1187. class Downloader:
1188. """I am a service that allows file downloading.
1189. """
1190. # TODO: in fact, this service only downloads immutable files (URI:CHK:).
1191. # It is scheduled to go away, to be replaced by filenode.download()
1192. implements(IDownloader)
1193.
1194. def __init__(self, storage_broker, stats_provider):
1195. self.storage_broker = storage_broker
1196. self.stats_provider = stats_provider
1197. self._all_downloads = weakref.WeakKeyDictionary() # for debugging
1198.
1199. def download(self, u, t, _log_msg_id=None, monitor=None, history=None):
1200. u = IFileURI(u)
1201. t = IDownloadTarget(t)
1202. assert t.write
1203. assert t.close
1204.
1205. assert isinstance(u, uri.CHKFileURI)
1206. if self.stats_provider:
1207. # these counters are meant for network traffic, and don't
1208. # include LIT files
1209. self.stats_provider.count('downloader.files_downloaded', 1)
1210. self.stats_provider.count('downloader.bytes_downloaded', u.get_size())
1211.
1212. target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
1213. if not monitor:
1214. monitor=Monitor()
1215. dl = CiphertextDownloader(self.storage_broker,
1216. u.get_verify_cap(), target,
1217. monitor=monitor)
1218. self._all_downloads[dl] = None
1219. if history:
1220. history.add_download(dl.get_download_status())
1221. d = dl.start()
1222. return d
1223.
1224. # utility functions
1225. def download_to_data(self, uri, _log_msg_id=None, history=None):
1226. return self.download(uri, Data(), _log_msg_id=_log_msg_id, history=history)
1227. def download_to_filename(self, uri, filename, _log_msg_id=None):
1228. return self.download(uri, FileName(filename), _log_msg_id=_log_msg_id)
1229. def download_to_filehandle(self, uri, filehandle, _log_msg_id=None):
1230. return self.download(uri, FileHandle(filehandle), _log_msg_id=_log_msg_id)