Ticket #798: new-downloader-v3.diff

File new-downloader-v3.diff, 141.6 KB (added by warner, at 2010-04-26T09:53:14Z)

latest WIP patch, a few tests pass

  • src/allmydata/client.py

    diff --git a/src/allmydata/client.py b/src/allmydata/client.py
    index 12e7473..a1ed272 100644
    --- a/src/allmydata/client.py
    +++ b/src/allmydata/client.py
    @@ -12,7 +12,7 @@ import allmydata
     from allmydata.storage.server import StorageServer
     from allmydata import storage_client
     from allmydata.immutable.upload import Uploader
    -from allmydata.immutable.download import Downloader
    +from allmydata.immutable.download2_util import Terminator
     from allmydata.immutable.offloaded import Helper
     from allmydata.control import ControlServer
     from allmydata.introducer.client import IntroducerClient
    @@ -278,12 +278,9 @@ class Client(node.Node, pollmixin.PollMixin):
     
             self.init_client_storage_broker()
             self.history = History(self.stats_provider)
    +        self.terminator = Terminator()
    +        self.terminator.setServiceParent(self)
             self.add_service(Uploader(helper_furl, self.stats_provider))
    -        download_cachedir = os.path.join(self.basedir,
    -                                         "private", "cache", "download")
    -        self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir)
    -        self.download_cache_dirman.setServiceParent(self)
    -        self.downloader = Downloader(self.storage_broker, self.stats_provider)
             self.init_stub_client()
             self.init_nodemaker()
     
    @@ -342,8 +339,7 @@ class Client(node.Node, pollmixin.PollMixin):
                                        self._secret_holder,
                                        self.get_history(),
                                        self.getServiceNamed("uploader"),
    -                                   self.downloader,
    -                                   self.download_cache_dirman,
    +                                   self.terminator,
                                        self.get_encoding_parameters(),
                                        self._key_generator)
     
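
The Terminator wired in above comes from the new immutable/download2_util.py, which is not part of this excerpt. Judging from how it is used (a child service of the client, and each download node calls terminator.register(self) so that its stop() runs at stopService()), it behaves roughly like the following sketch of the assumed interface, not the module's actual code:

    from twisted.application import service

    class Terminator(service.Service):
        # assumed interface only; the real class lives in download2_util.py
        def __init__(self):
            self._clients = set()   # download nodes that want stop() at shutdown
        def register(self, c):
            self._clients.add(c)    # _Node.__init__ calls terminator.register(self)
        def stopService(self):
            # stop every registered download so outstanding requests are
            # cancelled and remote references can be released
            for c in self._clients:
                c.stop()
            return service.Service.stopService(self)
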
  • new file src/allmydata/immutable/download2.py

    diff --git a/src/allmydata/immutable/download2.py b/src/allmydata/immutable/download2.py
    new file mode 100644
    index 0000000..83f8a47
     1
     2import binascii
     3import struct
     4from zope.interface import implements
     5from twisted.python.failure import Failure
     6from twisted.internet import defer
     7from twisted.internet.interfaces import IPushProducer, IConsumer
     8
     9from foolscap.api import eventually
     10from allmydata.interfaces import HASH_SIZE, NotEnoughSharesError, \
     11     IImmutableFileNode
     12from allmydata.hashtree import IncompleteHashTree, BadHashError, \
     13     NotEnoughHashesError
     14from allmydata.util import base32, log, hashutil, mathutil, idlib
     15from allmydata.util.spans import Spans, DataSpans, overlap
     16from allmydata.util.dictutil import DictOfSets
     17from allmydata.codec import CRSDecoder
     18from allmydata import uri
     19from pycryptopp.cipher.aes import AES
     20from download2_util import Observer2, incidentally
     21from layout import make_write_bucket_proxy
     22
     23(AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT, DEAD, BADSEGNUM) = \
     24 ("AVAILABLE", "PENDING", "OVERDUE", "COMPLETE", "CORRUPT", "DEAD", "BADSEGNUM")
     25
     26KiB = 1024
     27class BadSegmentNumberError(Exception):
     28    pass
     29class BadSegmentError(Exception):
     30    pass
     31class BadCiphertextHashError(Exception):
     32    pass
     33
     34class Share:
     35    """I represent a single instance of a single share (e.g. I reference the
     36    shnum2 for share SI=abcde on server xy12t, not the one on server ab45q).
     37    I am associated with a CommonShare that remembers data that is held in
     38    common among e.g. SI=abcde/shnum2 across all servers. I am also
     39    associated with a CiphertextFileNode for e.g. SI=abcde (all shares, all
     40    servers).
     41    """
     42    # this is a specific implementation of IShare for tahoe's native storage
     43    # servers. A different backend would use a different class.
     44
     45    def __init__(self, rref, server_version, verifycap, commonshare, node,
     46                 peerid, shnum):
     47        self._rref = rref
     48        self._server_version = server_version
     49        self._node = node # holds share_hash_tree and UEB
     50        self._guess_offsets(verifycap, node.guessed_segment_size)
     51        self.actual_offsets = None
     52        self.actual_segment_size = None
     53        self._UEB_length = None
     54        self._commonshare = commonshare # holds block_hash_tree
     55        self._peerid = peerid
     56        self._peerid_s = base32.b2a(peerid)[:5]
     57        self._storage_index = verifycap.storage_index
     58        self._si_prefix = base32.b2a(verifycap.storage_index)[:8]
     59        self._shnum = shnum
     60
     61        self._lp = log.msg(format="Share(%(si)s) on server=%(server)s starting",
     62                           si=self._si_prefix, server=self._peerid_s,
     63                           level=log.NOISY, umid="P7hv2w")
     64
     65        self._wanted = Spans() # desired metadata
     66        self._wanted_blocks = Spans() # desired block data
     67        self._requested = Spans() # we've sent a request for this
     68        self._received = Spans() # we've received a response for this
     69        self._received_data = DataSpans() # the response contents, still unused
     70        self._requested_blocks = [] # (segnum, set(observer2..))
     71        ver = server_version["http://allmydata.org/tahoe/protocols/storage/v1"]
     72        self._overrun_ok = ver["tolerates-immutable-read-overrun"]
     73        # If _overrun_ok and we guess the offsets correctly, we can get
     74        # everything in one RTT. If _overrun_ok and we guess wrong, we might
     75        # need two RTT (but we could get lucky and do it in one). If overrun
     76        # is *not* ok (tahoe-1.3.0 or earlier), we need four RTT: 1=version,
     77        # 2=offset table, 3=UEB_length and everything else (hashes, block),
     78        # 4=UEB.
     79
     80    def _guess_offsets(self, verifycap, guessed_segment_size):
     81        self.guessed_segment_size = guessed_segment_size
     82        size = verifycap.size
     83        k = verifycap.needed_shares
     84        N = verifycap.total_shares
     85        r = self._node._calculate_sizes(guessed_segment_size)
     86        # num_segments, block_size/tail_block_size
     87        # guessed_segment_size/tail_segment_size/tail_segment_padded
     88        share_size = mathutil.div_ceil(size, k)
     89        # share_size is the amount of block data that will be put into each
     90        # share, summed over all segments. It does not include hashes, the
     91        # UEB, or other overhead.
     92
     93        # use the upload-side code to get this as accurate as possible
     94        ht = IncompleteHashTree(N)
     95        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
     96        wbp = make_write_bucket_proxy(None, share_size, r["block_size"],
     97                                      r["num_segments"], num_share_hashes, 0,
     98                                      None)
     99        self._fieldsize = wbp.fieldsize
     100        self._fieldstruct = wbp.fieldstruct
     101        self.guessed_offsets = wbp._offsets
     102
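             # For reference while reading _satisfy_offsets/_desire_offsets below,
             # the share data this code parses starts with a version number and an
             # offset table (field sizes are the ones used by the parsing code):
             #   [0:4]  version (big-endian uint32): 1 or 2
             #   v1: six 4-byte ">L" offsets starting at 0x0c
             #   v2: six 8-byte ">Q" offsets starting at 0x14
             #   offsets, in order: data, plaintext_hash_tree (unused),
             #     crypttext_hash_tree, block_hashes, share_hashes, uri_extension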
     103    # called by our client, the SegmentFetcher
     104    def get_block(self, segnum):
     105        """Add a block number to the list of requests. This will eventually
     106        result in a fetch of the data necessary to validate the block, then
     107        the block itself. The fetch order is generally
     108        first-come-first-served, but requests may be answered out-of-order if
     109        data becomes available sooner.
     110
     111        I return an Observer2, which has two uses. The first is to call
     112        o.subscribe(), which gives me a place to send state changes and
     113        eventually the data block. The second is o.cancel(), which removes
     114        the request (if it is still active).
     115
     116        I will distribute the following events through my Observer2:
     117         - state=OVERDUE: ?? I believe I should have had an answer by now.
     118                          You may want to ask another share instead.
     119         - state=BADSEGNUM: the segnum you asked for is too large. I must
     120                            fetch a valid UEB before I can determine this,
     121                            so the notification is asynchronous
     122         - state=COMPLETE, block=data: here is a valid block
     123         - state=CORRUPT: this share contains corrupted data
     124         - state=DEAD, f=Failure: the server reported an error, this share
     125                                  is unusable
     126        """
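                 # Consumer-side sketch, matching how SegmentFetcher uses this
                 # further down in this file:
                 #   o = share.get_block(segnum)
                 #   o.subscribe(self._block_request_activity, share=share, shnum=shnum)
                 #   ...  # the callback later fires as
                 #        # _block_request_activity(share, shnum, state, block=None, f=None)
                 #   o.cancel()  # if the block is no longer needed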
     127        assert segnum >= 0
     128        o = Observer2()
     129        o.set_canceler(self._cancel_block_request)
     130        for i,(segnum0,observers) in enumerate(self._requested_blocks):
     131            if segnum0 == segnum:
     132                observers.add(o)
     133                break
     134        else:
     135            self._requested_blocks.append( (segnum, set([o])) )
     136        eventually(self.loop)
     137        return o
     138
     139    def _cancel_block_request(self, o):
     140        new_requests = []
     141        for e in self._requested_blocks:
     142            (segnum0, observers) = e
     143            observers.discard(o)
     144            if observers:
     145                new_requests.append(e)
     146        self._requested_blocks = new_requests
     147
     148    # internal methods
     149    def _active_segnum(self):
     150        if self._requested_blocks:
      151            return self._requested_blocks[0][0]
     152        return None
     153
     154    def _active_segnum_and_observers(self):
     155        if self._requested_blocks:
     156            # we only retrieve information for one segment at a time, to
     157            # minimize alacrity (first come, first served)
     158            return self._requested_blocks[0]
     159        return None, []
     160
     161    def loop(self):
     162        try:
     163            # if any exceptions occur here, kill the download
     164            self._do_loop()
     165        except BaseException:
     166            self._fail(Failure())
     167            raise
     168
     169    def _do_loop(self):
     170        # we are (eventually) called after all state transitions:
     171        #  new segments added to self._requested_blocks
     172        #  new data received from servers (responses to our read() calls)
     173        #  impatience timer fires (server appears slow)
     174
     175        # First, consume all of the information that we currently have, for
     176        # all the segments people currently want.
     177        while self._get_satisfaction():
     178            pass
     179
     180        # When we get no satisfaction (from the data we've received so far),
     181        # we determine what data we desire (to satisfy more requests). The
     182        # number of segments is finite, so I can't get no satisfaction
     183        # forever.
     184        self._desire()
     185
     186        # finally send out requests for whatever we need (desire minus have).
     187        # You can't always get what you want, but, sometimes, you get what
     188        # you need.
     189        self._request_needed() # express desire
     190
     191    def _get_satisfaction(self):
     192        # return True if we retired a data block, and should therefore be
     193        # called again. Return False if we don't retire a data block (even if
     194        # we do retire some other data, like hash chains).
     195
     196        if self.actual_offsets is None:
     197            if not self._satisfy_offsets():
     198                # can't even look at anything without the offset table
     199                return False
     200
     201        if not self._node.have_UEB:
     202            if not self._satisfy_UEB():
     203                # can't check any hashes without the UEB
     204                return False
     205
     206        # knowing the UEB means knowing num_segments. Despite the redundancy,
     207        # this is the best place to set this. CommonShare.set_numsegs will
     208        # ignore duplicate calls.
     209        cs = self._commonshare
     210        cs.set_numsegs(self._node.num_segments)
     211
     212        segnum, observers = self._active_segnum_and_observers()
     213        if segnum >= self._node.num_segments:
     214            for o in observers:
     215                o.notify(state=BADSEGNUM)
     216            self._requested_blocks.pop(0)
     217            return True
     218
     219        if self._node.share_hash_tree.needed_hashes(self._shnum):
     220            if not self._satisfy_share_hash_tree():
     221                # can't check block_hash_tree without a root
     222                return False
     223
     224        if cs.need_block_hash_root():
     225            block_hash_root = self._node.share_hash_tree.get_leaf(self._shnum)
     226            cs.set_block_hash_root(block_hash_root)
     227
     228        if segnum is None:
     229            return False # we don't want any particular segment right now
     230
     231        # block_hash_tree
     232        needed_hashes = self._commonshare.get_needed_block_hashes(segnum)
     233        if needed_hashes:
     234            if not self._satisfy_block_hash_tree(needed_hashes):
     235                # can't check block without block_hash_tree
     236                return False
     237
     238        # data blocks
     239        return self._satisfy_data_block(segnum, observers)
     240
     241    def _satisfy_offsets(self):
     242        version_s = self._received_data.get(0, 4)
     243        if version_s is None:
     244            return False
     245        (version,) = struct.unpack(">L", version_s)
     246        if version == 1:
     247            table_start = 0x0c
     248            self._fieldsize = 0x4
     249            self._fieldstruct = "L"
     250        else:
     251            table_start = 0x14
     252            self._fieldsize = 0x8
     253            self._fieldstruct = "Q"
     254        offset_table_size = 6 * self._fieldsize
     255        table_s = self._received_data.pop(table_start, offset_table_size)
     256        if table_s is None:
     257            return False
     258        fields = struct.unpack(">"+6*self._fieldstruct, table_s)
     259        offsets = {}
     260        for i,field in enumerate(['data',
     261                                  'plaintext_hash_tree', # UNUSED
     262                                  'crypttext_hash_tree',
     263                                  'block_hashes',
     264                                  'share_hashes',
     265                                  'uri_extension',
     266                                  ] ):
     267            offsets[field] = fields[i]
     268        self.actual_offsets = offsets
     269        log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields))
     270        self._received_data.remove(0, 4) # don't need this anymore
     271        return True
     272
     273    def _satisfy_UEB(self):
     274        o = self.actual_offsets
     275        fsize = self._fieldsize
     276        rdata = self._received_data
     277        UEB_length_s = rdata.get(o["uri_extension"], fsize)
     278        if not UEB_length_s:
     279            return False
     280        (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s)
     281        UEB_s = rdata.pop(o["uri_extension"]+fsize, UEB_length)
     282        if not UEB_s:
     283            return False
     284        rdata.remove(o["uri_extension"], fsize)
     285        try:
     286            self._node.validate_and_store_UEB(UEB_s)
     287            self.actual_segment_size = self._node.segment_size
     288            assert self.actual_segment_size is not None
     289            return True
     290        except BadHashError:
     291            # TODO: if this UEB was bad, we'll keep trying to validate it
     292            # over and over again. Only log.err on the first one, or better
     293            # yet skip all but the first
     294            f = Failure()
     295            self._signal_corruption(f, o["uri_extension"], fsize+UEB_length)
     296            return False
     297
     298    def _satisfy_share_hash_tree(self):
     299        # the share hash chain is stored as (hashnum,hash) tuples, so you
     300        # can't fetch just the pieces you need, because you don't know
     301        # exactly where they are. So fetch everything, and parse the results
     302        # later.
     303        o = self.actual_offsets
     304        rdata = self._received_data
     305        hashlen = o["uri_extension"] - o["share_hashes"]
     306        assert hashlen % (2+HASH_SIZE) == 0
     307        hashdata = rdata.get(o["share_hashes"], hashlen)
     308        if not hashdata:
     309            return False
     310        share_hashes = {}
     311        for i in range(0, hashlen, 2+HASH_SIZE):
     312            (hashnum,) = struct.unpack(">H", hashdata[i:i+2])
     313            hashvalue = hashdata[i+2:i+2+HASH_SIZE]
     314            share_hashes[hashnum] = hashvalue
     315        try:
     316            self._node.process_share_hashes(share_hashes)
     317            # adds to self._node.share_hash_tree
     318            rdata.remove(o["share_hashes"], hashlen)
     319            return True
     320        except (BadHashError, NotEnoughHashesError, IndexError):
     321            f = Failure()
     322            self._signal_corruption(f, o["share_hashes"], hashlen)
     323            return False
     324
     325    def _signal_corruption(self, f, start, offset):
     326        # there was corruption somewhere in the given range
     327        reason = "corruption in share[%d-%d): %s" % (start, start+offset,
     328                                                     str(f.value))
     329        self._rref.callRemoteOnly("advise_corrupt_share", "immutable",
     330                                  self._storage_index, self._shnum, reason)
     331
     332    def _satisfy_block_hash_tree(self, needed_hashes):
     333        o = self.actual_offsets
     334        rdata = self._received_data
     335        block_hashes = {}
     336        for hashnum in needed_hashes:
     337            hashdata = rdata.get(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
     338            if hashdata:
     339                block_hashes[hashnum] = hashdata
     340            else:
     341                return False # missing some hashes
     342        # note that we don't submit any hashes to the block_hash_tree until
     343        # we've gotten them all, because the hash tree will throw an
     344        # exception if we only give it a partial set (which it therefore
     345        # cannot validate)
     346        commonshare = self._commonshare
     347        ok = commonshare.process_block_hashes(block_hashes, self._peerid_s)
     348        if not ok:
     349            return False
     350        for hashnum in needed_hashes:
     351            rdata.remove(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
     352        return True
     353
     354    def _satisfy_data_block(self, segnum, observers):
     355        tail = (segnum == self._node.num_segments-1)
     356        datastart = self.actual_offsets["data"]
     357        blockstart = datastart + segnum * self._node.block_size
     358        blocklen = self._node.block_size
     359        if tail:
     360            blocklen = self._node.tail_block_size
     361
     362        rdata = self._received_data
     363        block = rdata.pop(blockstart, blocklen)
     364        if not block:
     365            return False
     366        # this block is being retired, either as COMPLETE or CORRUPT, since
     367        # no further data reads will help
     368        assert self._requested_blocks[0][0] == segnum
     369        commonshare = self._commonshare
     370        ok = commonshare.check_block(segnum, block, self._peerid_s)
     371        if ok:
     372            for o in observers:
     373                # goes to SegmentFetcher._block_request_activity
     374                o.notify(state=COMPLETE, block=block)
     375        else:
     376            for o in observers:
     377                o.notify(state=CORRUPT)
     378        self._requested_blocks.pop(0) # retired
     379        return True # got satisfaction
     380
     381    def _desire(self):
     382        segnum, observers = self._active_segnum_and_observers()
     383        commonshare = self._commonshare
     384
     385        if not self.actual_offsets:
     386            self._desire_offsets()
     387
     388        # we can use guessed offsets as long as this server tolerates overrun
     389        if not self.actual_offsets and not self._overrun_ok:
     390            return # must wait for the offsets to arrive
     391
     392        o = self.actual_offsets or self.guessed_offsets
     393        segsize = self.actual_segment_size or self.guessed_segment_size
     394        if not self._node.have_UEB:
     395            self._desire_UEB(o)
     396
     397        if self._node.share_hash_tree.needed_hashes(self._shnum):
     398            hashlen = o["uri_extension"] - o["share_hashes"]
     399            self._wanted.add(o["share_hashes"], hashlen)
     400
     401        if segnum is None:
     402            return # only need block hashes or blocks for active segments
     403
     404        # block hash chain
     405        for hashnum in commonshare.get_needed_block_hashes(segnum):
     406            self._wanted.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
     407
     408        # data
     409        r = self._node._calculate_sizes(segsize)
      410        tail = (segnum == r["num_segments"]-1)
     411        datastart = o["data"]
     412        blockstart = datastart + segnum * r["block_size"]
     413        blocklen = r["block_size"]
     414        if tail:
     415            blocklen = r["tail_block_size"]
     416        self._wanted_blocks.add(blockstart, blocklen)
     417
     418
     419    def _desire_offsets(self):
     420        if self._overrun_ok:
     421            # easy! this includes version number, sizes, and offsets
     422            self._wanted.add(0,1024)
     423            return
     424
     425        # v1 has an offset table that lives [0x0,0x24). v2 lives [0x0,0x44).
     426        # To be conservative, only request the data that we know lives there,
     427        # even if that means more roundtrips.
     428
     429        self._wanted.add(0,4)  # version number, always safe
     430        version_s = self._received_data.get(0, 4)
     431        if not version_s:
     432            return
     433        (version,) = struct.unpack(">L", version_s)
     434        if version == 1:
     435            table_start = 0x0c
     436            fieldsize = 0x4
     437        else:
     438            table_start = 0x14
     439            fieldsize = 0x8
     440        offset_table_size = 6 * fieldsize
     441        self._wanted.add(table_start, offset_table_size)
     442
     443    def _desire_UEB(self, o):
     444        # UEB data is stored as (length,data).
     445        rdata = self._received_data
     446        if self._overrun_ok:
     447            # We can pre-fetch 2kb, which should probably cover it. If it
     448            # turns out to be larger, we'll come back here later with a known
     449            # length and fetch the rest.
     450            self._wanted.add(o["uri_extension"], 2048)
     451            # now, while that is probably enough to fetch the whole UEB, it
     452            # might not be, so we need to do the next few steps as well. In
     453            # most cases, the following steps will not actually add anything
     454            # to self._wanted
     455
     456        self._wanted.add(o["uri_extension"], self._fieldsize)
     457        # only use a length if we're sure it's correct, otherwise we'll
     458        # probably fetch a huge number
     459        if not self.actual_offsets:
     460            return
     461        UEB_length_s = rdata.get(o["uri_extension"], self._fieldsize)
     462        if UEB_length_s:
     463            (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s)
     464            # we know the length, so make sure we grab everything
     465            self._wanted.add(o["uri_extension"]+self._fieldsize, UEB_length)
     466
     467    def _request_needed(self):
     468        # send requests for metadata first, to avoid hanging on to large data
     469        # blocks any longer than necessary.
     470        self._send_requests(self._wanted - self._received - self._requested)
     471        # then send requests for data blocks. All the hashes should arrive
     472        # before the blocks, so the blocks can be consumed and released in a
     473        # single turn.
     474        ask = self._wanted_blocks - self._received - self._requested
     475        self._send_requests(ask)
     476
     477    def _send_requests(self, needed):
     478        for (start, length) in needed:
     479            # TODO: quantize to reasonably-large blocks
     480            self._requested.add(start, length)
     481            lp = log.msg(format="_send_request(%(peerid)s)"
     482                         " [%(start)d:+%(length)d]",
     483                         peerid=self._peerid_s, start=start, length=length,
     484                         level=log.NOISY, umid="sgVAyA")
     485            d = self._send_request(start, length)
     486            d.addCallback(self._got_data, start, length, lp)
     487            d.addErrback(self._got_error, start, length, lp)
     488            d.addCallback(incidentally, eventually, self.loop)
     489            d.addErrback(lambda f:
     490                         log.err(format="unhandled error during send_request",
     491                                 failure=f, parent=self._lp,
     492                                 level=log.WEIRD, umid="qZu0wg"))
     493
     494    def _send_request(self, start, length):
     495        return self._rref.callRemote("read", start, length)
     496
     497    def _got_data(self, data, start, length, lp):
     498        log.msg(format="_got_data [%(start)d:+%(length)d] -> %(datalen)d",
     499                start=start, length=length, datalen=len(data),
     500                level=log.NOISY, parent=lp, umid="sgVAyA")
     501        span = (start, length)
     502        assert span in self._requested
     503        self._requested.remove(start, length)
     504        self._received.add(start, length)
     505        self._received_data.add(start, data)
     506
     507    def _got_error(self, f, start, length, lp):
     508        log.msg(format="error requesting %(start)d+%(length)d"
     509                " from %(server)s for si %(si)s",
     510                start=start, length=length,
     511                server=self._peerid_s, si=self._si_prefix,
     512                failure=f, parent=lp, level=log.UNUSUAL, umid="qZu0wg")
     513        # retire our observers, assuming we won't be able to make any
     514        # further progress
     515        self._fail(f)
     516
     517    def _fail(self, f):
     518        for (segnum, observers) in self._requested_blocks:
     519            for o in observers:
     520                o.notify(state=DEAD, f=f)
     521
     522
     523class CommonShare:
     524    """I hold data that is common across all instances of a single share,
     525    like sh2 on both servers A and B. This is just the block hash tree.
     526    """
     527    def __init__(self, guessed_numsegs, si_prefix, shnum):
     528        self.si_prefix = si_prefix
     529        self.shnum = shnum
     530        # in the beginning, before we have the real UEB, we can only guess at
     531        # the number of segments. But we want to ask for block hashes early.
     532        # So if we're asked for which block hashes are needed before we know
     533        # numsegs for sure, we return a guess.
     534        self._block_hash_tree = IncompleteHashTree(guessed_numsegs)
     535        self._know_numsegs = False
     536
     537    def set_numsegs(self, numsegs):
     538        if self._know_numsegs:
     539            return
     540        self._block_hash_tree = IncompleteHashTree(numsegs)
     541        self._know_numsegs = True
     542
     543    def need_block_hash_root(self):
     544        log.msg("need_block_hash_root: %s" % bool(not self._block_hash_tree[0]))
     545        return bool(not self._block_hash_tree[0])
     546
     547    def set_block_hash_root(self, roothash):
     548        log.msg("set_block_hash_root: %s" % repr(roothash))
     549        self._block_hash_tree.set_hashes({0: roothash})
     550        log.msg("done with set_block_hash_root")
     551
     552    def get_needed_block_hashes(self, segnum):
     553        needed = ",".join([str(n) for n in sorted(self._block_hash_tree.needed_hashes(segnum))])
     554        log.msg("segnum=%d needs %s" % (segnum, needed))
     555        # XXX: include_leaf=True needs thought: how did the old downloader do
     556        # it? I think it grabbed *all* block hashes and set them all at once.
     557        # Since we want to fetch less data, we either need to fetch the leaf
     558        # too, or wait to set the block hashes until we've also received the
     559        # block itself, so we can hash it too, and set the chain+leaf all at
     560        # the same time.
     561        return self._block_hash_tree.needed_hashes(segnum, include_leaf=True)
     562
     563    def process_block_hashes(self, block_hashes, serverid_s):
     564        assert self._know_numsegs
     565        try:
     566            self._block_hash_tree.set_hashes(block_hashes)
     567            return True
     568        except (BadHashError, NotEnoughHashesError):
     569            hashnums = ",".join([str(n) for n in sorted(block_hashes.keys())])
     570            log.msg(format="hash failure in block_hashes=(%(hashnums)s),"
     571                    " shnum=%(shnum)d SI=%(si)s server=%(server)s",
     572                    hashnums=hashnums, shnum=self.shnum,
     573                    si=self.si_prefix, server=serverid_s, failure=Failure(),
     574                    level=log.WEIRD, umid="yNyFdA")
     575        return False
     576
     577    def check_block(self, segnum, block, serverid_s):
     578        assert self._know_numsegs
     579        h = hashutil.block_hash(block)
     580        try:
     581            self._block_hash_tree.set_hashes(leaves={segnum: h})
     582            return True
     583        except (BadHashError, NotEnoughHashesError):
      584            log.msg(format="hash failure in block %(segnum)d,"
      585                    " shnum=%(shnum)d SI=%(si)s server=%(server)s",
      586                    segnum=segnum, shnum=self.shnum, si=self.si_prefix,
      587                    server=serverid_s, failure=Failure(),
      588                    level=log.WEIRD, umid="mZjkqA")
     589        return False
     590
     591# all classes are also Services, and the rule is that you don't initiate more
     592# work unless self.running
     593
     594# GC: decide whether each service is restartable or not. For non-restartable
     595# services, stopService() should delete a lot of attributes to kill reference
     596# cycles. The primary goal is to decref remote storage BucketReaders when a
     597# download is complete.
     598
     599class SegmentFetcher:
     600    """I am responsible for acquiring blocks for a single segment. I will use
     601    the Share instances passed to my add_shares() method to locate, retrieve,
     602    and validate those blocks. I expect my parent node to call my
     603    no_more_shares() method when there are no more shares available. I will
     604    call my parent's want_more_shares() method when I want more: I expect to
     605    see at least one call to add_shares or no_more_shares afterwards.
     606
     607    When I have enough validated blocks, I will call my parent's
     608    process_blocks() method with a dictionary that maps shnum to blockdata.
     609    If I am unable to provide enough blocks, I will call my parent's
     610    fetch_failed() method with (self, f). After either of these events, I
     611    will shut down and do no further work. My parent can also call my stop()
     612    method to have me shut down early."""
     613
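             # Typical life cycle (the driving calls come from the parent _Node,
             # per the docstring above):
             #   sf = SegmentFetcher(node, segnum, k)
             #   sf.add_shares(shares)      # Shares located by the ShareFinder
             #   ... sf asks AVAILABLE shares for blocks via share.get_block(segnum)
             #   ... sf calls node.want_more_shares() if it runs short
             #   finally: node.process_blocks(segnum, {shnum: block}) on success,
             #            or node.fetch_failed(sf, f) on failure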
     614    def __init__(self, node, segnum, k):
     615        self._node = node # _Node
     616        self.segnum = segnum
     617        self._k = k
     618        self._shares = {} # maps non-dead Share instance to a state, one of
     619                          # (AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT).
     620                          # State transition map is:
     621                          #  AVAILABLE -(send-read)-> PENDING
     622                          #  PENDING -(timer)-> OVERDUE
     623                          #  PENDING -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
     624                          #  OVERDUE -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
     625                          # If a share becomes DEAD, it is removed from the
     626                          # dict. If it becomes BADSEGNUM, the whole fetch is
     627                          # terminated.
     628        self._share_observers = {} # maps Share to Observer2 for active ones
     629        self._shnums = DictOfSets() # maps shnum to the shares that provide it
     630        self._blocks = {} # maps shnum to validated block data
     631        self._no_more_shares = False
     632        self._bad_segnum = False
     633        self._last_failure = None
     634        self._running = True
     635
     636    def stop(self):
     637        self._cancel_all_requests()
     638        self._running = False
     639        del self._shares # let GC work # ??? XXX
     640
     641
     642    # called by our parent _Node
     643
     644    def add_shares(self, shares):
     645        # called when ShareFinder locates a new share, and when a non-initial
     646        # segment fetch is started and we already know about shares from the
     647        # previous segment
     648        for s in shares:
     649            self._shares[s] = AVAILABLE
     650            self._shnums.add(s._shnum, s)
     651        eventually(self.loop)
     652
     653    def no_more_shares(self):
     654        # ShareFinder tells us it's reached the end of its list
     655        self._no_more_shares = True
     656        eventually(self.loop)
     657
     658    # internal methods
     659
     660    def _count_shnums(self, *states):
      661        """Return how many shnums have at least one share in one of the given states."""
     662        shnums = []
     663        for shnum,shares in self._shnums.iteritems():
     664            matches = [s for s in shares if self._shares[s] in states]
     665            if matches:
     666                shnums.append(shnum)
     667        return len(shnums)
     668
     669    def loop(self):
     670        try:
     671            # if any exception occurs here, kill the download
     672            self._do_loop()
     673        except BaseException:
     674            self._node.fetch_failed(self, Failure())
     675            raise
     676
     677    def _do_loop(self):
     678        k = self._k
     679        if not self._running:
     680            return
     681        if self._bad_segnum:
     682            # oops, we were asking for a segment number beyond the end of the
     683            # file. This is an error.
     684            self.stop()
     685            e = BadSegmentNumberError("%d > %d" % (self.segnum,
     686                                                   self._node.num_segments))
     687            f = Failure(e)
     688            self._node.fetch_failed(self, f)
     689            return
     690
     691        # are we done?
     692        if self._count_shnums(COMPLETE) >= k:
     693            # yay!
     694            self.stop()
     695            self._node.process_blocks(self.segnum, self._blocks)
     696            return
     697
     698        # we may have exhausted everything
     699        if (self._no_more_shares and
     700            self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) < k):
     701            # no more new shares are coming, and the remaining hopeful shares
     702            # aren't going to be enough. boo!
     703            self.stop()
     704            format = ("ran out of shares: %(complete)d complete,"
     705                      " %(pending)d pending, %(overdue)d overdue,"
     706                      " %(unused)d unused, need %(k)d."
     707                      " Last failure: %(last_failure)s")
     708            args = {"complete": self._count_shnums(COMPLETE),
     709                    "pending": self._count_shnums(PENDING),
     710                    "overdue": self._count_shnums(OVERDUE),
     711                    "unused": self._count_shnums(AVAILABLE), # should be zero
     712                    "k": k,
     713                    "last_failure": self._last_failure,
     714                    }
     715            log.msg(format=format, level=log.UNUSUAL, umid="1DsnTg", **args)
     716            e = NotEnoughSharesError(format % args)
     717            f = Failure(e)
     718            self._node.fetch_failed(self, f)
     719            return
     720
     721        # nope, not done. Are we "block-hungry" (i.e. do we want to send out
     722        # more read requests, or do we think we have enough in flight
     723        # already?)
     724        while self._count_shnums(PENDING, COMPLETE) < k:
     725            # we're hungry.. are there any unused shares?
     726            sent = self._send_new_request()
     727            if not sent:
     728                break
     729
     730        # ok, now are we "share-hungry" (i.e. do we have enough known shares
     731        # to make us happy, or should we ask the ShareFinder to get us more?)
     732        if self._count_shnums(AVAILABLE, PENDING, COMPLETE) < k:
     733            # we're hungry for more shares
     734            self._node.want_more_shares()
     735            # that will trigger the ShareFinder to keep looking
     736
     737    def _find_one(self, shares, state):
     738        # TODO could choose fastest
     739        for s in shares:
     740            if self._shares[s] == state:
     741                return s
     742        raise IndexError("shouldn't get here")
     743
     744    def _send_new_request(self):
     745        for shnum,shares in self._shnums.iteritems():
     746            states = [self._shares[s] for s in shares]
     747            if COMPLETE in states or PENDING in states:
     748                # don't send redundant requests
     749                continue
     750            if AVAILABLE not in states:
     751                # no candidates for this shnum, move on
     752                continue
     753            # here's a candidate. Send a request.
     754            s = self._find_one(shares, AVAILABLE)
     755            self._shares[s] = PENDING
     756            self._share_observers[s] = o = s.get_block(self.segnum)
     757            o.subscribe(self._block_request_activity, share=s, shnum=shnum)
     758            # TODO: build up a list of candidates, then walk through the
      759            # list, sending requests to the most desirable servers,
     760            # re-checking our block-hunger each time. For non-initial segment
     761            # fetches, this would let us stick with faster servers.
     762            return True
     763        # nothing was sent: don't call us again until you have more shares to
     764        # work with, or one of the existing shares has been declared OVERDUE
     765        return False
     766
     767    def _cancel_all_requests(self):
     768        for o in self._share_observers.values():
     769            o.cancel()
     770        self._share_observers = {}
     771
     772    def _block_request_activity(self, share, shnum, state, block=None, f=None):
     773        # called by Shares, in response to our s.send_request() calls.
     774        # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal.
     775        if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM):
     776            del self._share_observers[share]
     777        if state is COMPLETE:
     778            # 'block' is fully validated
     779            self._shares[share] = COMPLETE
     780            self._blocks[shnum] = block
     781        elif state is OVERDUE:
     782            self._shares[share] = OVERDUE
     783            # OVERDUE is not terminal: it will eventually transition to
     784            # COMPLETE, CORRUPT, or DEAD.
     785        elif state is CORRUPT:
     786            self._shares[share] = CORRUPT
     787        elif state is DEAD:
     788            del self._shares[share]
     789            self._shnums[shnum].remove(share)
     790            self._last_failure = f
     791        elif state is BADSEGNUM:
     792            self._shares[share] = BADSEGNUM # ???
     793            self._bad_segnum = True
     794        eventually(self.loop)
     795
     796
     797class RequestToken:
     798    def __init__(self, peerid):
     799        self.peerid = peerid
     800
     801class ShareFinder:
     802    def __init__(self, storage_broker, verifycap, node,
     803                 max_outstanding_requests=10):
     804        self.running = True
     805        self.verifycap = verifycap
     806        s = storage_broker.get_servers_for_index(verifycap.storage_index)
     807        self._servers = iter(s)
     808        self.share_consumer = self.node = node
     809        self.max_outstanding_requests = max_outstanding_requests
     810
     811        self._hungry = False
     812
     813        self._commonshares = {} # shnum to CommonShare instance
     814        self.undelivered_shares = []
     815        self.pending_requests = set()
     816
     817        self._storage_index = verifycap.storage_index
     818        self._si_prefix = base32.b2a_l(self._storage_index[:8], 60)
     819        self._lp = log.msg(format="ShareFinder[si=%(si)s] starting",
     820                           si=self._si_prefix, level=log.NOISY, umid="2xjj2A")
     821
     822    def log(self, *args, **kwargs):
     823        if "parent" not in kwargs:
     824            kwargs["parent"] = self._lp
     825        return log.msg(*args, **kwargs)
     826
     827    def stop(self):
     828        self.running = False
     829
     830    # called by our parent CiphertextDownloader
     831    def hungry(self):
     832        log.msg(format="ShareFinder[si=%(si)s] hungry",
     833                si=self._si_prefix, level=log.NOISY, umid="NywYaQ")
     834        self._hungry = True
     835        eventually(self.loop)
     836
     837    # internal methods
     838    def loop(self):
     839        undelivered_s = ",".join(["sh%d@%s" %
     840                                  (s._shnum, idlib.shortnodeid_b2a(s._peerid))
     841                                  for s in self.undelivered_shares])
     842        pending_s = ",".join([idlib.shortnodeid_b2a(rt.peerid)
     843                              for rt in self.pending_requests]) # sort?
     844        log.msg(format="ShareFinder[si=%(si)s] loop: running=%(running)s"
     845                " hungry=%(hungry)s, undelivered=%(undelivered)s,"
     846                " pending=%(pending)s",
     847                si=self._si_prefix, running=self.running, hungry=self._hungry,
     848                undelivered=undelivered_s, pending=pending_s,
     849                level=log.NOISY, umid="kRtS4Q")
     850        if not self.running:
     851            return
     852        if not self._hungry:
     853            return
     854        if self.undelivered_shares:
     855            sh = self.undelivered_shares.pop(0)
     856            # they will call hungry() again if they want more
     857            self._hungry = False
     858            eventually(self.share_consumer.got_shares, [sh])
     859            return
     860        if len(self.pending_requests) >= self.max_outstanding_requests:
     861            # cannot send more requests, must wait for some to retire
     862            return
     863
     864        server = None
     865        try:
     866            if self._servers:
     867                server = self._servers.next()
     868        except StopIteration:
     869            self._servers = None
     870
     871        if server:
     872            self.send_request(server)
     873            return
     874
     875        if self.pending_requests:
     876            # no server, but there are still requests in flight: maybe one of
     877            # them will make progress
     878            return
     879
     880        log.msg(format="ShareFinder.loop: no_more_shares",
     881                level=log.UNUSUAL, umid="XjQlzg")
     882        # we've run out of servers (so we can't send any more requests), and
     883        # we have nothing in flight. No further progress can be made. They
     884        # are destined to remain hungry.
     885        self.share_consumer.no_more_shares()
     886        self.stop()
     887
     888    def send_request(self, server):
     889        peerid, rref = server
     890        req = RequestToken(peerid)
     891        self.pending_requests.add(req)
     892        lp = self.log(format="sending DYHB to [%(peerid)s]",
     893                      peerid=idlib.shortnodeid_b2a(peerid),
     894                      level=log.NOISY, umid="Io7pyg")
     895        d = rref.callRemote("get_buckets", self._storage_index)
     896        d.addBoth(incidentally, self.pending_requests.discard, req)
     897        d.addCallbacks(self._got_response, self._got_error,
     898                       callbackArgs=(rref.version, peerid, req, lp),
     899                       errbackArgs=(peerid, req, lp))
     900        d.addErrback(log.err, format="error in send_request",
     901                     level=log.WEIRD, parent=lp, umid="rpdV0w")
     902        d.addCallback(incidentally, eventually, self.loop)
     903
     904    def _got_response(self, buckets, server_version, peerid, req, lp):
     905        if buckets:
     906            shnums_s = ",".join([str(shnum) for shnum in buckets])
     907            self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
     908                     shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid),
     909                     level=log.NOISY, parent=lp, umid="0fcEZw")
     910        else:
     911            self.log(format="no shares from [%(peerid)s]",
     912                     peerid=idlib.shortnodeid_b2a(peerid),
     913                     level=log.NOISY, parent=lp, umid="U7d4JA")
     914        if self.node.num_segments is None:
     915            best_numsegs = self.node.guessed_num_segments
     916        else:
     917            best_numsegs = self.node.num_segments
     918        for shnum, bucket in buckets.iteritems():
     919            if shnum in self._commonshares:
     920                cs = self._commonshares[shnum]
     921            else:
     922                cs = CommonShare(best_numsegs, self._si_prefix, shnum)
     923                # Share._get_satisfaction is responsible for updating
     924                # CommonShare.set_numsegs after we know the UEB. Alternatives:
     925                #  1: d = self.node.get_num_segments()
     926                #     d.addCallback(cs.got_numsegs)
     927                #   the problem is that the OneShotObserverList I was using
     928                #   inserts an eventual-send between _get_satisfaction's
     929                #   _satisfy_UEB and _satisfy_block_hash_tree, and the
     930                #   CommonShare didn't get the num_segs message before
     931                #   being asked to set block hash values. To resolve this
     932                #   would require an immediate ObserverList instead of
     933                #   an eventual-send -based one
     934                #  2: break _get_satisfaction into Deferred-attached pieces.
     935                #     Yuck.
     936                self._commonshares[shnum] = cs
     937            s = Share(bucket, server_version, self.verifycap, cs, self.node,
     938                      peerid, shnum)
     939            self.undelivered_shares.append(s)
     940
     941    def _got_error(self, f, peerid, req, lp):
     942        self.log(format="got error from [%(peerid)s]",
     943                 peerid=idlib.shortnodeid_b2a(peerid), failure=f,
     944                 level=log.UNUSUAL, parent=lp, umid="zUKdCw")
     945
     946
     947
     948class Segmentation:
     949    """I am responsible for a single offset+size read of the file. I handle
     950    segmentation: I figure out which segments are necessary, request them
     951    (from my CiphertextDownloader) in order, and trim the segments down to
     952    match the offset+size span. I use the Producer/Consumer interface to only
     953    request one segment at a time.
     954    """
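             # Worked example: a read of file[200000:250000] against 131073-byte
             # segments guesses wanted_segnum = 200000 // 131073 = 1. Segment 1
             # covers file[131073:262146], so overlap() reports the whole
             # [200000:250000) span and one segment satisfies the read. If the
             # segment size had been guessed wrong, the retry path below refetches
             # once the real segment size is known.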
     955    implements(IPushProducer)
     956    def __init__(self, node, offset, size, consumer):
     957        self._node = node
     958        self._hungry = True
     959        self._active_segnum = None
     960        self._cancel_segment_request = None
     961        # these are updated as we deliver data. At any given time, we still
     962        # want to download file[offset:offset+size]
     963        self._offset = offset
     964        self._size = size
     965        self._consumer = consumer
     966
     967    def start(self):
     968        self._alive = True
     969        self._deferred = defer.Deferred()
     970        self._consumer.registerProducer(self, True)
     971        self._maybe_fetch_next()
     972        return self._deferred
     973
     974    def _maybe_fetch_next(self):
     975        if not self._alive or not self._hungry:
     976            return
     977        if self._active_segnum is not None:
     978            return
     979        self._fetch_next()
     980
     981    def _fetch_next(self):
     982        if self._size == 0:
     983            # done!
     984            self._alive = False
     985            self._hungry = False
     986            self._consumer.unregisterProducer()
     987            self._deferred.callback(self._consumer)
     988            return
     989        n = self._node
     990        have_actual_segment_size = n.segment_size is not None
     991        segment_size = n.segment_size or n.guessed_segment_size
     992        if self._offset == 0:
     993            # great! we want segment0 for sure
     994            wanted_segnum = 0
     995        else:
     996            # this might be a guess
     997            wanted_segnum = self._offset // segment_size
     998        self._active_segnum = wanted_segnum
     999        d,c = n.get_segment(wanted_segnum)
     1000        self._cancel_segment_request = c
     1001        d.addBoth(self._request_retired)
     1002        d.addCallback(self._got_segment, have_actual_segment_size)
     1003        d.addErrback(self._retry_bad_segment, have_actual_segment_size)
     1004        d.addErrback(self._error)
     1005
     1006    def _request_retired(self, res):
     1007        self._active_segnum = None
     1008        self._cancel_segment_request = None
     1009        return res
     1010
     1011    def _got_segment(self, (segment_start,segment), had_actual_segment_size):
     1012        segnum = self._active_segnum
     1013        self._active_segnum = None
     1014        self._cancel_segment_request = None
     1015        # we got file[segment_start:segment_start+len(segment)]
     1016        # we want file[self._offset:self._offset+self._size]
     1017        o = overlap(segment_start, len(segment),  self._offset, self._size)
     1018        # the overlap is file[o[0]:o[0]+o[1]]
     1019        if not o or o[0] != self._offset:
     1020            # we didn't get the first byte, so we can't use this segment
      1021            if had_actual_segment_size:
      1022                # and we should have gotten it right. This is a big problem.
     1023                log.msg("Segmentation handed wrong data (but we knew better):"
     1024                        " want [%d-%d), given [%d-%d), for segnum=%d,"
     1025                        " for si=%s"
     1026                        % (self._offset, self._offset+self._size,
     1027                           segment_start, segment_start+len(segment),
     1028                           segnum, self._node._si_prefix),
     1029                        level=log.WEIRD, umid="STlIiA")
     1030                raise BadSegmentError("Despite knowing the segment size,"
     1031                                      " we were given the wrong data."
     1032                                      " I cannot cope.")
     1033            # we've wasted some bandwidth, but now we can grab the right one,
     1034            # because we should know the segsize by now.
     1035            assert self._node.segment_size is not None
     1036            self._maybe_fetch_next()
     1037            return
     1038        offset_in_segment = self._offset - segment_start
     1039        desired_data = segment[offset_in_segment:offset_in_segment+o[1]]
     1040
     1041        self._offset += len(desired_data)
     1042        self._size -= len(desired_data)
     1043        self._consumer.write(desired_data)
     1044        # the consumer might call our .pauseProducing() inside that write()
     1045        # call, setting self._hungry=False
     1046        self._maybe_fetch_next()
     1047
     1048    def _retry_bad_segment(self, f, had_actual_segment_size):
     1049        f.trap(BadSegmentNumberError) # guessed way wrong, off the end
     1050        if had_actual_segment_size:
     1051            # but we should have known better, so this is a real error
     1052            return f
     1053        # we didn't know better: try again with more information
     1054        return self._maybe_fetch_next()
     1055
     1056    def _error(self, f):
     1057        self._alive = False
     1058        self._hungry = False
     1059        self._consumer.unregisterProducer()
     1060        self._deferred.errback(f)
     1061
     1062    def stopProducing(self):
     1063        self._hungry = False
     1064        self._alive = False
     1065        # cancel any outstanding segment request
     1066        if self._cancel_segment_request:
     1067            self._cancel_segment_request()
     1068            self._cancel_segment_request = None
     1069    def pauseProducing(self):
     1070        self._hungry = False
     1071    def resumeProducing(self):
     1072        self._hungry = True
     1073        eventually(self._maybe_fetch_next)
     1074
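# A worked example of the trimming arithmetic in _got_segment() above,
# assuming a 131072-byte segment: a read() of size=1000 at offset=150000
# lands in segment 1, which covers file[131072:262144], so
#   o = overlap(131072, 131072, 150000, 1000)  # -> (150000, 1000)
#   offset_in_segment = 150000 - 131072         # -> 18928
#   desired_data = segment[18928:18928+1000]
# and exactly the requested 1000 bytes are written to the consumer.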
     1075class Cancel:
     1076    def __init__(self, f):
     1077        self._f = f
     1078        self.cancelled = False
     1079    def cancel(self):
     1080        if not self.cancelled:
     1081            self.cancelled = True
     1082            self._f(self)
     1083
     1084class _Node:
     1085    """Internal class which manages downloads and holds state. External
     1086    callers use CiphertextFileNode instead."""
     1087
     1088    # Share._node points to me
     1089    def __init__(self, verifycap, storage_broker, secret_holder,
     1090                 terminator, history):
     1091        assert isinstance(verifycap, uri.CHKFileVerifierURI)
     1092        self._verifycap = verifycap
     1093        self._si_prefix = base32.b2a_l(verifycap.storage_index[:8], 60)
     1094        self.running = True
     1095        terminator.register(self) # calls self.stop() at stopService()
     1096        # the rules are:
     1097        # 1: Only send network requests if you're active (self.running is True)
     1098        # 2: Use TimerService, not reactor.callLater
     1099        # 3: You can do eventual-sends any time.
     1100        # These rules should mean that once
     1101        # stopService()+flushEventualQueue() fires, everything will be done.
     1102        self._secret_holder = secret_holder
     1103        self._history = history
     1104
     1105        k, N = self._verifycap.needed_shares, self._verifycap.total_shares
     1106        self.share_hash_tree = IncompleteHashTree(N)
     1107
     1108        # we guess the segment size, so Segmentation can pull non-initial
     1109        # segments in a single roundtrip
     1110        max_segment_size = 128*KiB # TODO: pull from elsewhere, maybe the
     1111                                   # same place as upload.BaseUploadable
     1112        s = mathutil.next_multiple(min(verifycap.size, max_segment_size), k)
     1113        self.guessed_segment_size = s
     1114        r = self._calculate_sizes(self.guessed_segment_size)
     1115        self.guessed_num_segments = r["num_segments"]
     1116
     1117        # filled in when we parse a valid UEB
     1118        self.have_UEB = False
     1119        self.segment_size = None
     1120        self.tail_segment_size = None
     1121        self.tail_segment_padded = None
     1122        self.num_segments = None
     1123        self.block_size = None
     1124        self.tail_block_size = None
     1125        self.ciphertext_hash_tree = None # size depends on num_segments
     1126        self.ciphertext_hash = None # flat hash, optional
     1127
     1128        # things to track callers that want data
     1129
     1130        # _segment_requests can have duplicates
     1131        self._segment_requests = [] # (segnum, d, cancel_handle)
     1132        self._active_segment = None # a SegmentFetcher, with .segnum
     1133
     1134        self._sharefinder = ShareFinder(storage_broker, verifycap, self)
     1135        self._shares = set()
     1136
     1137    def stop(self):
     1138        # called by the Terminator at shutdown, mostly for tests
     1139        if self._active_segment:
     1140            self._active_segment.stop()
     1141            self._active_segment = None
     1142        self._sharefinder.stop()
     1143
     1144    # things called by outside callers, via CiphertextFileNode. get_segment()
     1145    # may also be called by Segmentation.
     1146
     1147    def read(self, consumer, offset=0, size=None):
     1148        """I am the main entry point, from which FileNode.read() can get
     1149        data. I feed the consumer with the desired range of ciphertext. I
     1150        return a Deferred that fires (with the consumer) when the read is
     1151        finished."""
     1152        # for concurrent operations: each gets its own Segmentation manager
     1153        if size is None:
     1154            size = self._verifycap.size - offset
      1155        log.msg(format="imm Node(%(si)s).read(%(offset)d, %(size)d)",
     1156                si=base32.b2a(self._verifycap.storage_index)[:8],
     1157                offset=offset, size=size,
     1158                level=log.OPERATIONAL, umid="l3j3Ww")
     1159        s = Segmentation(self, offset, size, consumer)
     1160        # this raises an interesting question: what segments to fetch? if
     1161        # offset=0, always fetch the first segment, and then allow
     1162        # Segmentation to be responsible for pulling the subsequent ones if
     1163        # the first wasn't large enough. If offset>0, we're going to need an
     1164        # extra roundtrip to get the UEB (and therefore the segment size)
     1165        # before we can figure out which segment to get. TODO: allow the
     1166        # offset-table-guessing code (which starts by guessing the segsize)
     1167        # to assist the offset>0 process.
     1168        d = s.start()
     1169        return d
     1170
     1171    def get_segment(self, segnum):
     1172        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
     1173        Deferred that fires with (offset,data) when the desired segment is
     1174        available, and c is an object on which c.cancel() can be called to
     1175        disavow interest in the segment (after which 'd' will never fire).
     1176
     1177        You probably need to know the segment size before calling this,
     1178        unless you want the first few bytes of the file. If you ask for a
     1179        segment number which turns out to be too large, the Deferred will
     1180        errback with BadSegmentNumberError.
     1181
     1182        The Deferred fires with the offset of the first byte of the data
     1183        segment, so that you can call get_segment() before knowing the
     1184        segment size, and still know which data you received.
     1185
     1186        The Deferred can also errback with other fatal problems, such as
     1187        NotEnoughSharesError, NoSharesError, or BadCiphertextHashError.
     1188        """
      1189        log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
     1190                si=base32.b2a(self._verifycap.storage_index)[:8],
     1191                segnum=segnum, level=log.OPERATIONAL, umid="UKFjDQ")
     1192        d = defer.Deferred()
     1193        c = Cancel(self._cancel_request)
     1194        self._segment_requests.append( (segnum, d, c) )
     1195        self._start_new_segment()
     1196        return (d, c)
     1197
     1198    # things called by the Segmentation object used to transform
     1199    # arbitrary-sized read() calls into quantized segment fetches
     1200
     1201    def _start_new_segment(self):
     1202        if self._active_segment is None and self._segment_requests:
     1203            segnum = self._segment_requests[0][0]
     1204            k = self._verifycap.needed_shares
     1205            self._active_segment = fetcher = SegmentFetcher(self, segnum, k)
     1206            active_shares = [s for s in self._shares if s.not_dead()]
     1207            fetcher.add_shares(active_shares) # this triggers the loop
     1208
     1209
     1210    # called by our child ShareFinder
     1211    def got_shares(self, shares):
     1212        self._shares.update(shares)
     1213        if self._active_segment:
     1214            self._active_segment.add_shares(shares)
     1215    def no_more_shares(self):
     1216        self._no_more_shares = True
     1217        if self._active_segment:
     1218            self._active_segment.no_more_shares()
     1219
     1220    # things called by our Share instances
     1221
     1222    def validate_and_store_UEB(self, UEB_s):
     1223        log.msg("validate_and_store_UEB",
     1224                level=log.OPERATIONAL, umid="7sTrPw")
     1225        h = hashutil.uri_extension_hash(UEB_s)
     1226        if h != self._verifycap.uri_extension_hash:
     1227            raise hashutil.BadHashError
     1228        UEB_dict = uri.unpack_extension(UEB_s)
     1229        self._parse_and_store_UEB(UEB_dict) # sets self._stuff
     1230        # TODO: a malformed (but authentic) UEB could throw an assertion in
     1231        # _parse_and_store_UEB, and we should abandon the download.
     1232        self.have_UEB = True
     1233
     1234    def _parse_and_store_UEB(self, d):
     1235        # Note: the UEB contains needed_shares and total_shares. These are
     1236        # redundant and inferior (the filecap contains the authoritative
     1237        # values). However, because it is possible to encode the same file in
     1238        # multiple ways, and the encoders might choose (poorly) to use the
     1239        # same key for both (therefore getting the same SI), we might
     1240        # encounter shares for both types. The UEB hashes will be different,
     1241        # however, and we'll disregard the "other" encoding's shares as
     1242        # corrupted.
     1243
     1244        # therefore, we ignore d['total_shares'] and d['needed_shares'].
     1245
     1246        log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s",
     1247                ueb=repr(d), vcap=self._verifycap.to_string(),
     1248                level=log.NOISY, umid="cVqZnA")
     1249
     1250        k, N = self._verifycap.needed_shares, self._verifycap.total_shares
     1251
     1252        self.segment_size = d['segment_size']
     1253
     1254        r = self._calculate_sizes(self.segment_size)
     1255        self.tail_segment_size = r["tail_segment_size"]
     1256        self.tail_segment_padded = r["tail_segment_padded"]
     1257        self.num_segments = r["num_segments"]
     1258        self.block_size = r["block_size"]
     1259        self.tail_block_size = r["tail_block_size"]
     1260        log.msg("actual sizes: %s" % (r,),
     1261                level=log.NOISY, umid="PY6P5Q")
     1262
     1263        # zfec.Decode() instantiation is fast, but still, let's use the same
     1264        # codec instance for all but the last segment. 3-of-10 takes 15us on
     1265        # my laptop, 25-of-100 is 900us, 3-of-255 is 97us, 25-of-255 is
     1266        # 2.5ms, worst-case 254-of-255 is 9.3ms
     1267        self._codec = CRSDecoder()
     1268        self._codec.set_params(self.segment_size, k, N)
     1269
     1270
     1271        # Ciphertext hash tree root is mandatory, so that there is at most
     1272        # one ciphertext that matches this read-cap or verify-cap. The
     1273        # integrity check on the shares is not sufficient to prevent the
     1274        # original encoder from creating some shares of file A and other
     1275        # shares of file B.
     1276        self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
     1277        self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})
     1278
     1279        self.share_hash_tree.set_hashes({0: d['share_root_hash']})
     1280
     1281        # crypttext_hash is optional. We only pull this from the first UEB
     1282        # that we see.
     1283        if 'crypttext_hash' in d:
     1284            if len(d["crypttext_hash"]) == hashutil.CRYPTO_VAL_SIZE:
     1285                self.ciphertext_hash = d['crypttext_hash']
     1286            else:
     1287                log.msg("ignoring bad-length UEB[crypttext_hash], "
     1288                        "got %d bytes, want %d" % (len(d['crypttext_hash']),
     1289                                                   hashutil.CRYPTO_VAL_SIZE),
     1290                        umid="oZkGLA", level=log.WEIRD)
     1291
     1292        # Our job is a fast download, not verification, so we ignore any
     1293        # redundant fields. The Verifier uses a different code path which
     1294        # does not ignore them.
     1295
     1296    def _calculate_sizes(self, segment_size):
     1297        # segments of ciphertext
     1298        size = self._verifycap.size
     1299        k = self._verifycap.needed_shares
     1300
     1301        # this assert matches the one in encode.py:127 inside
     1302        # Encoded._got_all_encoding_parameters, where the UEB is constructed
     1303        assert segment_size % k == 0
     1304
     1305        # the last segment is usually short. We don't store a whole segsize,
     1306        # but we do pad the segment up to a multiple of k, because the
     1307        # encoder requires that.
     1308        tail_segment_size = size % segment_size
     1309        if tail_segment_size == 0:
     1310            tail_segment_size = segment_size
     1311        padded = mathutil.next_multiple(tail_segment_size, k)
     1312        tail_segment_padded = padded
     1313
     1314        num_segments = mathutil.div_ceil(size, segment_size)
     1315
     1316        # each segment is turned into N blocks. All but the last are of size
     1317        # block_size, and the last is of size tail_block_size
     1318        block_size = segment_size / k
     1319        tail_block_size = tail_segment_padded / k
     1320
     1321        return { "tail_segment_size": tail_segment_size,
     1322                 "tail_segment_padded": tail_segment_padded,
     1323                 "num_segments": num_segments,
     1324                 "block_size": block_size,
     1325                 "tail_block_size": tail_block_size,
     1326                 }
     1327
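    # A worked example of _calculate_sizes(), assuming k=3, a 1,000,000-byte
    # file, and segment_size=131073 (128KiB rounded up to a multiple of k):
    #   num_segments        = div_ceil(1000000, 131073) = 8
    #   tail_segment_size   = 1000000 % 131073          = 82489
    #   tail_segment_padded = next_multiple(82489, 3)   = 82491
    #   block_size          = 131073 / 3                = 43691
    #   tail_block_size     = 82491 / 3                 = 27497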
     1328
     1329    def process_share_hashes(self, share_hashes):
     1330        self.share_hash_tree.set_hashes(share_hashes)
     1331
     1332    # called by our child SegmentFetcher
     1333
     1334    def want_more_shares(self):
     1335        self._sharefinder.hungry()
     1336
     1337    def fetch_failed(self, sf, f):
     1338        assert sf is self._active_segment
     1339        self._active_segment = None
     1340        # deliver error upwards
     1341        for (d,c) in self._extract_requests(sf.segnum):
     1342            eventually(self._deliver, d, c, f)
     1343
     1344    def process_blocks(self, segnum, blocks):
     1345        d = defer.maybeDeferred(self._decode_blocks, segnum, blocks)
     1346        d.addCallback(self._check_ciphertext_hash, segnum)
     1347        def _deliver(result):
     1348            for (d,c) in self._extract_requests(segnum):
     1349                eventually(self._deliver, d, c, result)
     1350            self._active_segment = None
     1351            self._start_new_segment()
     1352        d.addBoth(_deliver)
     1353        d.addErrback(lambda f:
     1354                     log.err(format="unhandled error during process_blocks",
     1355                             failure=f, level=log.WEIRD, umid="MkEsCg"))
     1356
     1357    def _decode_blocks(self, segnum, blocks):
     1358        tail = (segnum == self.num_segments-1)
     1359        codec = self._codec
     1360        block_size = self.block_size
     1361        decoded_size = self.segment_size
     1362        if tail:
     1363            # account for the padding in the last segment
     1364            codec = CRSDecoder()
     1365            k, N = self._verifycap.needed_shares, self._verifycap.total_shares
     1366            codec.set_params(self.tail_segment_padded, k, N)
     1367            block_size = self.tail_block_size
     1368            decoded_size = self.tail_segment_padded
     1369
     1370        shares = []
     1371        shareids = []
     1372        for (shareid, share) in blocks.iteritems():
     1373            assert len(share) == block_size
     1374            shareids.append(shareid)
     1375            shares.append(share)
     1376        del blocks
     1377
     1378        d = codec.decode(shares, shareids)   # segment
     1379        del shares
     1380        def _process(buffers):
     1381            segment = "".join(buffers)
     1382            assert len(segment) == decoded_size
     1383            del buffers
     1384            if tail:
     1385                segment = segment[:self.tail_segment_size]
     1386            return segment
     1387        d.addCallback(_process)
     1388        return d
     1389
     1390    def _check_ciphertext_hash(self, segment, segnum):
     1391        assert self._active_segment.segnum == segnum
     1392        assert self.segment_size is not None
     1393        offset = segnum * self.segment_size
     1394
     1395        h = hashutil.crypttext_segment_hash(segment)
     1396        try:
     1397            self.ciphertext_hash_tree.set_hashes(leaves={segnum: h})
     1398            return (offset, segment)
     1399        except (BadHashError, NotEnoughHashesError):
     1400            format = ("hash failure in ciphertext_hash_tree:"
     1401                      " segnum=%(segnum)d, SI=%(si)s")
     1402            log.msg(format=format, segnum=segnum, si=self._si_prefix,
     1403                    failure=Failure(), level=log.WEIRD, umid="MTwNnw")
     1404            # this is especially weird, because we made it past the share
     1405            # hash tree. It implies that we're using the wrong encoding, or
     1406            # that the uploader deliberately constructed a bad UEB.
     1407            msg = format % {"segnum": segnum, "si": self._si_prefix}
     1408            raise BadCiphertextHashError(msg)
     1409
     1410    def _deliver(self, d, c, result):
     1411        # this method exists to handle cancel() that occurs between
     1412        # _got_segment and _deliver
     1413        if not c.cancelled:
     1414            d.callback(result) # might actually be an errback
     1415
     1416    def _extract_requests(self, segnum):
     1417        """Remove matching requests and return their (d,c) tuples so that the
     1418        caller can retire them."""
     1419        retire = [(d,c) for (segnum0, d, c) in self._segment_requests
     1420                  if segnum0 == segnum]
     1421        self._segment_requests = [t for t in self._segment_requests
     1422                                  if t[0] != segnum]
     1423        return retire
     1424
     1425    def _cancel_request(self, c):
     1426        self._segment_requests = [t for t in self._segment_requests
     1427                                  if t[2] != c]
     1428        segnums = [segnum for (segnum,d,c) in self._segment_requests]
      1429        if self._active_segment and self._active_segment.segnum not in segnums:
     1430            self._active_segment.stop()
     1431            self._active_segment = None
     1432            self._start_new_segment()
     1433
     1434class CiphertextFileNode:
     1435    def __init__(self, verifycap, storage_broker, secret_holder,
     1436                 terminator, history):
     1437        assert isinstance(verifycap, uri.CHKFileVerifierURI)
     1438        self._node = _Node(verifycap, storage_broker, secret_holder,
     1439                           terminator, history)
     1440
     1441    def read(self, consumer, offset=0, size=None):
     1442        """I am the main entry point, from which FileNode.read() can get
     1443        data. I feed the consumer with the desired range of ciphertext. I
     1444        return a Deferred that fires (with the consumer) when the read is
     1445        finished."""
     1446        return self._node.read(consumer, offset, size)
     1447
     1448    def get_segment(self, segnum):
     1449        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
     1450        Deferred that fires with (offset,data) when the desired segment is
     1451        available, and c is an object on which c.cancel() can be called to
     1452        disavow interest in the segment (after which 'd' will never fire).
     1453
     1454        You probably need to know the segment size before calling this,
     1455        unless you want the first few bytes of the file. If you ask for a
     1456        segment number which turns out to be too large, the Deferred will
     1457        errback with BadSegmentNumberError.
     1458
     1459        The Deferred fires with the offset of the first byte of the data
     1460        segment, so that you can call get_segment() before knowing the
     1461        segment size, and still know which data you received.
     1462        """
     1463        return self._node.get_segment(segnum)
     1464
     1465    def raise_error(self):
     1466        pass
     1467
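# For illustration: a hypothetical caller of the API above, fetching the
# first segment through CiphertextFileNode.get_segment(). 'cnode' is assumed
# to be a CiphertextFileNode instance; the helper name is made up here.

def _example_fetch_first_segment(cnode):
    (d, c) = cnode.get_segment(0) # segnum=0 never needs the segment size
    def _got((offset, data)):
        assert offset == 0
        return data
    d.addCallback(_got)
    # a caller that loses interest can call c.cancel(), after which 'd'
    # will never fire
    return d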
     1468
     1469class DecryptingConsumer:
     1470    """I sit between a CiphertextDownloader (which acts as a Producer) and
     1471    the real Consumer, decrypting everything that passes by. The real
     1472    Consumer sees the real Producer, but the Producer sees us instead of the
     1473    real consumer."""
     1474    implements(IConsumer)
     1475
     1476    def __init__(self, consumer, readkey, offset):
     1477        self._consumer = consumer
     1478        # TODO: pycryptopp CTR-mode needs random-access operations: I want
     1479        # either a=AES(readkey, offset) or better yet both of:
     1480        #  a=AES(readkey, offset=0)
     1481        #  a.process(ciphertext, offset=xyz)
     1482        # For now, we fake it with the existing iv= argument.
     1483        offset_big = offset // 16
     1484        offset_small = offset % 16
     1485        iv = binascii.unhexlify("%032x" % offset_big)
     1486        self._decryptor = AES(readkey, iv=iv)
     1487        self._decryptor.process("\x00"*offset_small)
     1488
     1489    def registerProducer(self, producer, streaming):
     1490        # this passes through, so the real consumer can flow-control the real
     1491        # producer. Therefore we don't need to provide any IPushProducer
     1492        # methods. We implement all the IConsumer methods as pass-throughs,
     1493        # and only intercept write() to perform decryption.
     1494        self._consumer.registerProducer(producer, streaming)
     1495    def unregisterProducer(self):
     1496        self._consumer.unregisterProducer()
     1497    def write(self, ciphertext):
     1498        plaintext = self._decryptor.process(ciphertext)
     1499        self._consumer.write(plaintext)
     1500
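# For illustration: the same CTR-offset trick as DecryptingConsumer, written
# as a hypothetical standalone helper. Decrypting ciphertext[offset:] this
# way matches decrypting the whole stream from byte 0 and slicing.
def _decrypt_from_offset(readkey, ciphertext_tail, offset):
    offset_big = offset // 16   # index of the AES block containing 'offset'
    offset_small = offset % 16  # bytes into that block
    iv = binascii.unhexlify("%032x" % offset_big)
    decryptor = AES(readkey, iv=iv)
    decryptor.process("\x00"*offset_small) # discard the partial block
    return decryptor.process(ciphertext_tail)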
     1501class ImmutableFileNode:
     1502    implements(IImmutableFileNode)
     1503
     1504    # I wrap a CiphertextFileNode with a decryption key
     1505    def __init__(self, filecap, storage_broker, secret_holder, terminator,
     1506                 history):
     1507        assert isinstance(filecap, uri.CHKFileURI)
     1508        verifycap = filecap.get_verify_cap()
     1509        self._cnode = CiphertextFileNode(verifycap, storage_broker,
     1510                                         secret_holder, terminator, history)
     1511        assert isinstance(filecap, uri.CHKFileURI)
     1512        self.u = filecap
     1513        self._readkey = filecap.key
     1514
     1515    def read(self, consumer, offset=0, size=None):
     1516        decryptor = DecryptingConsumer(consumer, self._readkey, offset)
     1517        d = self._cnode.read(decryptor, offset, size)
     1518        d.addCallback(lambda dc: consumer)
     1519        return d
     1520
     1521    def raise_error(self):
     1522        pass
     1523
     1524    def get_write_uri(self):
     1525        return None
     1526
     1527    def get_readonly_uri(self):
     1528        return self.get_uri()
     1529
     1530    def get_uri(self):
     1531        return self.u.to_string()
     1532    def get_cap(self):
     1533        return self.u
     1534    def get_readcap(self):
     1535        return self.u.get_readonly()
     1536    def get_verify_cap(self):
     1537        return self.u.get_verify_cap()
     1538    def get_repair_cap(self):
     1539        # CHK files can be repaired with just the verifycap
     1540        return self.u.get_verify_cap()
     1541
     1542    def get_storage_index(self):
     1543        return self.u.get_storage_index()
     1544
     1545    def get_size(self):
     1546        return self.u.get_size()
     1547    def get_current_size(self):
     1548        return defer.succeed(self.get_size())
     1549
     1550    def is_mutable(self):
     1551        return False
     1552
     1553    def is_readonly(self):
     1554        return True
     1555
     1556    def is_unknown(self):
     1557        return False
     1558
     1559    def is_allowed_in_immutable_directory(self):
     1560        return True
     1561
     1562
     1563# TODO: if server1 has all shares, and server2-10 have one each, make the
     1564# loop stall slightly before requesting all shares from the first server, to
     1565# give it a chance to learn about the other shares and get some diversity.
     1566# Or, don't bother, let the first block all come from one server, and take
     1567# comfort in the fact that we'll learn about the other servers by the time we
     1568# fetch the second block.
     1569#
     1570# davidsarah points out that we could use sequential (instead of parallel)
      1571# fetching of multiple blocks from a single server: by the time the first
     1572# block arrives, we'll hopefully have heard about other shares. This would
     1573# induce some RTT delays (i.e. lose pipelining) in the case that this server
     1574# has the only shares, but that seems tolerable. We could rig it to only use
     1575# sequential requests on the first segment.
     1576
     1577# as a query gets later, we're more willing to duplicate work.
     1578
     1579# should change server read protocol to allow small shares to be fetched in a
     1580# single RTT. Instead of get_buckets-then-read, just use read(shnums, readv),
     1581# where shnums=[] means all shares, and the return value is a dict of
      1582# shnum->data (like with mutable files). The DYHB query should also fetch the
     1583# offset table, since everything else can be located once we have that.
     1584
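# For illustration, one possible shape of that proposed call (nothing like
# this exists in the current protocol; the names here are made up):
#   readv = [(0, 1024)]   # list of (offset, length) spans within each share
#   d = rref.callRemote("read", storage_index, [], readv)
#   # would fire with {shnum: [data_for_each_span]}, like mutable slot_readv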
     1585
     1586# ImmutableFileNode
     1587#    DecryptingConsumer
     1588#  CiphertextFileNode
     1589#    Segmentation
     1590#   ShareFinder
     1591#   SegmentFetcher[segnum] (one at a time)
     1592#   CommonShare[shnum]
     1593#   Share[shnum,server]
     1594
     1595# TODO: when we learn numsegs, any get_segment() calls for bad blocknumbers
     1596# should be failed with BadSegmentNumberError. But should this be the
     1597# responsibility of CiphertextFileNode, or SegmentFetcher? The knowledge will
     1598# first appear when a Share receives a valid UEB and calls
     1599# CiphertextFileNode.validate_UEB, then _parse_UEB. The SegmentFetcher is
     1600# expecting to hear from the Share, via the _block_request_activity observer.
     1601
     1602# make it the responsibility of the SegmentFetcher. Each Share that gets a
     1603# valid UEB will tell the SegmentFetcher BADSEGNUM (instead of COMPLETE or
      1604# CORRUPT). The SegmentFetcher is then responsible for shutting down, and
     1605# informing its parent (the CiphertextFileNode) of the BadSegmentNumberError,
     1606# which is then passed to the client of get_segment().
     1607
     1608
     1609# TODO: if offset table is corrupt, attacker could cause us to fetch whole
     1610# (large) share
     1611
     1612# log budget: when downloading at 1MBps (i.e. 8 segments-per-second), 10
     1613# log.OPERATIONAL per second, 100 log.NOISY per second. With k=3, that's 3
     1614# log.NOISY per block fetch.
  • new file src/allmydata/immutable/download2_off.py

    diff --git a/src/allmydata/immutable/download2_off.py b/src/allmydata/immutable/download2_off.py
    new file mode 100755
    index 0000000..d2b8b99
    - +  
     1#! /usr/bin/python
     2
     3# known (shnum,Server) pairs are sorted into a list according to
      4# desirability. This sort is picking a winding path through a matrix of
     5# [shnum][server]. The goal is to get diversity of both shnum and server.
     6
     7# The initial order is:
     8#  find the lowest shnum on the first server, add it
     9#  look at the next server, find the lowest shnum that we don't already have
     10#   if any
     11#  next server, etc, until all known servers are checked
     12#  now look at servers that we skipped (because ...
     13
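# A rough sketch of the initial ordering described above: per server in
# turn, pick the lowest shnum not already chosen. 'sharemap' is assumed to
# map server -> set(shnums); both names are hypothetical.
def _diversity_order(sharemap):
    chosen = []        # (shnum, server) pairs, diverse on both axes
    seen_shnums = set()
    for (server, shnums) in sharemap.items():
        for shnum in sorted(shnums):
            if shnum not in seen_shnums:
                chosen.append((shnum, server))
                seen_shnums.add(shnum)
                break
    return chosen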
     14# Keep track of which block requests are outstanding by (shnum,Server). Don't
     15# bother prioritizing "validated" shares: the overhead to pull the share hash
     16# chain is tiny (4 hashes = 128 bytes), and the overhead to pull a new block
     17# hash chain is also tiny (1GB file, 8192 segments of 128KiB each, 13 hashes,
     18# 832 bytes). Each time a block request is sent, also request any necessary
     19# hashes. Don't bother with a "ValidatedShare" class (as distinct from some
     20# other sort of Share). Don't bother avoiding duplicate hash-chain requests.
     21
     22# For each outstanding segread, walk the list and send requests (skipping
     23# outstanding shnums) until requests for k distinct shnums are in flight. If
     24# we can't do that, ask for more. If we get impatient on a request, find the
     25# first non-outstanding
     26
     27# start with the first Share in the list, and send a request. Then look at
     28# the next one. If we already have a pending request for the same shnum or
     29# server, push that Share down onto the fallback list and try the next one,
     30# etc. If we run out of non-fallback shares, use the fallback ones,
     31# preferring shnums that we don't have outstanding requests for (i.e. assume
     32# that all requests will complete). Do this by having a second fallback list.
     33
     34# hell, I'm reviving the Herder. But remember, we're still talking 3 objects
     35# per file, not thousands.
     36
     37# actually, don't bother sorting the initial list. Append Shares as the
     38# responses come back, that will put the fastest servers at the front of the
     39# list, and give a tiny preference to servers that are earlier in the
     40# permuted order.
     41
     42# more ideas:
     43#  sort shares by:
     44#   1: number of roundtrips needed to get some data
     45#   2: share number
     46#   3: ms of RTT delay
     47# maybe measure average time-to-completion of requests, compare completion
     48# time against that, much larger indicates congestion on the server side
     49# or the server's upstream speed is less than our downstream. Minimum
     50# time-to-completion indicates min(our-downstream,their-upstream). Could
     51# fetch shares one-at-a-time to measure that better.
     52
     53# when should we risk duplicate work and send a new request?
     54
     55def walk(self):
     56    shares = sorted(list)
     57    oldshares = copy(shares)
     58    outstanding = list()
     59    fallbacks = list()
     60    second_fallbacks = list()
     61    while len(outstanding.nonlate.shnums) < k: # need more requests
     62        while oldshares:
      63            s = oldshares.pop(0)
     64            if s.server in outstanding.servers or s.shnum in outstanding.shnums:
     65                fallbacks.append(s)
     66                continue
     67            outstanding.append(s)
     68            send_request(s)
     69            break #'while need_more_requests'
     70        # must use fallback list. Ask for more servers while we're at it.
     71        ask_for_more_servers()
     72        while fallbacks:
     73            s = fallbacks.pop(0)
     74            if s.shnum in outstanding.shnums:
     75                # assume that the outstanding requests will complete, but
     76                # send new requests for other shnums to existing servers
     77                second_fallbacks.append(s)
     78                continue
     79            outstanding.append(s)
     80            send_request(s)
     81            break #'while need_more_requests'
     82        # if we get here, we're being forced to send out multiple queries per
     83        # share. We've already asked for more servers, which might help. If
     84        # there are no late outstanding queries, then duplicate shares won't
     85        # help. Don't send queries for duplicate shares until some of the
     86        # queries are late.
     87        if outstanding.late:
     88            # we're allowed to try any non-outstanding share
     89            while second_fallbacks:
     90                pass
     91    newshares = outstanding + fallbacks + second_fallbacks + oldshares
     92       
     93
     94class Server:
     95    """I represent an abstract Storage Server. One day, the StorageBroker
     96    will return instances of me. For now, the StorageBroker returns (peerid,
     97    RemoteReference) tuples, and this code wraps a Server instance around
     98    them.
     99    """
     100    def __init__(self, peerid, ss):
     101        self.peerid = peerid
     102        self.remote = ss
     103        self._remote_buckets = {} # maps shnum to RIBucketReader
     104        # TODO: release the bucket references on shares that we no longer
     105        # want. OTOH, why would we not want them? Corruption?
     106
     107    def send_query(self, storage_index):
     108        """I return a Deferred that fires with a set of shnums. If the server
     109        had shares available, I will retain the RemoteReferences to its
     110        buckets, so that get_data(shnum, range) can be called later."""
      111        d = self.remote.callRemote("get_buckets", storage_index)
     112        d.addCallback(self._got_response)
     113        return d
     114
     115    def _got_response(self, r):
     116        self._remote_buckets = r
     117        return set(r.keys())
     118
     119class ShareOnAServer:
     120    """I represent one instance of a share, known to live on a specific
     121    server. I am created every time a server responds affirmatively to a
     122    do-you-have-block query."""
     123
     124    def __init__(self, shnum, server):
     125        self._shnum = shnum
     126        self._server = server
     127        self._block_hash_tree = None
     128
     129    def cost(self, segnum):
     130        """I return a tuple of (roundtrips, bytes, rtt), indicating how
     131        expensive I think it would be to fetch the given segment. Roundtrips
     132        indicates how many roundtrips it is likely to take (one to get the
     133        data and hashes, plus one to get the offset table and UEB if this is
     134        the first segment we've ever fetched). 'bytes' is how many bytes we
     135        must fetch (estimated). 'rtt' is estimated round-trip time (float) in
     136        seconds for a trivial request. The downloading algorithm will compare
     137        costs to decide which shares should be used."""
     138        # the most significant factor here is roundtrips: a Share for which
      139        # we already have the offset table is better than a brand new one
     140
     141    def max_bandwidth(self):
     142        """Return a float, indicating the highest plausible bytes-per-second
     143        that I've observed coming from this share. This will be based upon
     144        the minimum (bytes-per-fetch / time-per-fetch) ever observed. This
      145        # can be used to estimate the server's upstream bandwidth. Clearly this
     146        is only accurate if a share is retrieved with no contention for
     147        either the upstream, downstream, or middle of the connection, but it
     148        may still serve as a useful metric for deciding which servers to pull
     149        from."""
     150
     151    def get_segment(self, segnum):
     152        """I return a Deferred that will fire with the segment data, or
     153        errback."""
     154
     155class NativeShareOnAServer(ShareOnAServer):
     156    """For tahoe native (foolscap) servers, I contain a RemoteReference to
     157    the RIBucketReader instance."""
     158    def __init__(self, shnum, server, rref):
     159        ShareOnAServer.__init__(self, shnum, server)
     160        self._rref = rref # RIBucketReader
     161
     162class Share:
     163    def __init__(self, shnum):
     164        self._shnum = shnum
     165        # _servers are the Server instances which appear to hold a copy of
     166        # this share. It is populated when the ValidShare is first created,
     167        # or when we receive a get_buckets() response for a shnum that
     168        # already has a ValidShare instance. When we lose the connection to a
     169        # server, we remove it.
     170        self._servers = set()
     171        # offsets, UEB, and share_hash_tree all live in the parent.
     172        # block_hash_tree lives here.
     173        self._block_hash_tree = None
     174
     175        self._want
     176
     177    def get_servers(self):
     178        return self._servers
     179
     180
     181    def get_block(self, segnum):
     182        # read enough data to obtain a single validated block
     183        if not self.have_offsets:
     184            # we get the offsets in their own read, since they tell us where
     185            # everything else lives. We must fetch offsets for each share
     186            # separately, since they aren't directly covered by the UEB.
     187            pass
     188        if not self.parent.have_ueb:
     189            # use _guessed_segsize to make a guess about the layout, so we
     190            # can fetch both the offset table and the UEB in the same read.
     191            # This also requires making a guess about the presence or absence
     192            # of the plaintext_hash_tree. Oh, and also the version number. Oh
     193            # well.
     194            pass
     195
     196class CiphertextDownloader:
     197    """I manage all downloads for a single file. I operate a state machine
     198    with input events that are local read() requests, responses to my remote
     199    'get_bucket' and 'read_bucket' messages, and connection establishment and
     200    loss. My outbound events are connection establishment requests and bucket
     201    read requests messages.
     202    """
     203    # eventually this will merge into the FileNode
     204    ServerClass = Server # for tests to override
     205
     206    def __init__(self, storage_index, ueb_hash, size, k, N, storage_broker,
     207                 shutdowner):
     208        # values we get from the filecap
     209        self._storage_index = si = storage_index
     210        self._ueb_hash = ueb_hash
     211        self._size = size
     212        self._needed_shares = k
     213        self._total_shares = N
     214        self._share_hash_tree = IncompleteHashTree(self._total_shares)
     215        # values we discover when we first fetch the UEB
     216        self._ueb = None # is dict after UEB fetch+validate
     217        self._segsize = None
     218        self._numsegs = None
     219        self._blocksize = None
     220        self._tail_segsize = None
     221        self._ciphertext_hash = None # optional
     222        # structures we create when we fetch the UEB, then continue to fill
     223        # as we download the file
     224        self._share_hash_tree = None # is IncompleteHashTree after UEB fetch
     225        self._ciphertext_hash_tree = None
     226
     227        # values we learn as we download the file
     228        self._offsets = {} # (shnum,Server) to offset table (dict)
     229        self._block_hash_tree = {} # shnum to IncompleteHashTree
     230        # other things which help us
     231        self._guessed_segsize = min(128*1024, size)
     232        self._active_share_readers = {} # maps shnum to Reader instance
     233        self._share_readers = [] # sorted by preference, best first
     234        self._readers = set() # set of Reader instances
     235        self._recent_horizon = 10 # seconds
     236
     237        # 'shutdowner' is a MultiService parent used to cancel all downloads
     238        # when the node is shutting down, to let tests have a clean reactor.
     239
     240        self._init_available_servers()
     241        self._init_find_enough_shares()
     242
     243    # _available_servers is an iterator that provides us with Server
     244    # instances. Each time we pull out a Server, we immediately send it a
     245    # query, so we don't need to keep track of who we've sent queries to.
     246
     247    def _init_available_servers(self):
     248        self._available_servers = self._get_available_servers()
     249        self._no_more_available_servers = False
     250
     251    def _get_available_servers(self):
     252        """I am a generator of servers to use, sorted by the order in which
     253        we should query them. I make sure there are no duplicates in this
     254        list."""
     255        # TODO: make StorageBroker responsible for this non-duplication, and
     256        # replace this method with a simple iter(get_servers_for_index()),
     257        # plus a self._no_more_available_servers=True
     258        seen = set()
     259        sb = self._storage_broker
     260        for (peerid, ss) in sb.get_servers_for_index(self._storage_index):
     261            if peerid not in seen:
     262                yield self.ServerClass(peerid, ss) # Server(peerid, ss)
     263                seen.add(peerid)
     264        self._no_more_available_servers = True
     265
     266    # this block of code is responsible for having enough non-problematic
     267    # distinct shares/servers available and ready for download, and for
     268    # limiting the number of queries that are outstanding. The idea is that
     269    # we'll use the k fastest/best shares, and have the other ones in reserve
     270    # in case those servers stop responding or respond too slowly. We keep
     271    # track of all known shares, but we also keep track of problematic shares
     272    # (ones with hash failures or lost connections), so we can put them at
     273    # the bottom of the list.
     274
     275    def _init_find_enough_shares(self):
     276        # _unvalidated_sharemap maps shnum to set of Servers, and remembers
     277        # where viable (but not yet validated) shares are located. Each
     278        # get_bucket() response adds to this map, each act of validation
     279        # removes from it.
     280        self._sharemap = DictOfSets()
     281
     282        # _sharemap maps shnum to set of Servers, and remembers where viable
     283        # shares are located. Each get_bucket() response adds to this map,
     284        # each hash failure or disconnect removes from it. (TODO: if we
     285        # disconnect but reconnect later, we should be allowed to re-query).
     286        self._sharemap = DictOfSets()
     287
     288        # _problem_shares is a set of (shnum, Server) tuples, and
     289
     290        # _queries_in_flight maps a Server to a timestamp, which remembers
     291        # which servers we've sent queries to (and when) but have not yet
     292        # heard a response. This lets us put a limit on the number of
     293        # outstanding queries, to limit the size of the work window (how much
     294        # extra work we ask servers to do in the hopes of keeping our own
     295        # pipeline filled). We remove a Server from _queries_in_flight when
     296        # we get an answer/error or we finally give up. If we ever switch to
     297        # a non-connection-oriented protocol (like UDP, or forwarded Chord
     298        # queries), we can use this information to retransmit any query that
     299        # has gone unanswered for too long.
     300        self._queries_in_flight = dict()
     301
     302    def _count_recent_queries_in_flight(self):
     303        now = time.time()
     304        recent = now - self._recent_horizon
     305        return len([s for (s,when) in self._queries_in_flight.items()
     306                    if when > recent])
     307
     308    def _find_enough_shares(self):
     309        # goal: have 2*k distinct not-invalid shares available for reading,
     310        # from 2*k distinct servers. Do not have more than 4*k "recent"
     311        # queries in flight at a time.
     312        if (len(self._sharemap) >= 2*self._needed_shares
      313            and len(set().union(*self._sharemap.values())) >= 2*self._needed_shares):
     314            return
     315        num = self._count_recent_queries_in_flight()
     316        while num < 4*self._needed_shares:
     317            try:
     318                s = self._available_servers.next()
     319            except StopIteration:
     320                return # no more progress can be made
     321            self._queries_in_flight[s] = time.time()
     322            d = s.send_query(self._storage_index)
      323            d.addBoth(incidentally, self._queries_in_flight.pop, s)
     324            d.addCallbacks(lambda shnums: [self._sharemap.add(shnum, s)
     325                                           for shnum in shnums],
     326                           lambda f: self._query_error(f, s))
     327            d.addErrback(self._error)
     328            d.addCallback(self._reschedule)
     329            num += 1
     330
     331    def _query_error(self, f, s):
     332        # a server returned an error, log it gently and ignore
     333        level = log.WEIRD
     334        if f.check(DeadReferenceError):
     335            level = log.UNUSUAL
      336        log.msg(format="Error during get_buckets to server=%(server)s", server=str(s),
     337                failure=f, level=level, umid="3uuBUQ")
     338
     339    # this block is responsible for turning known shares into usable shares,
     340    # by fetching enough data to validate their contents.
     341
     342    # UEB (from any share)
     343    # share hash chain, validated (from any share, for given shnum)
     344    # block hash (any share, given shnum)
     345
     346    def _got_ueb(self, ueb_data, share):
     347        if self._ueb is not None:
     348            return
     349        if hashutil.uri_extension_hash(ueb_data) != self._ueb_hash:
     350            share.error("UEB hash does not match")
     351            return
     352        d = uri.unpack_extension(ueb_data)
     353        self.share_size = mathutil.div_ceil(self._size, self._needed_shares)
     354
     355
     356        # There are several kinds of things that can be found in a UEB.
     357        # First, things that we really need to learn from the UEB in order to
     358        # do this download. Next: things which are optional but not redundant
     359        # -- if they are present in the UEB they will get used. Next, things
     360        # that are optional and redundant. These things are required to be
     361        # consistent: they don't have to be in the UEB, but if they are in
     362        # the UEB then they will be checked for consistency with the
     363        # already-known facts, and if they are inconsistent then an exception
     364        # will be raised. These things aren't actually used -- they are just
     365        # tested for consistency and ignored. Finally: things which are
     366        # deprecated -- they ought not be in the UEB at all, and if they are
     367        # present then a warning will be logged but they are otherwise
     368        # ignored.
     369
     370        # First, things that we really need to learn from the UEB:
     371        # segment_size, crypttext_root_hash, and share_root_hash.
     372        self._segsize = d['segment_size']
     373
     374        self._blocksize = mathutil.div_ceil(self._segsize, self._needed_shares)
     375        self._numsegs = mathutil.div_ceil(self._size, self._segsize)
     376
     377        self._tail_segsize = self._size % self._segsize
     378        if self._tail_segsize == 0:
     379            self._tail_segsize = self._segsize
     380        # padding for erasure code
     381        self._tail_segsize = mathutil.next_multiple(self._tail_segsize,
     382                                                    self._needed_shares)
     383
     384        # Ciphertext hash tree root is mandatory, so that there is at most
     385        # one ciphertext that matches this read-cap or verify-cap. The
     386        # integrity check on the shares is not sufficient to prevent the
     387        # original encoder from creating some shares of file A and other
     388        # shares of file B.
     389        self._ciphertext_hash_tree = IncompleteHashTree(self._numsegs)
     390        self._ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})
     391
     392        self._share_hash_tree.set_hashes({0: d['share_root_hash']})
     393
     394
     395        # Next: things that are optional and not redundant: crypttext_hash
     396        if 'crypttext_hash' in d:
      397            if len(d['crypttext_hash']) == hashutil.CRYPTO_VAL_SIZE:
     398                self._ciphertext_hash = d['crypttext_hash']
     399            else:
     400                log.msg("ignoring bad-length UEB[crypttext_hash], "
     401                        "got %d bytes, want %d" % (len(d['crypttext_hash']),
     402                                                   hashutil.CRYPTO_VAL_SIZE),
     403                        umid="oZkGLA", level=log.WEIRD)
     404
     405        # we ignore all of the redundant fields when downloading. The
     406        # Verifier uses a different code path which does not ignore them.
     407
     408        # finally, set self._ueb as a marker that we don't need to request it
     409        # anymore
     410        self._ueb = d
     411
     412    def _got_share_hashes(self, hashes, share):
     413        assert isinstance(hashes, dict)
     414        try:
     415            self._share_hash_tree.set_hashes(hashes)
     416        except (IndexError, BadHashError, NotEnoughHashesError), le:
     417            share.error("Bad or missing hashes")
     418            return
     419
     420    #def _got_block_hashes(
     421
     422    def _init_validate_enough_shares(self):
     423        # _valid_shares maps shnum to ValidatedShare instances, and is
     424        # populated once the block hash root has been fetched and validated
     425        # (which requires any valid copy of the UEB, and a valid copy of the
     426        # share hash chain for each shnum)
     427        self._valid_shares = {}
     428
     429        # _target_shares is an ordered list of ReadyShare instances, each of
     430        # which is a (shnum, server) tuple. It is sorted in order of
     431        # preference: we expect to get the fastest response from the
     432        # ReadyShares at the front of the list. It is also sorted to
     433        # distribute the shnums, so that fetching shares from
     434        # _target_shares[:k] is likely (but not guaranteed) to give us k
     435        # distinct shares. The rule is that we skip over entries for blocks
     436        # that we've already received, limit the number of recent queries for
     437        # the same block,
     438        self._target_shares = []
     439
     440    def _validate_enough_shares(self):
     441        # my goal is to have at least 2*k distinct validated shares from at
     442        # least 2*k distinct servers
     443        valid_share_servers = set()
     444        for vs in self._valid_shares.values():
     445            valid_share_servers.update(vs.get_servers())
     446        if (len(self._valid_shares) >= 2*self._needed_shares
      447            and len(valid_share_servers) >= 2*self._needed_shares):
     448            return
     449        #for
     450
     451    def _reschedule(self, _ign):
     452        # fire the loop again
     453        if not self._scheduled:
     454            self._scheduled = True
     455            eventually(self._loop)
     456
     457    def _loop(self):
     458        self._scheduled = False
     459        # what do we need?
     460
     461        self._find_enough_shares()
     462        self._validate_enough_shares()
     463
     464        if not self._ueb:
     465            # we always need a copy of the UEB
     466            pass
     467
     468    def _error(self, f):
     469        # this is an unexpected error: a coding bug
     470        log.err(f, level=log.UNUSUAL)
     471           
     472
     473
     474# using a single packed string (and an offset table) may be an artifact of
     475# our native storage server: other backends might allow cheap multi-part
     476# files (think S3, several buckets per share, one for each section).
     477
     478# find new names for:
     479#  data_holder
     480#  Share / Share2  (ShareInstance / Share? but the first is more useful)
     481
     482class IShare(Interface):
     483    """I represent a single instance of a single share (e.g. I reference the
     484    shnum2 for share SI=abcde on server xy12t, not the one on server ab45q).
     485    This interface is used by SegmentFetcher to retrieve validated blocks.
     486    """
     487    def get_block(segnum):
     488        """Return an Observer2, which will be notified with the following
     489        events:
     490         state=COMPLETE, block=data (terminal): validated block data
     491         state=OVERDUE (non-terminal): we have reason to believe that the
     492                                       request might have stalled, or we
     493                                       might just be impatient
     494         state=CORRUPT (terminal): the data we received was corrupt
     495         state=DEAD (terminal): the connection has failed
     496        """
     497
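# For illustration, how a SegmentFetcher might consume those events; this
# assumes (not shown here) that Observer2 offers subscribe(cb) and fires the
# callback with keyword arguments matching the states above:
#   def _block_request_activity(state, block=None):
#       if state == COMPLETE:
#           pass # 'block' holds validated block data
#       elif state == OVERDUE:
#           pass # maybe send a duplicate request to another share
#       elif state in (CORRUPT, DEAD):
#           pass # stop using this share
#   share.get_block(segnum).subscribe(_block_request_activity)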
     498
     499# it'd be nice if we receive the hashes before the block, or just
     500# afterwards, so we aren't stuck holding on to unvalidated blocks
     501# that we can't process. If we guess the offsets right, we can
     502# accomplish this by sending the block request after the metadata
     503# requests (by keeping two separate requestlists), and have a one RTT
     504# pipeline like:
     505#  1a=metadata, 1b=block
     506#  1b->process+deliver : one RTT
     507
     508# But if we guess wrong, and fetch the wrong part of the block, we'll
     509# have a pipeline that looks like:
     510#  1a=wrong metadata, 1b=wrong block
     511#  1a->2a=right metadata,2b=right block
     512#  2b->process+deliver
     513# which means two RTT and buffering one block (which, since we'll
     514# guess the segsize wrong for everything, means buffering one
     515# segment)
     516
     517# if we start asking for multiple segments, we could get something
     518# worse:
     519#  1a=wrong metadata, 1b=wrong block0, 1c=wrong block1, ..
     520#  1a->2a=right metadata,2b=right block0,2c=right block1, .
     521#  2b->process+deliver
     522
     523# which means two RTT but fetching and buffering the whole file
     524# before delivering anything. However, since we don't know when the
     525# other shares are going to arrive, we need to avoid having more than
     526# one block in the pipeline anyways. So we shouldn't be able to get
     527# into this state.
     528
     529# it also means that, instead of handling all of
     530# self._requested_blocks at once, we should only be handling one
     531# block at a time: one of the requested block should be special
     532# (probably FIFO). But retire all we can.
     533
     534    # this might be better with a Deferred, using COMPLETE as the success
     535    # case and CORRUPT/DEAD in an errback, because that would let us hold the
     536    # 'share' and 'shnum' arguments locally (instead of roundtripping them
     537    # through Share.send_request). But that OVERDUE is not terminal. So I
     538    # want a new sort of callback mechanism, with the extra-argument-passing
     539    # aspects of Deferred, but without being so one-shot. Is this a job for
     540    # Observer? No, it doesn't take extra arguments. So this uses Observer2.
     541
     542
     543class Reader:
     544    """I am responsible for a single offset+size read of the file. I handle
     545    segmentation: I figure out which segments are necessary, request them
     546    (from my CiphertextDownloader) in order, and trim the segments down to
     547    match the offset+size span. I use the Producer/Consumer interface to only
     548    request one segment at a time.
     549    """
     550    implements(IPushProducer)
     551    def __init__(self, consumer, offset, size):
     552        self._needed = []
     553        self._consumer = consumer
     554        self._hungry = False
     555        self._offset = offset
     556        self._size = size
     557        self._segsize = None
     558    def start(self):
     559        self._alive = True
     560        self._deferred = defer.Deferred()
     561        # the process doesn't actually start until set_segment_size()
     562        return self._deferred
     563
     564    def set_segment_size(self, segsize):
     565        if self._segsize is not None:
     566            return
     567        self._segsize = segsize
      568        self._needed.extend(self._compute_segnums())
     569
      570    def _compute_segnums(self):
     571        # now that we know the file's segsize, what segments (and which
     572        # ranges of each) will we need?
     573        size = self._size
     574        offset = self._offset
     575        while size:
     576            assert size >= 0
     577            this_seg_num = int(offset / self._segsize)
      578            this_seg_offset = offset - (this_seg_num*self._segsize)
      579            this_seg_size = min(size, self._segsize-this_seg_offset)
     580            size -= this_seg_size
     581            if size:
     582                offset += this_seg_size
     583            yield (this_seg_num, this_seg_offset, this_seg_size)
     584
     585    def get_needed_segments(self):
     586        return set([segnum for (segnum, off, size) in self._needed])
     587
     588
     589    def stopProducing(self):
     590        self._hungry = False
     591        self._alive = False
     592        # TODO: cancel the segment requests
     593    def pauseProducing(self):
     594        self._hungry = False
     595    def resumeProducing(self):
     596        self._hungry = True
     597    def add_segment(self, segnum, offset, size):
     598        self._needed.append( (segnum, offset, size) )
     599    def got_segment(self, segnum, segdata):
     600        """Return True if this schedule has more to go, or False if it is
     601        done."""
      602        assert self._needed[0][0] == segnum
     603        (_ign, offset, size) = self._needed.pop(0)
     604        data = segdata[offset:offset+size]
     605        self._consumer.write(data)
     606        if not self._needed:
     607            # we're done
     608            self._alive = False
     609            self._hungry = False
     610            self._consumer.unregisterProducer()
     611            self._deferred.callback(self._consumer)
     612    def error(self, f):
     613        self._alive = False
     614        self._hungry = False
     615        self._consumer.unregisterProducer()
     616        self._deferred.errback(f)
     617
     618
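For illustration only (not part of this patch): a standalone, hypothetical
version of the segment-splitting loop in _compute_segnums above, showing how
a single (offset, size) read maps onto per-segment pieces once the file's
segment size is known.

    def split_read(offset, size, segsize):
        # e.g. split_read(100, 300, 128) ->
        #   [(0, 100, 28), (1, 0, 128), (2, 0, 128), (3, 0, 16)]
        pieces = []
        while size > 0:
            segnum = offset // segsize
            seg_offset = offset - segnum*segsize
            piece = min(size, segsize - seg_offset)
            pieces.append((segnum, seg_offset, piece))
            offset += piece
            size -= piece
        return pieces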
     619
     620class x:
     621    def OFFread(self, consumer, offset=0, size=None):
     622        """I am the main entry point, from which FileNode.read() can get
     623        data."""
     624        # tolerate concurrent operations: each gets its own Reader
     625        if size is None:
     626            size = self._size - offset
     627        r = Reader(consumer, offset, size)
     628        self._readers.add(r)
     629        d = r.start()
     630        if self.segment_size is not None:
     631            r.set_segment_size(self.segment_size)
     632            # TODO: if we can't find any segments, and thus never get a
     633            # segsize, tell the Readers to give up
     634        return d
  • new file src/allmydata/immutable/download2_util.py

    diff --git a/src/allmydata/immutable/download2_util.py b/src/allmydata/immutable/download2_util.py
    new file mode 100755
    index 0000000..9e20ff4
    - +  
     1import weakref
     2
     3from twisted.application import service
     4from foolscap.api import eventually
     5
     6class Observer2:
     7    """A simple class to distribute multiple events to a single subscriber.
     8    It accepts arbitrary kwargs, but no posargs."""
     9    def __init__(self):
     10        self._watcher = None
     11        self._undelivered_results = []
     12        self._canceler = None
     13
     14    def set_canceler(self, f):
     15        # we use a weakref to avoid creating a cycle between us and the thing
     16        # we're observing: they'll be holding a reference to us to compare
     17        # against the value we pass to their canceler function.
     18        self._canceler = weakref.ref(f)
     19
     20    def subscribe(self, observer, **watcher_kwargs):
     21        self._watcher = (observer, watcher_kwargs)
     22        while self._undelivered_results:
     23            self._notify(self._undelivered_results.pop(0))
     24
     25    def notify(self, **result_kwargs):
     26        if self._watcher:
     27            self._notify(result_kwargs)
     28        else:
     29            self._undelivered_results.append(result_kwargs)
     30
     31    def _notify(self, result_kwargs):
     32        o, watcher_kwargs = self._watcher
     33        kwargs = dict(result_kwargs)
     34        kwargs.update(watcher_kwargs)
     35        eventually(o, **kwargs)
     36
     37    def cancel(self):
     38        f = self._canceler()
     39        if f:
     40            f(self)
     41
     42
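For illustration only (not part of this patch): basic Observer2 usage. Events
notified before anyone subscribes are queued and replayed once subscribe()
runs, and the kwargs given to subscribe() are merged into every delivery.
Delivery happens on a later reactor turn via foolscap's eventually(), so a
running reactor is assumed.

    def _print_event(state, label, block=None):
        print "event:", label, state, block

    o = Observer2()
    o.notify(state="COMPLETE", block="xyz")    # no watcher yet: queued
    o.subscribe(_print_event, label="share 3") # replays the queued event
    o.notify(state="OVERDUE")                  # delivered with label too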
     43def incidentally(res, f, *args, **kwargs):
     44    """Add me to a Deferred chain like this:
     45     d.addBoth(incidentally, func, arg)
     46    and I'll behave as if you'd added the following function:
     47     def _(res):
     48         func(arg)
     49         return res
     50    This is useful if you want to execute an expression when the Deferred
     51    fires, but don't care about its value.
     52    """
     53    f(*args, **kwargs)
     54    return res
     55
     56
     57class Terminator(service.Service):
     58    def __init__(self):
     59        self._clients = weakref.WeakKeyDictionary()
     60    def register(self, c):
     61        self._clients[c] = None
     62    def stopService(self):
     63        for c in self._clients:
     64            c.stop()
     65        return service.Service.stopService(self)
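For illustration only (not part of this patch): how the Terminator is meant
to be used. The Client attaches it as a service (see client.py above);
anything with a stop() method registers itself and is stopped when the node
shuts down. The WeakKeyDictionary keeps registration from pinning registrants
in memory. The DownloadThing class here is hypothetical.

    class DownloadThing(object):
        def stop(self):
            print "shutting down cleanly"

    t = Terminator()
    thing = DownloadThing()
    t.register(thing)
    # at node shutdown, Twisted calls t.stopService(), which calls
    # thing.stop() for every registrant that is still alive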
  • src/allmydata/nodemaker.py

    diff --git a/src/allmydata/nodemaker.py b/src/allmydata/nodemaker.py
    index a30efbf..36ddfc7 100644
    a b import weakref 
    22from zope.interface import implements
    33from allmydata.util.assertutil import precondition
    44from allmydata.interfaces import INodeMaker, MustBeDeepImmutableError
    5 from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
     5from allmydata.immutable.filenode import LiteralFileNode
     6from allmydata.immutable.download2 import ImmutableFileNode
    67from allmydata.immutable.upload import Data
    78from allmydata.mutable.filenode import MutableFileNode
    89from allmydata.dirnode import DirectoryNode, pack_children
    class NodeMaker: 
    1718    implements(INodeMaker)
    1819
    1920    def __init__(self, storage_broker, secret_holder, history,
    20                  uploader, downloader, download_cache_dirman,
     21                 uploader, terminator,
    2122                 default_encoding_parameters, key_generator):
    2223        self.storage_broker = storage_broker
    2324        self.secret_holder = secret_holder
    2425        self.history = history
    2526        self.uploader = uploader
    26         self.downloader = downloader
    27         self.download_cache_dirman = download_cache_dirman
     27        self.terminator = terminator
    2828        self.default_encoding_parameters = default_encoding_parameters
    2929        self.key_generator = key_generator
    3030
    class NodeMaker: 
    3434        return LiteralFileNode(cap)
    3535    def _create_immutable(self, cap):
    3636        return ImmutableFileNode(cap, self.storage_broker, self.secret_holder,
    37                                  self.downloader, self.history,
    38                                  self.download_cache_dirman)
     37                                 self.terminator, self.history)
    3938    def _create_mutable(self, cap):
    4039        n = MutableFileNode(self.storage_broker, self.secret_holder,
    4140                            self.default_encoding_parameters,
    class NodeMaker: 
    4847        # this returns synchronously. It starts with a "cap string".
    4948        assert isinstance(writecap, (str, type(None))), type(writecap)
    5049        assert isinstance(readcap,  (str, type(None))), type(readcap)
    51        
     50
    5251        bigcap = writecap or readcap
    5352        if not bigcap:
    5453            # maybe the writecap was hidden because we're in a readonly
  • src/allmydata/test/test_util.py

    diff --git a/src/allmydata/test/test_util.py b/src/allmydata/test/test_util.py
    index 0a326b3..5f6ce67 100644
    a b from twisted.trial import unittest 
    77from twisted.internet import defer, reactor
    88from twisted.python.failure import Failure
    99from twisted.python import log
     10from hashlib import md5
    1011
    1112from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil
    1213from allmydata.util import assertutil, fileutil, deferredutil, abbreviate
    1314from allmydata.util import limiter, time_format, pollmixin, cachedir
    1415from allmydata.util import statistics, dictutil, pipeline
    1516from allmydata.util import log as tahoe_log
     17from allmydata.util.spans import Spans, overlap, DataSpans
    1618
    1719class Base32(unittest.TestCase):
    1820    def test_b2a_matches_Pythons(self):
    class Log(unittest.TestCase): 
    15371539        tahoe_log.err(format="intentional sample error",
    15381540                      failure=f, level=tahoe_log.OPERATIONAL, umid="wO9UoQ")
    15391541        self.flushLoggedErrors(SampleError)
     1542
     1543
     1544class SimpleSpans:
     1545    # this is a simple+inefficient form of util.spans.Spans . We compare the
     1546    # behavior of this reference model against the real (efficient) form.
     1547
     1548    def __init__(self, _span_or_start=None, length=None):
     1549        self._have = set()
     1550        if length is not None:
     1551            for i in range(_span_or_start, _span_or_start+length):
     1552                self._have.add(i)
     1553        elif _span_or_start:
     1554            for (start,length) in _span_or_start:
     1555                self.add(start, length)
     1556
     1557    def add(self, start, length):
     1558        for i in range(start, start+length):
     1559            self._have.add(i)
     1560        return self
     1561
     1562    def remove(self, start, length):
     1563        for i in range(start, start+length):
     1564            self._have.discard(i)
     1565        return self
     1566
     1567    def each(self):
     1568        return sorted(self._have)
     1569
     1570    def __iter__(self):
     1571        items = sorted(self._have)
     1572        prevstart = None
     1573        prevend = None
     1574        for i in items:
     1575            if prevstart is None:
     1576                prevstart = prevend = i
     1577                continue
     1578            if i == prevend+1:
     1579                prevend = i
     1580                continue
     1581            yield (prevstart, prevend-prevstart+1)
     1582            prevstart = prevend = i
     1583        if prevstart is not None:
     1584            yield (prevstart, prevend-prevstart+1)
     1585
     1586    def __len__(self):
     1587        # this also gets us bool(s)
     1588        return len(self._have)
     1589
     1590    def __add__(self, other):
     1591        s = self.__class__(self)
     1592        for (start, length) in other:
     1593            s.add(start, length)
     1594        return s
     1595
     1596    def __sub__(self, other):
     1597        s = self.__class__(self)
     1598        for (start, length) in other:
     1599            s.remove(start, length)
     1600        return s
     1601
     1602    def __iadd__(self, other):
     1603        for (start, length) in other:
     1604            self.add(start, length)
     1605        return self
     1606
     1607    def __isub__(self, other):
     1608        for (start, length) in other:
     1609            self.remove(start, length)
     1610        return self
     1611
     1612    def __contains__(self, (start,length)):
     1613        for i in range(start, start+length):
     1614            if i not in self._have:
     1615                return False
     1616        return True
     1617
     1618class ByteSpans(unittest.TestCase):
     1619    def test_basic(self):
     1620        s = Spans()
     1621        self.failUnlessEqual(list(s), [])
     1622        self.failIf(s)
     1623        self.failIf((0,1) in s)
     1624        self.failUnlessEqual(len(s), 0)
     1625
     1626        s1 = Spans(3, 4) # 3,4,5,6
     1627        self._check1(s1)
     1628
     1629        s2 = Spans(s1)
     1630        self._check1(s2)
     1631
     1632        s2.add(10,2) # 10,11
     1633        self._check1(s1)
     1634        self.failUnless((10,1) in s2)
     1635        self.failIf((10,1) in s1)
     1636        self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11])
     1637        self.failUnlessEqual(len(s2), 6)
     1638
     1639        s2.add(15,2).add(20,2)
     1640        self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11,15,16,20,21])
     1641        self.failUnlessEqual(len(s2), 10)
     1642
     1643        s2.remove(4,3).remove(15,1)
     1644        self.failUnlessEqual(list(s2.each()), [3,10,11,16,20,21])
     1645        self.failUnlessEqual(len(s2), 6)
     1646
     1647    def _check1(self, s):
     1648        self.failUnlessEqual(list(s), [(3,4)])
     1649        self.failUnless(s)
     1650        self.failUnlessEqual(len(s), 4)
     1651        self.failIf((0,1) in s)
     1652        self.failUnless((3,4) in s)
     1653        self.failUnless((3,1) in s)
     1654        self.failUnless((5,2) in s)
     1655        self.failUnless((6,1) in s)
     1656        self.failIf((6,2) in s)
     1657        self.failIf((7,1) in s)
     1658        self.failUnlessEqual(list(s.each()), [3,4,5,6])
     1659
     1660    def test_math(self):
     1661        s1 = Spans(0, 10) # 0,1,2,3,4,5,6,7,8,9
     1662        s2 = Spans(5, 3) # 5,6,7
     1663        s3 = Spans(8, 4) # 8,9,10,11
     1664
     1665        s = s1 - s2
     1666        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9])
     1667        s = s1 - s3
     1668        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7])
     1669        s = s2 - s3
     1670        self.failUnlessEqual(list(s.each()), [5,6,7])
     1671
     1672        s = s1 + s2
     1673        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9])
     1674        s = s1 + s3
     1675        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11])
     1676        s = s2 + s3
     1677        self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11])
     1678
     1679        s = Spans(s1)
     1680        s -= s2
     1681        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9])
     1682        s = Spans(s1)
     1683        s -= s3
     1684        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7])
     1685        s = Spans(s2)
     1686        s -= s3
     1687        self.failUnlessEqual(list(s.each()), [5,6,7])
     1688
     1689        s = Spans(s1)
     1690        s += s2
     1691        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9])
     1692        s = Spans(s1)
     1693        s += s3
     1694        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11])
     1695        s = Spans(s2)
     1696        s += s3
     1697        self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11])
     1698
     1699    def test_random(self):
     1700        # attempt to increase coverage of corner cases by comparing behavior
     1701        # of a simple-but-slow model implementation against the
     1702        # complex-but-fast actual implementation, in a large number of random
     1703        # operations
     1704        S1 = SimpleSpans
     1705        S2 = Spans
     1706        s1 = S1(); s2 = S2()
     1707        seed = ""
     1708        def _create(subseed):
     1709            ns1 = S1(); ns2 = S2()
     1710            for i in range(10):
     1711                what = md5(subseed+str(i)).hexdigest()
     1712                start = int(what[2:4], 16)
     1713                length = max(1,int(what[5:6], 16))
     1714                ns1.add(start, length); ns2.add(start, length)
     1715            return ns1, ns2
     1716
     1717        #print
     1718        for i in range(1000):
     1719            what = md5(seed+str(i)).hexdigest()
     1720            op = what[0]
     1721            subop = what[1]
     1722            start = int(what[2:4], 16)
     1723            length = max(1,int(what[5:6], 16))
     1724            #print what
     1725            if op in "0":
     1726                if subop in "01234":
     1727                    s1 = S1(); s2 = S2()
     1728                elif subop in "5678":
     1729                    s1 = S1(start, length); s2 = S2(start, length)
     1730                else:
     1731                    s1 = S1(s1); s2 = S2(s2)
     1732                #print "s2 = %s" % s2.dump()
     1733            elif op in "123":
     1734                #print "s2.add(%d,%d)" % (start, length)
     1735                s1.add(start, length); s2.add(start, length)
     1736            elif op in "456":
     1737                #print "s2.remove(%d,%d)" % (start, length)
     1738                s1.remove(start, length); s2.remove(start, length)
     1739            elif op in "78":
     1740                ns1, ns2 = _create(what[7:11])
     1741                #print "s2 + %s" % ns2.dump()
     1742                s1 = s1 + ns1; s2 = s2 + ns2
     1743            elif op in "9a":
     1744                ns1, ns2 = _create(what[7:11])
     1745                #print "%s - %s" % (s2.dump(), ns2.dump())
     1746                s1 = s1 - ns1; s2 = s2 - ns2
     1747            elif op in "bc":
     1748                ns1, ns2 = _create(what[7:11])
     1749                #print "s2 += %s" % ns2.dump()
     1750                s1 += ns1; s2 += ns2
     1751            else:
     1752                ns1, ns2 = _create(what[7:11])
     1753                #print "%s -= %s" % (s2.dump(), ns2.dump())
     1754                s1 -= ns1; s2 -= ns2
     1755            #print "s2 now %s" % s2.dump()
     1756            self.failUnlessEqual(list(s1.each()), list(s2.each()))
     1757            self.failUnlessEqual(len(s1), len(s2))
     1758            self.failUnlessEqual(bool(s1), bool(s2))
     1759            self.failUnlessEqual(list(s1), list(s2))
     1760            for j in range(10):
     1761                what = md5(what[12:14]+str(j)).hexdigest()
     1762                start = int(what[2:4], 16)
     1763                length = max(1, int(what[5:6], 16))
     1764                span = (start, length)
     1765                self.failUnlessEqual(bool(span in s1), bool(span in s2))
     1766
     1767
     1768    # s()
     1769    # s(start,length)
     1770    # s(s0)
     1771    # s.add(start,length) : returns s
     1772    # s.remove(start,length)
     1773    # s.each() -> list of byte offsets, mostly for testing
     1774    # list(s) -> list of (start,length) tuples, one per span
     1775    # (start,length) in s -> True if (start..start+length-1) are all members
     1776    #  NOT equivalent to x in list(s)
     1777    # len(s) -> number of bytes, for testing, bool(), and accounting/limiting
     1778    # bool(s)  (__len__)
     1779    # s = s1+s2, s1-s2, +=s1, -=s1
     1780
     1781    def test_overlap(self):
     1782        for a in range(20):
     1783            for b in range(10):
     1784                for c in range(20):
     1785                    for d in range(10):
     1786                        self._test_overlap(a,b,c,d)
     1787
     1788    def _test_overlap(self, a, b, c, d):
     1789        s1 = set(range(a,a+b))
     1790        s2 = set(range(c,c+d))
     1791        #print "---"
     1792        #self._show_overlap(s1, "1")
     1793        #self._show_overlap(s2, "2")
     1794        o = overlap(a,b,c,d)
     1795        expected = s1.intersection(s2)
     1796        if not expected:
     1797            self.failUnlessEqual(o, None)
     1798        else:
     1799            start,length = o
     1800            so = set(range(start,start+length))
     1801            #self._show(so, "o")
     1802            self.failUnlessEqual(so, expected)
     1803
     1804    def _show_overlap(self, s, c):
     1805        import sys
     1806        out = sys.stdout
     1807        if s:
     1808            for i in range(max(s)):
     1809                if i in s:
     1810                    out.write(c)
     1811                else:
     1812                    out.write(" ")
     1813        out.write("\n")
     1814
     1815def extend(s, start, length, fill):
     1816    if len(s) >= start+length:
     1817        return s
     1818    assert len(fill) == 1
     1819    return s + fill*(start+length-len(s))
     1820
     1821def replace(s, start, data):
     1822    assert len(s) >= start+len(data)
     1823    return s[:start] + data + s[start+len(data):]
     1824
     1825class SimpleDataSpans:
     1826    def __init__(self, other=None):
     1827        self.missing = "" # "1" where missing, "0" where found
     1828        self.data = ""
     1829        if other:
     1830            for (start, data) in other.get_spans():
     1831                self.add(start, data)
     1832
     1833    def __len__(self):
     1834        return len(self.missing.translate(None, "1"))
     1835    def _dump(self):
     1836        return [i for (i,c) in enumerate(self.missing) if c == "0"]
     1837    def _have(self, start, length):
     1838        m = self.missing[start:start+length]
     1839        if not m or len(m)<length or int(m):
     1840            return False
     1841        return True
     1842    def get_spans(self):
     1843        for i in self._dump():
     1844            yield (i, self.data[i])
     1845    def get(self, start, length):
     1846        if self._have(start, length):
     1847            return self.data[start:start+length]
     1848        return None
     1849    def pop(self, start, length):
     1850        data = self.get(start, length)
     1851        if data:
     1852            self.remove(start, length)
     1853        return data
     1854    def remove(self, start, length):
     1855        self.missing = replace(extend(self.missing, start, length, "1"),
     1856                               start, "1"*length)
     1857    def add(self, start, data):
     1858        self.missing = replace(extend(self.missing, start, len(data), "1"),
     1859                               start, "0"*len(data))
     1860        self.data = replace(extend(self.data, start, len(data), " "),
     1861                            start, data)
     1862
     1863
     1864class StringSpans(unittest.TestCase):
     1865    def do_basic(self, klass):
     1866        ds = klass()
     1867        self.failUnlessEqual(len(ds), 0)
     1868        self.failUnlessEqual(list(ds._dump()), [])
     1869        self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 0)
     1870        self.failUnlessEqual(ds.get(0, 4), None)
     1871        self.failUnlessEqual(ds.pop(0, 4), None)
     1872        ds.remove(0, 4)
     1873
     1874        ds.add(2, "four")
     1875        self.failUnlessEqual(len(ds), 4)
     1876        self.failUnlessEqual(list(ds._dump()), [2,3,4,5])
     1877        self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 4)
     1878        self.failUnlessEqual(ds.get(0, 4), None)
     1879        self.failUnlessEqual(ds.pop(0, 4), None)
     1880        self.failUnlessEqual(ds.get(4, 4), None)
     1881
     1882        ds2 = klass(ds)
     1883        self.failUnlessEqual(len(ds2), 4)
     1884        self.failUnlessEqual(list(ds2._dump()), [2,3,4,5])
     1885        self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_spans()]), 4)
     1886        self.failUnlessEqual(ds2.get(0, 4), None)
     1887        self.failUnlessEqual(ds2.pop(0, 4), None)
     1888        self.failUnlessEqual(ds2.pop(2, 3), "fou")
     1889        self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_spans()]), 1)
     1890        self.failUnlessEqual(ds2.get(2, 3), None)
     1891        self.failUnlessEqual(ds2.get(5, 1), "r")
     1892        self.failUnlessEqual(ds.get(2, 3), "fou")
     1893        self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 4)
     1894
     1895        ds.add(0, "23")
     1896        self.failUnlessEqual(len(ds), 6)
     1897        self.failUnlessEqual(list(ds._dump()), [0,1,2,3,4,5])
     1898        self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 6)
     1899        self.failUnlessEqual(ds.get(0, 4), "23fo")
     1900        self.failUnlessEqual(ds.pop(0, 4), "23fo")
     1901        self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 2)
     1902        self.failUnlessEqual(ds.get(0, 4), None)
     1903        self.failUnlessEqual(ds.pop(0, 4), None)
     1904
     1905        ds = klass()
     1906        ds.add(2, "four")
     1907        ds.add(3, "ea")
     1908        self.failUnlessEqual(ds.get(2, 4), "fear")
     1909
     1910    def do_scan(self, klass):
     1911        # do a test with gaps and spans of size 1 and 2
     1912        #  left=(1,11) * right=(1,11) * gapsize=(1,2)
     1913        # 111, 112, 121, 122, 211, 212, 221, 222
     1914        #    211
     1915        #      121
     1916        #         112
     1917        #            212
     1918        #               222
     1919        #                   221
     1920        #                      111
     1921        #                        122
     1922        #  11 1  1 11 11  11  1 1  111
     1923        # 0123456789012345678901234567
     1924        # abcdefghijklmnopqrstuvwxyz-=
     1925        pieces = [(1, "bc"),
     1926                  (4, "e"),
     1927                  (7, "h"),
     1928                  (9, "jk"),
     1929                  (12, "mn"),
     1930                  (16, "qr"),
     1931                  (20, "u"),
     1932                  (22, "w"),
     1933                  (25, "z-="),
     1934                  ]
     1935        p_elements = set([1,2,4,7,9,10,12,13,16,17,20,22,25,26,27])
     1936        S = "abcdefghijklmnopqrstuvwxyz-="
     1937        # TODO: when adding data, add capital letters, to make sure we aren't
     1938        # just leaving the old data in place
     1939        l = len(S)
     1940        def base():
     1941            ds = klass()
     1942            for start, data in pieces:
     1943                ds.add(start, data)
     1944            return ds
     1945        def dump(s):
     1946            p = set(s._dump())
     1947            # wow, this is the first time I've ever wanted ?: in python
     1948            # note: this requires python2.5
     1949            d = "".join([(S[i] if i in p else " ") for i in range(l)])
     1950            assert len(d) == l
     1951            return d
     1952        DEBUG = False
     1953        for start in range(0, l):
     1954            for end in range(start+1, l):
     1955                # add [start-end) to the baseline
     1956                which = "%d-%d" % (start, end-1)
     1957                p_added = set(range(start, end))
     1958                b = base()
     1959                if DEBUG:
     1960                    print
     1961                    print dump(b), which
     1962                    add = klass(); add.add(start, S[start:end])
     1963                    print dump(add)
     1964                b.add(start, S[start:end])
     1965                if DEBUG:
     1966                    print dump(b)
     1967                # check that the new span is there
     1968                d = b.get(start, end-start)
     1969                self.failUnlessEqual(d, S[start:end], which)
     1970                # check that all the original pieces are still there
     1971                for t_start, t_data in pieces:
     1972                    t_len = len(t_data)
     1973                    self.failUnlessEqual(b.get(t_start, t_len),
     1974                                         S[t_start:t_start+t_len],
     1975                                         "%s %d+%d" % (which, t_start, t_len))
     1976                # check that a lot of subspans are mostly correct
     1977                for t_start in range(l):
     1978                    for t_len in range(1,4):
     1979                        d = b.get(t_start, t_len)
     1980                        if d is not None:
     1981                            which2 = "%s+(%d-%d)" % (which, t_start,
     1982                                                     t_start+t_len-1)
     1983                            self.failUnlessEqual(d, S[t_start:t_start+t_len],
     1984                                                 which2)
     1985                        # check that removing a subspan gives the right value
     1986                        b2 = klass(b)
     1987                        b2.remove(t_start, t_len)
     1988                        removed = set(range(t_start, t_start+t_len))
     1989                        for i in range(l):
     1990                            exp = (((i in p_elements) or (i in p_added))
     1991                                   and (i not in removed))
     1992                            which2 = "%s-(%d-%d)" % (which, t_start,
     1993                                                     t_start+t_len-1)
     1994                            self.failUnlessEqual(bool(b2.get(i, 1)), exp,
     1995                                                 which2+" %d" % i)
     1996
     1997    def test_test(self):
     1998        self.do_basic(SimpleDataSpans)
     1999        self.do_scan(SimpleDataSpans)
     2000
     2001    def test_basic(self):
     2002        self.do_basic(DataSpans)
     2003        self.do_scan(DataSpans)
     2004
     2005    def test_random(self):
     2006        # attempt to increase coverage of corner cases by comparing behavior
     2007        # of a simple-but-slow model implementation against the
     2008        # complex-but-fast actual implementation, in a large number of random
     2009        # operations
     2010        S1 = SimpleDataSpans
     2011        S2 = DataSpans
     2012        s1 = S1(); s2 = S2()
     2013        seed = ""
     2014        def _randstr(length, seed):
     2015            created = 0
     2016            pieces = []
     2017            while created < length:
     2018                piece = md5(seed + str(created)).hexdigest()
     2019                pieces.append(piece)
     2020                created += len(piece)
     2021            return "".join(pieces)[:length]
     2022        def _create(subseed):
     2023            ns1 = S1(); ns2 = S2()
     2024            for i in range(10):
     2025                what = md5(subseed+str(i)).hexdigest()
     2026                start = int(what[2:4], 16)
     2027                length = max(1,int(what[5:6], 16))
     2028                ns1.add(start, _randstr(length, what[7:9]));
     2029                ns2.add(start, _randstr(length, what[7:9]))
     2030            return ns1, ns2
     2031
     2032        #print
     2033        for i in range(1000):
     2034            what = md5(seed+str(i)).hexdigest()
     2035            op = what[0]
     2036            subop = what[1]
     2037            start = int(what[2:4], 16)
     2038            length = max(1,int(what[5:6], 16))
     2039            #print what
     2040            if op in "0":
     2041                if subop in "0123456":
     2042                    s1 = S1(); s2 = S2()
     2043                else:
     2044                    s1, s2 = _create(what[7:11])
     2045                #print "s2 = %s" % list(s2._dump())
     2046            elif op in "123456":
     2047                #print "s2.add(%d,%d)" % (start, length)
     2048                s1.add(start, _randstr(length, what[7:9]));
     2049                s2.add(start, _randstr(length, what[7:9]))
     2050            elif op in "789abc":
     2051                #print "s2.remove(%d,%d)" % (start, length)
     2052                s1.remove(start, length); s2.remove(start, length)
     2053            else:
     2054                #print "s2.pop(%d,%d)" % (start, length)
     2055                d1 = s1.pop(start, length); d2 = s2.pop(start, length)
     2056                self.failUnlessEqual(d1, d2)
     2057            #print "s1 now %s" % list(s1._dump())
     2058            #print "s2 now %s" % list(s2._dump())
     2059            self.failUnlessEqual(len(s1), len(s2))
     2060            self.failUnlessEqual(list(s1._dump()), list(s2._dump()))
     2061            for j in range(100):
     2062                what = md5(what[12:14]+str(j)).hexdigest()
     2063                start = int(what[2:4], 16)
     2064                length = max(1, int(what[5:6], 16))
     2065                d1 = s1.get(start, length); d2 = s2.get(start, length)
     2066                self.failUnlessEqual(d1, d2, "%d+%d" % (start, length))
  • src/allmydata/util/dictutil.py

    diff --git a/src/allmydata/util/dictutil.py b/src/allmydata/util/dictutil.py
    index 3dc815b..91785ac 100644
    a b class DictOfSets(dict): 
    5757        if not self[key]:
    5858            del self[key]
    5959
     60    def allvalues(self):
     61        # return a set that merges all value sets
     62        r = set()
     63        for key in self:
     64            r.update(self[key])
     65        return r
     66
    6067class UtilDict:
    6168    def __init__(self, initialdata={}):
    6269        self.d = {}
  • new file src/allmydata/util/spans.py

    diff --git a/src/allmydata/util/spans.py b/src/allmydata/util/spans.py
    new file mode 100755
    index 0000000..336fddf
    - +  
     1
     2class Spans:
     3    """I represent a compressed list of booleans, one per index (an integer).
     4    Typically, each index represents an offset into a large string, pointing
     5    to a specific byte of a share. In this context, True means that byte has
     6    been received, or has been requested.
     7
     8    Another way to look at this is maintaining a set of integers, optimized
     9    for operations on spans like 'add range to set' and 'is range in set?'.
     10
     11    This is a python equivalent of perl's Set::IntSpan module, frequently
     12    used to represent .newsrc contents.
     13
     14    Rather than storing an actual (large) list or dictionary, I represent my
     15    internal state as a sorted list of spans, each with a start and a length.
     16    My API is presented in terms of start+length pairs. I provide set
     17    arithmetic operators, to efficiently answer questions like 'I want bytes
     18    XYZ, I already requested bytes ABC, and I've already received bytes DEF:
     19    what bytes should I request now?'.
     20
     21    The new downloader will use it to keep track of which bytes we've requested
     22    or received already.
     23    """
     24
     25    def __init__(self, _span_or_start=None, length=None):
     26        self._spans = list()
     27        if length is not None:
     28            self._spans.append( (_span_or_start, length) )
     29        elif _span_or_start:
     30            for (start,length) in _span_or_start:
     31                self.add(start, length)
     32        self._check()
     33
     34    def _check(self):
     35        assert sorted(self._spans) == self._spans
     36        prev_end = None
     37        try:
     38            for (start,length) in self._spans:
     39                if prev_end is not None:
     40                    assert start > prev_end
     41                prev_end = start+length
     42        except AssertionError:
     43            print "BAD:", self.dump()
     44            raise
     45
     46    def add(self, start, length):
     47        assert start >= 0
     48        assert length > 0
     49        #print " ADD [%d+%d -%d) to %s" % (start, length, start+length, self.dump())
     50        first_overlap = last_overlap = None
     51        for i,(s_start,s_length) in enumerate(self._spans):
     52            #print "  (%d+%d)-> overlap=%s adjacent=%s" % (s_start,s_length, overlap(s_start, s_length, start, length), adjacent(s_start, s_length, start, length))
     53            if (overlap(s_start, s_length, start, length)
     54                or adjacent(s_start, s_length, start, length)):
     55                last_overlap = i
     56                if first_overlap is None:
     57                    first_overlap = i
     58                continue
     59            # no overlap
     60            if first_overlap is not None:
     61                break
     62        #print "  first_overlap", first_overlap, last_overlap
     63        if first_overlap is None:
     64            # no overlap, so just insert the span and sort by starting
     65            # position.
     66            self._spans.insert(0, (start,length))
     67            self._spans.sort()
     68        else:
     69            # everything from [first_overlap] to [last_overlap] overlapped
     70            first_start,first_length = self._spans[first_overlap]
     71            last_start,last_length = self._spans[last_overlap]
     72            newspan_start = min(start, first_start)
     73            newspan_end = max(start+length, last_start+last_length)
     74            newspan_length = newspan_end - newspan_start
     75            newspan = (newspan_start, newspan_length)
     76            self._spans[first_overlap:last_overlap+1] = [newspan]
     77        #print "  ADD done: %s" % self.dump()
     78        self._check()
     79
     80        return self
     81
     82    def remove(self, start, length):
     83        assert start >= 0
     84        assert length > 0
     85        #print " REMOVE [%d+%d -%d) from %s" % (start, length, start+length, self.dump())
     86        first_complete_overlap = last_complete_overlap = None
     87        for i,(s_start,s_length) in enumerate(self._spans):
     88            s_end = s_start + s_length
     89            o = overlap(s_start, s_length, start, length)
     90            if o:
     91                o_start, o_length = o
     92                o_end = o_start+o_length
     93                if o_start == s_start and o_end == s_end:
     94                    # delete this span altogether
     95                    if first_complete_overlap is None:
     96                        first_complete_overlap = i
     97                    last_complete_overlap = i
     98                elif o_start == s_start:
     99                    # we only overlap the left side, so trim the start
     100                    #    1111
     101                    #  rrrr
     102                    #    oo
     103                    # ->   11
     104                    new_start = o_end
     105                    new_end = s_end
     106                    assert new_start > s_start
     107                    new_length = new_end - new_start
     108                    self._spans[i] = (new_start, new_length)
     109                elif o_end == s_end:
     110                    # we only overlap the right side
     111                    #    1111
     112                    #      rrrr
     113                    #      oo
     114                    # -> 11
     115                    new_start = s_start
     116                    new_end = o_start
     117                    assert new_end < s_end
     118                    new_length = new_end - new_start
     119                    self._spans[i] = (new_start, new_length)
     120                else:
     121                    # we overlap the middle, so create a new span. No need to
     122                    # examine any other spans.
     123                    #    111111
     124                    #      rr
     125                    #    LL  RR
     126                    left_start = s_start
     127                    left_end = o_start
     128                    left_length = left_end - left_start
     129                    right_start = o_end
     130                    right_end = s_end
     131                    right_length = right_end - right_start
     132                    self._spans[i] = (left_start, left_length)
     133                    self._spans.append( (right_start, right_length) )
     134                    self._spans.sort()
     135                    break
     136        if first_complete_overlap is not None:
     137            del self._spans[first_complete_overlap:last_complete_overlap+1]
     138        #print "  REMOVE done: %s" % self.dump()
     139        self._check()
     140        return self
     141
     142    def dump(self):
     143        return "len=%d: %s" % (len(self),
     144                               ",".join(["[%d-%d]" % (start,start+l-1)
     145                                         for (start,l) in self._spans]) )
     146
     147    def each(self):
     148        for start, length in self._spans:
     149            for i in range(start, start+length):
     150                yield i
     151
     152    def __iter__(self):
     153        for s in self._spans:
     154            yield s
     155
     156    def __len__(self):
     157        # this also gets us bool(s)
     158        return sum([length for start,length in self._spans])
     159
     160    def __add__(self, other):
     161        s = self.__class__(self)
     162        for (start, length) in other:
     163            s.add(start, length)
     164        return s
     165
     166    def __sub__(self, other):
     167        s = self.__class__(self)
     168        for (start, length) in other:
     169            s.remove(start, length)
     170        return s
     171
     172    def __iadd__(self, other):
     173        for (start, length) in other:
     174            self.add(start, length)
     175        return self
     176
     177    def __isub__(self, other):
     178        for (start, length) in other:
     179            self.remove(start, length)
     180        return self
     181
     182    def __contains__(self, (start,length)):
     183        for span_start,span_length in self._spans:
     184            o = overlap(start, length, span_start, span_length)
     185            if o:
     186                o_start,o_length = o
     187                if o_start == start and o_length == length:
     188                    return True
     189        return False
     190
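For illustration only (not part of this patch): the set-arithmetic question
from the docstring above ("I want bytes X, I already requested A, I've
received R: what should I request now?") is just wanted minus requested minus
received.

    wanted    = Spans(10, 20)   # we want bytes [10,30)
    requested = Spans(10, 5)    # we already asked for [10,15)
    received  = Spans(25, 10)   # we already hold [25,35)
    ask = wanted - requested - received
    assert list(ask) == [(15, 10)]   # so request bytes [15,25)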
     191def overlap(start0, length0, start1, length1):
     192    # return start2,length2 of the overlapping region, or None
     193    #  00      00   000   0000  00  00 000  00   00  00      00
     194    #     11    11   11    11   111 11 11  1111 111 11    11
     195    left = max(start0, start1)
     196    right = min(start0+length0, start1+length1)
     197    # if there is overlap, 'left' will be its start, and right-1 will
     198    # be the end'
     199    if left < right:
     200        return (left, right-left)
     201    return None
     202
     203def adjacent(start0, length0, start1, length1):
     204    if (start0 < start1) and start0+length0 == start1:
     205        return True
     206    elif (start1 < start0) and start1+length1 == start0:
     207        return True
     208    return False
     209
     210class DataSpans:
     211    """I represent portions of a large string. Equivalently, I can be said to
     212    maintain a large array of characters (with gaps of empty elements). I can
     213    be used to manage access to a remote share, where some pieces have been
     214    retrieved, some have been requested, and others have not been read.
     215    """
     216
     217    def __init__(self, other=None):
     218        self.spans = [] # (start, data) tuples, non-overlapping, merged
     219        if other:
     220            for (start, data) in other.get_spans():
     221                self.add(start, data)
     222
     223    def __len__(self):
     224        # return number of bytes we're holding
     225        return sum([len(data) for (start,data) in self.spans])
     226
     227    def _dump(self):
     228        # return iterator of sorted list of offsets, one per byte
     229        for (start,data) in self.spans:
     230            for i in range(start, start+len(data)):
     231                yield i
     232
     233    def get_spans(self):
     234        return list(self.spans)
     235
      236    def assert_invariants(self):
      237        if not self.spans:
      238            return
      239        prev_end = self.spans[0][0] + len(self.spans[0][1])
      240        for start, data in self.spans[1:]:
      241            if not start > prev_end:
      242                # adjacent or overlapping: bad
      243                print "ASSERTION FAILED", self.spans
      244                raise AssertionError
      245            prev_end = start + len(data)
     246
     247    def get(self, start, length):
     248        # returns a string of LENGTH, or None
     249        #print "get", start, length, self.spans
     250        end = start+length
     251        for (s_start,s_data) in self.spans:
     252            s_end = s_start+len(s_data)
     253            #print " ",s_start,s_end
     254            if s_start <= start < s_end:
     255                # we want some data from this span. Because we maintain
     256                # strictly merged and non-overlapping spans, everything we
     257                # want must be in this span.
     258                offset = start - s_start
     259                if offset + length > len(s_data):
     260                    #print " None, span falls short"
     261                    return None # span falls short
     262                #print " some", s_data[offset:offset+length]
     263                return s_data[offset:offset+length]
     264            if s_start >= end:
     265                # we've gone too far: no further spans will overlap
     266                #print " None, gone too far"
     267                return None
     268        #print " None, ran out of spans"
     269        return None
     270
     271    def add(self, start, data):
     272        # first: walk through existing spans, find overlap, modify-in-place
     273        #  create list of new spans
     274        #  add new spans
     275        #  sort
     276        #  merge adjacent spans
     277        #print "add", start, data, self.spans
     278        end = start + len(data)
     279        i = 0
     280        while len(data):
     281            #print " loop", start, data, i, len(self.spans), self.spans
     282            if i >= len(self.spans):
     283                #print " append and done"
     284                # append a last span
     285                self.spans.append( (start, data) )
     286                break
     287            (s_start,s_data) = self.spans[i]
     288            # five basic cases:
     289            #  a: OLD  b:OLDD  c1:OLD  c2:OLD   d1:OLDD  d2:OLD  e: OLLDD
     290            #    NEW     NEW      NEW     NEWW      NEW      NEW     NEW
     291            #
     292            # we handle A by inserting a new segment (with "N") and looping,
     293            # turning it into B or C. We handle B by replacing a prefix and
     294            # terminating. We handle C (both c1 and c2) by replacing the
     295            # segment (and, for c2, looping, turning it into A). We handle D
     296            # by replacing a suffix (and, for d2, looping, turning it into
     297            # A). We handle E by replacing the middle and terminating.
     298            if start < s_start:
     299                # case A: insert a new span, then loop with the remainder
      300                #print " insert new span"
     301                s_len = s_start-start
     302                self.spans.insert(i, (start, data[:s_len]))
     303                i += 1
     304                start = s_start
     305                data = data[s_len:]
     306                continue
     307            s_len = len(s_data)
     308            s_end = s_start+s_len
     309            if s_start <= start < s_end:
     310                #print " modify this span", s_start, start, s_end
     311                # we want to modify some data in this span: a prefix, a
     312                # suffix, or the whole thing
     313                if s_start == start:
     314                    if s_end <= end:
     315                        #print " replace whole segment"
     316                        # case C: replace this segment
     317                        self.spans[i] = (s_start, data[:s_len])
     318                        i += 1
     319                        start += s_len
     320                        data = data[s_len:]
     321                        # C2 is where len(data)>0
     322                        continue
     323                    # case B: modify the prefix, retain the suffix
     324                    #print " modify prefix"
     325                    self.spans[i] = (s_start, data + s_data[len(data):])
     326                    break
     327                if start > s_start and end < s_end:
     328                    # case E: modify the middle
     329                    #print " modify middle"
     330                    prefix_len = start - s_start # we retain this much
     331                    suffix_len = s_end - end # and retain this much
     332                    newdata = s_data[:prefix_len] + data + s_data[-suffix_len:]
     333                    self.spans[i] = (s_start, newdata)
     334                    break
     335                # case D: retain the prefix, modify the suffix
     336                #print " modify suffix"
     337                prefix_len = start - s_start # we retain this much
     338                suffix_len = s_len - prefix_len # we replace this much
     339                #print "  ", s_data, prefix_len, suffix_len, s_len, data
     340                self.spans[i] = (s_start,
     341                                 s_data[:prefix_len] + data[:suffix_len])
     342                i += 1
     343                start += suffix_len
     344                data = data[suffix_len:]
     345                #print "  now", start, data
     346                # D2 is where len(data)>0
     347                continue
     348            # else we're not there yet
     349            #print " still looking"
     350            i += 1
     351            continue
     352        # now merge adjacent spans
     353        #print " merging", self.spans
     354        newspans = []
     355        for (s_start,s_data) in self.spans:
     356            if newspans and adjacent(newspans[-1][0], len(newspans[-1][1]),
     357                                     s_start, len(s_data)):
     358                newspans[-1] = (newspans[-1][0], newspans[-1][1] + s_data)
     359            else:
     360                newspans.append( (s_start, s_data) )
     361        self.spans = newspans
     362        self.assert_invariants()
     363        #print " done", self.spans
     364
     365    def remove(self, start, length):
     366        i = 0
     367        end = start + length
     368        #print "remove", start, length, self.spans
     369        while i < len(self.spans):
     370            (s_start,s_data) = self.spans[i]
     371            if s_start >= end:
     372                # this segment is entirely right of the removed region, and
     373                # all further segments are even further right. We're done.
     374                break
     375            s_len = len(s_data)
     376            s_end = s_start + s_len
     377            o = overlap(start, length, s_start, s_len)
     378            if not o:
     379                i += 1
     380                continue
     381            o_start, o_len = o
     382            o_end = o_start + o_len
     383            if o_len == s_len:
     384                # remove the whole segment
     385                del self.spans[i]
     386                continue
     387            if o_start == s_start:
     388                # remove a prefix, leaving the suffix from o_end to s_end
     389                prefix_len = o_end - o_start
     390                self.spans[i] = (o_end, s_data[prefix_len:])
     391                i += 1
     392                continue
     393            elif o_end == s_end:
     394                # remove a suffix, leaving the prefix from s_start to o_start
     395                prefix_len = o_start - s_start
     396                self.spans[i] = (s_start, s_data[:prefix_len])
     397                i += 1
     398                continue
     399            # remove the middle, creating a new segment
     400            # left is s_start:o_start, right is o_end:s_end
     401            left_len = o_start - s_start
     402            left = s_data[:left_len]
     403            right_len = s_end - o_end
     404            right = s_data[-right_len:]
     405            self.spans[i] = (s_start, left)
     406            self.spans.insert(i+1, (o_end, right))
     407            break
     408        #print " done", self.spans
     409
     410    def pop(self, start, length):
     411        data = self.get(start, length)
     412        if data:
     413            self.remove(start, length)
     414        return data
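For illustration only (not part of this patch): DataSpans holds sparse pieces
of a share. get() returns data only when the entire requested range is
present, and pop() additionally forgets it, which lets the downloader hand
bytes off exactly once.

    ds = DataSpans()
    ds.add(0, "abc")                 # bytes 0-2
    ds.add(10, "xyz")                # bytes 10-12
    assert ds.get(0, 3) == "abc"
    assert ds.get(0, 5) is None      # range crosses a gap: not available
    assert ds.pop(10, 3) == "xyz"    # returned once...
    assert ds.get(10, 3) is None     # ...then gone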