source: trunk/src/allmydata/immutable/encode.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

  • Property mode set to 100644
File size: 30.7 KB
Line 
1# -*- test-case-name: allmydata.test.test_encode -*-
2
3"""
4Ported to Python 3.
5"""
6
7import time
8from zope.interface import implementer
9from twisted.internet import defer
10from foolscap.api import fireEventually
11from allmydata import uri
12from allmydata.storage.server import si_b2a
13from allmydata.hashtree import HashTree
14from allmydata.util import mathutil, hashutil, base32, log, happinessutil
15from allmydata.util.assertutil import _assert, precondition
16from allmydata.codec import CRSEncoder
17from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
18     IEncryptedUploadable, IUploadStatus, UploadUnhappinessError
19
20from ..util.eliotutil import (
21    log_call_deferred,
22)
23
24"""
25The goal of the encoder is to turn the original file into a series of
26'shares'. Each share is going to a 'shareholder' (nominally each shareholder
27is a different host, but for small grids there may be overlap). The number
28of shares is chosen to hit our reliability goals (more shares on more
29machines means more reliability), and is limited by overhead (proportional to
30numshares or log(numshares)) and the encoding technology in use (zfec permits
31only 256 shares total). It is also constrained by the amount of data
32we want to send to each host. For estimating purposes, think of 10 shares
33out of which we need 3 to reconstruct the file.
34
35The encoder starts by cutting the original file into segments. All segments
36except the last are of equal size. The segment size is chosen to constrain
37the memory footprint (which will probably vary between 1x and 4x segment
38size) and to constrain the overhead (which will be proportional to
39log(number of segments)).
40
41
42Each segment (A,B,C) is read into memory, encrypted, and encoded into
43blocks. The 'share' (say, share #1) that makes it out to a host is a
44collection of these blocks (block A1, B1, C1), plus some hash-tree
45information necessary to validate the data upon retrieval. Only one segment
46is handled at a time: all blocks for segment A are delivered before any
47work is begun on segment B.
48
49As blocks are created, we retain the hash of each one. The list of block hashes
50for a single share (say, hash(A1), hash(B1), hash(C1)) is used to form the base
51of a Merkle hash tree for that share, called the block hash tree.
52
53This hash tree has one terminal leaf per block. The complete block hash
54tree is sent to the shareholder after all the data has been sent. At
retrieval time, the decoder will ask for specific pieces of this tree before
asking for blocks: whichever pieces it needs to validate those blocks.
57
58(Note: we don't really need to generate this whole block hash tree
59ourselves. It would be sufficient to have the shareholder generate it and
60just tell us the root. This gives us an extra level of validation on the
61transfer, though, and it is relatively cheap to compute.)
62
Each of these block hash trees has a root hash. The root hashes for all
shares are collected into the 'share hash tree', which
65has one terminal leaf per share. After sending the blocks and the complete
66block hash tree to each shareholder, we send them the portion of the share
67hash tree that is necessary to validate their share. The root of the share
68hash tree is put into the URI.
69
70"""
71
class UploadAborted(Exception):
    """Raised by the encoder while reading data once the upload was aborted."""
74
# Binary size units: each one is 1024 times the previous one.
KiB = 1024
MiB = KiB * 1024
GiB = MiB * 1024
TiB = GiB * 1024
PiB = TiB * 1024
80
81@implementer(IEncoder)
82class Encoder(object):
83
84    def __init__(self, log_parent=None, upload_status=None):
85        object.__init__(self)
86        self.uri_extension_data = {}
87        self._codec = None
88        self._status = None
89        if upload_status:
90            self._status = IUploadStatus(upload_status)
91        precondition(log_parent is None or isinstance(log_parent, int),
92                     log_parent)
93        self._log_number = log.msg("creating Encoder %s" % self,
94                                   facility="tahoe.encoder", parent=log_parent)
95        self._aborted = False
96
97    def __repr__(self):
98        if hasattr(self, "_storage_index"):
99            return "<Encoder for %r>" % si_b2a(self._storage_index)[:5]
100        return "<Encoder for unknown storage index>"
101
102    def log(self, *args, **kwargs):
103        if "parent" not in kwargs:
104            kwargs["parent"] = self._log_number
105        if "facility" not in kwargs:
106            kwargs["facility"] = "tahoe.encoder"
107        return log.msg(*args, **kwargs)
108
109    @log_call_deferred(action_type=u"immutable:encode:set-encrypted-uploadable")
110    def set_encrypted_uploadable(self, uploadable):
111        eu = self._uploadable = IEncryptedUploadable(uploadable)
112        d = eu.get_size()
113        def _got_size(size):
114            self.log(format="file size: %(size)d", size=size)
115            self.file_size = size
116        d.addCallback(_got_size)
117        d.addCallback(lambda res: eu.get_all_encoding_parameters())
118        d.addCallback(self._got_all_encoding_parameters)
119        d.addCallback(lambda res: eu.get_storage_index())
120        def _done(storage_index):
121            self._storage_index = storage_index
122            return self
123        d.addCallback(_done)
124        return d
125
126    def _got_all_encoding_parameters(self, params):
127        assert not self._codec
128        k, happy, n, segsize = params
129        self.required_shares = k
130        self.min_happiness = happy
131        self.num_shares = n
132        self.segment_size = segsize
133        self.log("got encoding parameters: %d/%d/%d %d" % (k,happy,n, segsize))
134        self.log("now setting up codec")
135
136        assert self.segment_size % self.required_shares == 0
137
138        self.num_segments = mathutil.div_ceil(self.file_size,
139                                              self.segment_size)
140
141        self._codec = CRSEncoder()
142        self._codec.set_params(self.segment_size,
143                               self.required_shares, self.num_shares)
144
145        data = self.uri_extension_data
146        data['codec_name'] = self._codec.get_encoder_type()
147        data['codec_params'] = self._codec.get_serialized_params()
148
149        data['size'] = self.file_size
150        data['segment_size'] = self.segment_size
151        self.share_size = mathutil.div_ceil(self.file_size,
152                                            self.required_shares)
153        data['num_segments'] = self.num_segments
154        data['needed_shares'] = self.required_shares
155        data['total_shares'] = self.num_shares
156
157        # the "tail" is the last segment. This segment may or may not be
158        # shorter than all other segments. We use the "tail codec" to handle
159        # it. If the tail is short, we use a different codec instance. In
160        # addition, the tail codec must be fed data which has been padded out
161        # to the right size.
162        tail_size = self.file_size % self.segment_size
163        if not tail_size:
164            tail_size = self.segment_size
165
166        # the tail codec is responsible for encoding tail_size bytes
167        padded_tail_size = mathutil.next_multiple(tail_size,
168                                                  self.required_shares)
169        self._tail_codec = CRSEncoder()
170        self._tail_codec.set_params(padded_tail_size,
171                                    self.required_shares, self.num_shares)
172        data['tail_codec_params'] = self._tail_codec.get_serialized_params()
173
174    def _get_share_size(self):
175        share_size = mathutil.div_ceil(self.file_size, self.required_shares)
176        overhead = self._compute_overhead()
177        return share_size + overhead
178
179    def _compute_overhead(self):
180        return 0
181
182    def get_param(self, name):
183        assert self._codec
184
185        if name == "storage_index":
186            return self._storage_index
187        elif name == "share_counts":
188            return (self.required_shares, self.min_happiness,
189                    self.num_shares)
190        elif name == "num_segments":
191            return self.num_segments
192        elif name == "segment_size":
193            return self.segment_size
194        elif name == "block_size":
195            return self._codec.get_block_size()
196        elif name == "share_size":
197            return self._get_share_size()
198        elif name == "serialized_params":
199            return self._codec.get_serialized_params()
200        else:
201            raise KeyError("unknown parameter name '%s'" % name)
202
203    def set_shareholders(self, landlords, servermap):
204        assert isinstance(landlords, dict)
205        for k in landlords:
206            assert IStorageBucketWriter.providedBy(landlords[k])
207        self.landlords = landlords.copy()
208        assert isinstance(servermap, dict)
209        for v in servermap.values():
210            assert isinstance(v, set)
211        self.servermap = servermap.copy()
212
    @log_call_deferred(action_type=u"immutable:encode:start")
    def start(self):
        """
        Encode the whole file and push it to the shareholders.

        This resets the per-upload hashing and timing state and then builds a
        single Deferred chain which, in order: starts all shareholders,
        encodes and sends every segment (with a reactor turn after each one),
        finishes hashing, sends the hash trees and the URI extension, and
        closes all shareholders.

        Returns a Deferred that will fire with the verify cap (an instance of
        uri.CHKFileVerifierURI). If any step fails, the failure is routed
        through ``self.err``, which aborts the remaining shareholders.
        """
        self.log("%s starting" % (self,))
        assert self._codec
        self._crypttext_hasher = hashutil.crypttext_hasher()
        self._crypttext_hashes = []
        self.segment_num = 0
        self.block_hashes = [[] for x in range(self.num_shares)]
        # block_hashes[i] is a list that will be accumulated and then sent
        # to landlord[i]. This list contains a hash of each block (one per
        # segment) that was generated for that share.
        self.share_root_hashes = [None] * self.num_shares

        # timings (in seconds) reported by get_times()
        self._times = {
            "cumulative_encoding": 0.0,
            "cumulative_sending": 0.0,
            "hashes_and_close": 0.0,
            "total_encode_and_push": 0.0,
            }
        self._start_total_timestamp = time.time()

        d = fireEventually()

        d.addCallback(lambda res: self.start_all_shareholders())

        # every segment except the last one goes through the main codec
        for i in range(self.num_segments-1):
            # 'i' is bound as a default argument on purpose: a plain closure
            # (lambda res: self._encode_segment(i, ...)) captures the
            # variable rather than its value, so every callback would see the
            # final value of 'i' by the time it runs.
            d.addCallback(lambda res, i=i: self._encode_segment(i, is_tail=False))
            d.addCallback(self._send_segment, i)
            d.addCallback(self._turn_barrier)
        # the last segment is the "tail" and goes through the tail codec
        last_segnum = self.num_segments - 1
        d.addCallback(lambda res: self._encode_segment(last_segnum, is_tail=True))
        d.addCallback(self._send_segment, last_segnum)
        d.addCallback(self._turn_barrier)

        d.addCallback(lambda res: self.finish_hashing())

        # These calls have to happen in order; layout.py now requires writes to
        # be appended to the data written so far.
        d.addCallback(lambda res:
                      self.send_crypttext_hash_tree_to_all_shareholders())
        d.addCallback(lambda res: self.send_all_block_hash_trees())
        d.addCallback(lambda res: self.send_all_share_hash_trees())
        d.addCallback(lambda res: self.send_uri_extension_to_all_shareholders())

        d.addCallback(lambda res: self.close_all_shareholders())
        # done() runs on success, err() on a failure from any earlier step
        d.addCallbacks(self.done, self.err)
        return d
267
268    def set_status(self, status):
269        if self._status:
270            self._status.set_status(status)
271
272    def set_encode_and_push_progress(self, sent_segments=None, extra=0.0):
273        if self._status:
274            # we treat the final hash+close as an extra segment
275            if sent_segments is None:
276                sent_segments = self.num_segments
277            progress = float(sent_segments + extra) / (self.num_segments + 1)
278            self._status.set_progress(2, progress)
279
280    def abort(self):
281        self.log("aborting upload", level=log.UNUSUAL)
282        assert self._codec, "don't call abort before start"
283        self._aborted = True
284        # the next segment read (in _gather_data inside _encode_segment) will
285        # raise UploadAborted(), which will bypass the rest of the upload
286        # chain. If we've sent the final segment's shares, it's too late to
287        # abort. TODO: allow abort any time up to close_all_shareholders.
288
289    def _turn_barrier(self, res):
290        # putting this method in a Deferred chain imposes a guaranteed
291        # reactor turn between the pre- and post- portions of that chain.
292        # This can be useful to limit memory consumption: since Deferreds do
293        # not do tail recursion, code which uses defer.succeed(result) for
294        # consistency will cause objects to live for longer than you might
295        # normally expect.
296
297        return fireEventually(res)
298
299
300    def start_all_shareholders(self):
301        self.log("starting shareholders", level=log.NOISY)
302        self.set_status("Starting shareholders")
303        dl = []
304        for shareid in list(self.landlords):
305            d = self.landlords[shareid].put_header()
306            d.addErrback(self._remove_shareholder, shareid, "start")
307            dl.append(d)
308        return self._gather_responses(dl)
309
310    def _encode_segment(self, segnum, is_tail):
311        """
312        Encode one segment of input into the configured number of shares.
313
314        :param segnum: Ostensibly, the number of the segment to encode.  In
315            reality, this parameter is ignored and the *next* segment is
316            encoded and returned.
317
318        :param bool is_tail: ``True`` if this is the last segment, ``False``
319            otherwise.
320
321        :return: A ``Deferred`` which fires with a two-tuple.  The first
322            element is a list of string-y objects representing the encoded
323            segment data for one of the shares.  The second element is a list
324            of integers giving the share numbers of the shares in the first
325            element.
326        """
327        codec = self._tail_codec if is_tail else self._codec
328        start = time.time()
329
330        # the ICodecEncoder API wants to receive a total of self.segment_size
331        # bytes on each encode() call, broken up into a number of
332        # identically-sized pieces. Due to the way the codec algorithm works,
333        # these pieces need to be the same size as the share which the codec
334        # will generate. Therefore we must feed it with input_piece_size that
335        # equals the output share size.
336        input_piece_size = codec.get_block_size()
337
338        # as a result, the number of input pieces per encode() call will be
339        # equal to the number of required shares with which the codec was
340        # constructed. You can think of the codec as chopping up a
341        # 'segment_size' of data into 'required_shares' shares (not doing any
342        # fancy math at all, just doing a split), then creating some number
343        # of additional shares which can be substituted if the primary ones
344        # are unavailable
345
346        # we read data from the source one segment at a time, and then chop
347        # it into 'input_piece_size' pieces before handing it to the codec
348
349        crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
350
351        # memory footprint: we only hold a tiny piece of the plaintext at any
352        # given time. We build up a segment's worth of cryptttext, then hand
353        # it to the encoder. Assuming 3-of-10 encoding (3.3x expansion) and
354        # 1MiB max_segment_size, we get a peak memory footprint of 4.3*1MiB =
355        # 4.3MiB. Lowering max_segment_size to, say, 100KiB would drop the
356        # footprint to 430KiB at the expense of more hash-tree overhead.
357
358        d = self._gather_data(self.required_shares, input_piece_size,
359                              crypttext_segment_hasher, allow_short=is_tail)
360        def _done_gathering(chunks):
361            for c in chunks:
362                # If is_tail then a short trailing chunk will have been padded
363                # by _gather_data
364                assert len(c) == input_piece_size
365            self._crypttext_hashes.append(crypttext_segment_hasher.digest())
366            # during this call, we hit 5*segsize memory
367            return codec.encode(chunks)
368        d.addCallback(_done_gathering)
369        def _done(res):
370            elapsed = time.time() - start
371            self._times["cumulative_encoding"] += elapsed
372            return res
373        d.addCallback(_done)
374        return d
375
376    def _gather_data(self, num_chunks, input_chunk_size,
377                     crypttext_segment_hasher,
378                     allow_short=False):
379        """Return a Deferred that will fire when the required number of
380        chunks have been read (and hashed and encrypted). The Deferred fires
381        with a list of chunks, each of size input_chunk_size."""
382
383        # I originally built this to allow read_encrypted() to behave badly:
384        # to let it return more or less data than you asked for. It would
385        # stash the leftovers until later, and then recurse until it got
386        # enough. I don't think that was actually useful.
387        #
388        # who defines read_encrypted?
389        #  offloaded.LocalCiphertextReader: real disk file: exact
390        #  upload.EncryptAnUploadable: Uploadable, but a wrapper that makes
391        #    it exact. The return value is a list of 50KiB chunks, to reduce
392        #    the memory footprint of the encryption process.
393        #  repairer.Repairer: immutable.filenode.CiphertextFileNode: exact
394        #
395        # This has been redefined to require read_encrypted() to behave like
396        # a local file: return exactly the amount requested unless it hits
397        # EOF.
398        #  -warner
399
400        if self._aborted:
401            raise UploadAborted()
402
403        read_size = num_chunks * input_chunk_size
404        d = self._uploadable.read_encrypted(read_size, hash_only=False)
405        def _got(data):
406            assert isinstance(data, (list,tuple))
407            if self._aborted:
408                raise UploadAborted()
409            data = b"".join(data)
410            precondition(len(data) <= read_size, len(data), read_size)
411            if not allow_short:
412                precondition(len(data) == read_size, len(data), read_size)
413            crypttext_segment_hasher.update(data)
414            self._crypttext_hasher.update(data)
415            if allow_short and len(data) < read_size:
416                # padding
417                data += b"\x00" * (read_size - len(data))
418            encrypted_pieces = [data[i:i+input_chunk_size]
419                                for i in range(0, len(data), input_chunk_size)]
420            return encrypted_pieces
421        d.addCallback(_got)
422        return d
423
424    def _send_segment(self, shares_and_shareids, segnum):
425        # To generate the URI, we must generate the roothash, so we must
426        # generate all shares, even if we aren't actually giving them to
427        # anybody. This means that the set of shares we create will be equal
428        # to or larger than the set of landlords. If we have any landlord who
429        # *doesn't* have a share, that's an error.
430        (shares, shareids) = shares_and_shareids
431        _assert(set(self.landlords.keys()).issubset(set(shareids)),
432                shareids=shareids, landlords=self.landlords)
433        start = time.time()
434        dl = []
435        self.set_status("Sending segment %d of %d" % (segnum+1,
436                                                      self.num_segments))
437        self.set_encode_and_push_progress(segnum)
438        lognum = self.log("send_segment(%d)" % segnum, level=log.NOISY)
439        for i in range(len(shares)):
440            block = shares[i]
441            shareid = shareids[i]
442            d = self.send_block(shareid, segnum, block, lognum)
443            dl.append(d)
444
445            block_hash = hashutil.block_hash(block)
446            #from allmydata.util import base32
447            #log.msg("creating block (shareid=%d, blocknum=%d) "
448            #        "len=%d %r .. %r: %s" %
449            #        (shareid, segnum, len(block),
450            #         block[:50], block[-50:], base32.b2a(block_hash)))
451            self.block_hashes[shareid].append(block_hash)
452
453        dl = self._gather_responses(dl)
454
455        def _logit(res):
456            self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
457                     (self,
458                      self.segment_size*(segnum+1),
459                      self.segment_size*self.num_segments,
460                      100 * (segnum+1) // self.num_segments,
461                      ),
462                     level=log.OPERATIONAL)
463            elapsed = time.time() - start
464            self._times["cumulative_sending"] += elapsed
465            return res
466        dl.addCallback(_logit)
467        return dl
468
469    def send_block(self, shareid, segment_num, block, lognum):
470        if shareid not in self.landlords:
471            return defer.succeed(None)
472        sh = self.landlords[shareid]
473        lognum2 = self.log("put_block to %s" % self.landlords[shareid],
474                           parent=lognum, level=log.NOISY)
475        d = sh.put_block(segment_num, block)
476        def _done(res):
477            self.log("put_block done", parent=lognum2, level=log.NOISY)
478            return res
479        d.addCallback(_done)
480        d.addErrback(self._remove_shareholder, shareid,
481                     "segnum=%d" % segment_num)
482        return d
483
484    def _remove_shareholder(self, why, shareid, where):
485        ln = self.log(format="error while sending %(method)s to shareholder=%(shnum)d",
486                      method=where, shnum=shareid,
487                      level=log.UNUSUAL, failure=why)
488        if shareid in self.landlords:
489            self.landlords[shareid].abort()
490            peerid = self.landlords[shareid].get_peerid()
491            assert peerid
492            del self.landlords[shareid]
493            self.servermap[shareid].remove(peerid)
494            if not self.servermap[shareid]:
495                del self.servermap[shareid]
496        else:
497            # even more UNUSUAL
498            self.log("they weren't in our list of landlords", parent=ln,
499                     level=log.WEIRD, umid="TQGFRw")
500        happiness = happinessutil.servers_of_happiness(self.servermap)
501        if happiness < self.min_happiness:
502            peerids = set(happinessutil.shares_by_server(self.servermap).keys())
503            msg = happinessutil.failure_message(len(peerids),
504                                                self.required_shares,
505                                                self.min_happiness,
506                                                happiness)
507            msg = "%s: %s" % (msg, why)
508            raise UploadUnhappinessError(msg)
509        self.log("but we can still continue with %s shares, we'll be happy "
510                 "with at least %s" % (happiness,
511                                       self.min_happiness),
512                 parent=ln)
513
    def _gather_responses(self, dl):
        """
        Combine a list of per-shareholder Deferreds into one.

        :param dl: list of Deferreds, one per request that was sent.

        :return: a ``DeferredList`` over ``dl`` created with
            ``fireOnOneErrback=True``.
        """
        d = defer.DeferredList(dl, fireOnOneErrback=True)
        def _eatUploadUnhappinessError(f):
            # all exceptions that occur while talking to a peer are handled
            # in _remove_shareholder. That might raise UploadUnhappinessError,
            # which will cause the DeferredList to errback but which should
            # otherwise be consumed. Allow non-UploadUnhappinessError exceptions
            # to pass through as an unhandled errback. We use this in lieu of
            # consumeErrors=True to allow coding errors to be logged.
            f.trap(UploadUnhappinessError)
            return None
        # These errbacks are added after the DeferredList was created above,
        # so they sit behind the DeferredList's own handlers on each Deferred.
        for d0 in dl:
            d0.addErrback(_eatUploadUnhappinessError)
        return d
528
529    def finish_hashing(self):
530        self._start_hashing_and_close_timestamp = time.time()
531        self.set_status("Finishing hashes")
532        self.set_encode_and_push_progress(extra=0.0)
533        crypttext_hash = self._crypttext_hasher.digest()
534        self.uri_extension_data["crypttext_hash"] = crypttext_hash
535        self._uploadable.close()
536
537    def send_crypttext_hash_tree_to_all_shareholders(self):
538        self.log("sending crypttext hash tree", level=log.NOISY)
539        self.set_status("Sending Crypttext Hash Tree")
540        self.set_encode_and_push_progress(extra=0.3)
541        t = HashTree(self._crypttext_hashes)
542        all_hashes = list(t)
543        self.uri_extension_data["crypttext_root_hash"] = t[0]
544        dl = []
545        for shareid in list(self.landlords):
546            dl.append(self.send_crypttext_hash_tree(shareid, all_hashes))
547        return self._gather_responses(dl)
548
549    def send_crypttext_hash_tree(self, shareid, all_hashes):
550        if shareid not in self.landlords:
551            return defer.succeed(None)
552        sh = self.landlords[shareid]
553        d = sh.put_crypttext_hashes(all_hashes)
554        d.addErrback(self._remove_shareholder, shareid, "put_crypttext_hashes")
555        return d
556
557    def send_all_block_hash_trees(self):
558        self.log("sending block hash trees", level=log.NOISY)
559        self.set_status("Sending Subshare Hash Trees")
560        self.set_encode_and_push_progress(extra=0.4)
561        dl = []
562        for shareid,hashes in enumerate(self.block_hashes):
563            # hashes is a list of the hashes of all blocks that were sent
564            # to shareholder[shareid].
565            dl.append(self.send_one_block_hash_tree(shareid, hashes))
566        return self._gather_responses(dl)
567
568    def send_one_block_hash_tree(self, shareid, block_hashes):
569        t = HashTree(block_hashes)
570        all_hashes = list(t)
571        # all_hashes[0] is the root hash, == hash(ah[1]+ah[2])
572        # all_hashes[1] is the left child, == hash(ah[3]+ah[4])
573        # all_hashes[n] == hash(all_hashes[2*n+1] + all_hashes[2*n+2])
574        self.share_root_hashes[shareid] = t[0]
575        if shareid not in self.landlords:
576            return defer.succeed(None)
577        sh = self.landlords[shareid]
578        d = sh.put_block_hashes(all_hashes)
579        d.addErrback(self._remove_shareholder, shareid, "put_block_hashes")
580        return d
581
582    def send_all_share_hash_trees(self):
583        # Each bucket gets a set of share hash tree nodes that are needed to validate their
584        # share. This includes the share hash itself, but does not include the top-level hash
585        # root (which is stored securely in the URI instead).
586        self.log("sending all share hash trees", level=log.NOISY)
587        self.set_status("Sending Share Hash Trees")
588        self.set_encode_and_push_progress(extra=0.6)
589        dl = []
590        for h in self.share_root_hashes:
591            assert h
592        # create the share hash tree
593        t = HashTree(self.share_root_hashes)
594        # the root of this hash tree goes into our URI
595        self.uri_extension_data['share_root_hash'] = t[0]
596        # now send just the necessary pieces out to each shareholder
597        for i in range(self.num_shares):
598            # the HashTree is given a list of leaves: 0,1,2,3..n .
599            # These become nodes A+0,A+1,A+2.. of the tree, where A=n-1
600            needed_hash_indices = t.needed_hashes(i, include_leaf=True)
601            hashes = [(hi, t[hi]) for hi in needed_hash_indices]
602            dl.append(self.send_one_share_hash_tree(i, hashes))
603        return self._gather_responses(dl)
604
605    def send_one_share_hash_tree(self, shareid, needed_hashes):
606        if shareid not in self.landlords:
607            return defer.succeed(None)
608        sh = self.landlords[shareid]
609        d = sh.put_share_hashes(needed_hashes)
610        d.addErrback(self._remove_shareholder, shareid, "put_share_hashes")
611        return d
612
613    def send_uri_extension_to_all_shareholders(self):
614        lp = self.log("sending uri_extension", level=log.NOISY)
615        self.set_status("Sending URI Extensions")
616        self.set_encode_and_push_progress(extra=0.8)
617        for k in ('crypttext_root_hash', 'crypttext_hash',
618                  ):
619            assert k in self.uri_extension_data
620        uri_extension = uri.pack_extension(self.uri_extension_data)
621        ed = {}
622        for k,v in self.uri_extension_data.items():
623            if k.endswith("hash"):
624                ed[k] = base32.b2a(v)
625            else:
626                ed[k] = v
627        self.log("uri_extension_data is %s" % (ed,), level=log.NOISY, parent=lp)
628        self.uri_extension_hash = hashutil.uri_extension_hash(uri_extension)
629        dl = []
630        for shareid in list(self.landlords):
631            dl.append(self.send_uri_extension(shareid, uri_extension))
632        return self._gather_responses(dl)
633
634    def send_uri_extension(self, shareid, uri_extension):
635        sh = self.landlords[shareid]
636        d = sh.put_uri_extension(uri_extension)
637        d.addErrback(self._remove_shareholder, shareid, "put_uri_extension")
638        return d
639
640    def close_all_shareholders(self):
641        self.log("closing shareholders", level=log.NOISY)
642        self.set_status("Closing Shareholders")
643        self.set_encode_and_push_progress(extra=0.9)
644        dl = []
645        for shareid in list(self.landlords):
646            d = self.landlords[shareid].close()
647            d.addErrback(self._remove_shareholder, shareid, "close")
648            dl.append(d)
649        return self._gather_responses(dl)
650
651    def done(self, res):
652        self.log("upload done", level=log.OPERATIONAL)
653        self.set_status("Finished")
654        self.set_encode_and_push_progress(extra=1.0) # done
655        now = time.time()
656        h_and_c_elapsed = now - self._start_hashing_and_close_timestamp
657        self._times["hashes_and_close"] = h_and_c_elapsed
658        total_elapsed = now - self._start_total_timestamp
659        self._times["total_encode_and_push"] = total_elapsed
660
661        # update our sharemap
662        self._shares_placed = set(self.landlords.keys())
663        return uri.CHKFileVerifierURI(self._storage_index, self.uri_extension_hash,
664                                      self.required_shares, self.num_shares, self.file_size)
665
666    def err(self, f):
667        self.log("upload failed", failure=f, level=log.UNUSUAL)
668        self.set_status("Failed")
669        # we need to abort any remaining shareholders, so they'll delete the
670        # partial share, allowing someone else to upload it again.
671        self.log("aborting shareholders", level=log.UNUSUAL)
672        for shareid in list(self.landlords):
673            self.landlords[shareid].abort()
674        if f.check(defer.FirstError):
675            return f.value.subFailure
676        return f
677
678    def get_shares_placed(self):
679        # return a set of share numbers that were successfully placed.
680        return self._shares_placed
681
682    def get_times(self):
683        # return a dictionary of encode+push timings
684        return self._times
685
686    def get_uri_extension_data(self):
687        return self.uri_extension_data
688    def get_uri_extension_hash(self):
689        return self.uri_extension_hash
690
691    def get_uri_extension_size(self):
692        """
693        Calculate the size of the URI extension that gets written at the end of
694        immutables.
695
696        This may be done earlier than actual encoding, so e.g. we might not
697        know the crypttext hashes, but that's fine for our purposes since we
698        only care about the length.
699        """
700        params = self.uri_extension_data.copy()
701        params["crypttext_hash"] = b"\x00" * hashutil.CRYPTO_VAL_SIZE
702        params["crypttext_root_hash"] = b"\x00" * hashutil.CRYPTO_VAL_SIZE
703        params["share_root_hash"] = b"\x00" * hashutil.CRYPTO_VAL_SIZE
704        assert params.keys() == {
705            "codec_name", "codec_params", "size", "segment_size", "num_segments",
706            "needed_shares", "total_shares", "tail_codec_params",
707            "crypttext_hash", "crypttext_root_hash", "share_root_hash"
708        }, params.keys()
709        uri_extension = uri.pack_extension(params)
710        return len(uri_extension)
Note: See TracBrowser for help on using the repository browser.