source file: /home/buildslave/tahoe/edgy/build/src/allmydata/immutable/layout.py
file stats: 320 lines, 294 executed: 91.9% covered
coverage versus previous test: 0 lines added, 0 lines removed
    1. import struct
    2. from zope.interface import implements
    3. from twisted.internet import defer
    4. from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
    5.      FileTooLargeError, HASH_SIZE
    6. from allmydata.util import mathutil, idlib, observer, pipeline
    7. from allmydata.util.assertutil import precondition
    8. from allmydata.storage.server import si_b2a
    9. 
   10. class LayoutInvalid(Exception):
   11.     """ There is something wrong with these bytes so they can't be
   12.     interpreted as the kind of immutable file that I know how to download."""
   13.     pass
   14. 
   15. class RidiculouslyLargeURIExtensionBlock(LayoutInvalid):
   16.     """ When downloading a file, the length of the URI Extension Block was
   17.     given as >= 2**32. This means the share data must have been corrupted, or
   18.     else the original uploader of the file wrote a ridiculous value into the
   19.     URI Extension Block length."""
   20.     pass
   21. 
   22. class ShareVersionIncompatible(LayoutInvalid):
   23.     """ When downloading a share, its format was not one of the formats we
   24.     know how to parse."""
   25.     pass
   26. 
   27. """
   28. Share data is written in a file. At the start of the file, there is a series
   29. of four-byte big-endian offset values, which indicate where each section
   30. starts. Each offset is measured from the beginning of the share data.
   31. 
   32. 0x00: version number (=00 00 00 01)
   33. 0x04: block size # See Footnote 1 below.
   34. 0x08: share data size # See Footnote 1 below.
   35. 0x0c: offset of data (=00 00 00 24)
   36. 0x10: offset of plaintext_hash_tree UNUSED
   37. 0x14: offset of crypttext_hash_tree
   38. 0x18: offset of block_hashes
   39. 0x1c: offset of share_hashes
   40. 0x20: offset of uri_extension_length + uri_extension
   41. 0x24: start of data
   42. ?   : start of plaintext_hash_tree UNUSED
   43. ?   : start of crypttext_hash_tree
   44. ?   : start of block_hashes
   45. ?   : start of share_hashes
   46.        each share_hash is written as a two-byte (big-endian) hashnum
   47.        followed by the 32-byte SHA-256 hash. We store only the hashes
   48.        necessary to validate the share hash root
   49. ?   : start of uri_extension_length (four-byte big-endian value)
   50. ?   : start of uri_extension
   51. """
   52. 
   53. """
   54. v2 shares: these use 8-byte offsets to remove two of the three ~12GiB size
   55. limitations described in #346.
   56. 
   57. 0x00: version number (=00 00 00 02)
   58. 0x04: block size # See Footnote 1 below.
   59. 0x0c: share data size # See Footnote 1 below.
   60. 0x14: offset of data (=00 00 00 00 00 00 00 44)
   61. 0x1c: offset of plaintext_hash_tree UNUSED
   62. 0x24: offset of crypttext_hash_tree
   63. 0x2c: offset of block_hashes
   64. 0x34: offset of share_hashes
   65. 0x3c: offset of uri_extension_length + uri_extension
   66. 0x44: start of data
   67.     : rest of share is the same as v1, above
   68. ...   ...
   69. ?   : start of uri_extension_length (eight-byte big-endian value)
   70. ?   : start of uri_extension
   71. """
   72. 
   73. # Footnote 1: as of Tahoe v1.3.0 these fields are not used when reading, but
   74. # they are still provided when writing so that older versions of Tahoe can
   75. # read them.
   76. 
   77. def make_write_bucket_proxy(rref, data_size, block_size, num_segments,
   78.                             num_share_hashes, uri_extension_size_max, nodeid):
   79.     # Use layout v1 for small files, so they'll be readable by older versions
   80.     # (<tahoe-1.3.0). Use layout v2 for large files; they'll only be readable
   81.     # by tahoe-1.3.0 or later.
   82.     try:
   83.         wbp = WriteBucketProxy(rref, data_size, block_size, num_segments,
   84.                                num_share_hashes, uri_extension_size_max, nodeid)
   85.     except FileTooLargeError:
   86.         wbp = WriteBucketProxy_v2(rref, data_size, block_size, num_segments,
   87.                                   num_share_hashes, uri_extension_size_max, nodeid)
   88.     return wbp
   89. 
   90. class WriteBucketProxy:
   91.     implements(IStorageBucketWriter)
   92.     fieldsize = 4
   93.     fieldstruct = ">L"
   94. 
   95.     def __init__(self, rref, data_size, block_size, num_segments,
   96.                  num_share_hashes, uri_extension_size_max, nodeid,
   97.                  pipeline_size=50000):
   98.         self._rref = rref
   99.         self._data_size = data_size
  100.         self._block_size = block_size
  101.         self._num_segments = num_segments
  102.         self._nodeid = nodeid
  103. 
  104.         effective_segments = mathutil.next_power_of_k(num_segments,2)
  105.         self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
  106.         # how many share hashes are included in each share? This will be
  107.         # about ln2(num_shares).
  108.         self._share_hashtree_size = num_share_hashes * (2+HASH_SIZE)
  109.         # we commit to not sending a uri extension larger than this
  110.         self._uri_extension_size_max = uri_extension_size_max
  111. 
  112.         self._create_offsets(block_size, data_size)
  113. 
  114.         # k=3, max_segment_size=128KiB gives us a typical segment of 43691
  115.         # bytes. Setting the default pipeline_size to 50KB lets us get two
  116.         # segments onto the wire but not a third, which would keep the pipe
  117.         # filled.
  118.         self._pipeline = pipeline.Pipeline(pipeline_size)
  119. 
  120.     def get_allocated_size(self):
  121.         return (self._offsets['uri_extension'] + self.fieldsize +
  122.                 self._uri_extension_size_max)
  123. 
  124.     def _create_offsets(self, block_size, data_size):
  125.         if block_size >= 2**32 or data_size >= 2**32:
  126.             raise FileTooLargeError("This file is too large to be uploaded (data_size).")
  127. 
  128.         offsets = self._offsets = {}
  129.         x = 0x24
  130.         offsets['data'] = x
  131.         x += data_size
  132.         offsets['plaintext_hash_tree'] = x # UNUSED
  133.         x += self._segment_hash_size
  134.         offsets['crypttext_hash_tree'] = x
  135.         x += self._segment_hash_size
  136.         offsets['block_hashes'] = x
  137.         x += self._segment_hash_size
  138.         offsets['share_hashes'] = x
  139.         x += self._share_hashtree_size
  140.         offsets['uri_extension'] = x
  141. 
  142.         if x >= 2**32:
  143.             raise FileTooLargeError("This file is too large to be uploaded (offsets).")
  144. 
  145.         offset_data = struct.pack(">LLLLLLLLL",
  146.                                   1, # version number
  147.                                   block_size,
  148.                                   data_size,
  149.                                   offsets['data'],
  150.                                   offsets['plaintext_hash_tree'], # UNUSED
  151.                                   offsets['crypttext_hash_tree'],
  152.                                   offsets['block_hashes'],
  153.                                   offsets['share_hashes'],
  154.                                   offsets['uri_extension'],
  155.                                   )
  156.         assert len(offset_data) == 0x24
  157.         self._offset_data = offset_data
  158. 
  159.     def __repr__(self):
  160.         if self._nodeid:
  161.             nodeid_s = idlib.nodeid_b2a(self._nodeid)
  162.         else:
  163.             nodeid_s = "[None]"
  164.         return "<WriteBucketProxy for node %s>" % nodeid_s
  165. 
  166.     def put_header(self):
  167.         return self._write(0, self._offset_data)
  168. 
  169.     def put_block(self, segmentnum, data):
  170.         offset = self._offsets['data'] + segmentnum * self._block_size
  171.         assert offset + len(data) <= self._offsets['uri_extension']
  172.         assert isinstance(data, str)
  173.         if segmentnum < self._num_segments-1:
  174.             precondition(len(data) == self._block_size,
  175.                          len(data), self._block_size)
  176.         else:
  177.             precondition(len(data) == (self._data_size -
  178.                                        (self._block_size *
  179.                                         (self._num_segments - 1))),
  180.                          len(data), self._block_size)
  181.         return self._write(offset, data)
  182. 
  183.     def put_crypttext_hashes(self, hashes):
  184.         offset = self._offsets['crypttext_hash_tree']
  185.         assert isinstance(hashes, list)
  186.         data = "".join(hashes)
  187.         precondition(len(data) == self._segment_hash_size,
  188.                      len(data), self._segment_hash_size)
  189.         precondition(offset + len(data) <= self._offsets['block_hashes'],
  190.                      offset, len(data), offset+len(data),
  191.                      self._offsets['block_hashes'])
  192.         return self._write(offset, data)
  193. 
  194.     def put_block_hashes(self, blockhashes):
  195.         offset = self._offsets['block_hashes']
  196.         assert isinstance(blockhashes, list)
  197.         data = "".join(blockhashes)
  198.         precondition(len(data) == self._segment_hash_size,
  199.                      len(data), self._segment_hash_size)
  200.         precondition(offset + len(data) <= self._offsets['share_hashes'],
  201.                      offset, len(data), offset+len(data),
  202.                      self._offsets['share_hashes'])
  203.         return self._write(offset, data)
  204. 
  205.     def put_share_hashes(self, sharehashes):
  206.         # sharehashes is a list of (index, hash) tuples, so they get stored
  207.         # as 2+32=34 bytes each
  208.         offset = self._offsets['share_hashes']
  209.         assert isinstance(sharehashes, list)
  210.         data = "".join([struct.pack(">H", hashnum) + hashvalue
  211.                         for hashnum,hashvalue in sharehashes])
  212.         precondition(len(data) == self._share_hashtree_size,
  213.                      len(data), self._share_hashtree_size)
  214.         precondition(offset + len(data) <= self._offsets['uri_extension'],
  215.                      offset, len(data), offset+len(data),
  216.                      self._offsets['uri_extension'])
  217.         return self._write(offset, data)
  218. 
  219.     def put_uri_extension(self, data):
  220.         offset = self._offsets['uri_extension']
  221.         assert isinstance(data, str)
  222.         precondition(len(data) <= self._uri_extension_size_max,
  223.                      len(data), self._uri_extension_size_max)
  224.         length = struct.pack(self.fieldstruct, len(data))
  225.         return self._write(offset, length+data)
  226. 
  227.     def _write(self, offset, data):
  228.         # use a Pipeline to pipeline several writes together. TODO: another
  229.         # speedup would be to coalesce small writes into a single call: this
  230.         # would reduce the foolscap CPU overhead per share, but wouldn't
  231.         # reduce the number of round trips, so it might not be worth the
  232.         # effort.
  233. 
  234.         return self._pipeline.add(len(data),
  235.                                   self._rref.callRemote, "write", offset, data)
  236. 
  237.     def close(self):
  238.         d = self._pipeline.add(0, self._rref.callRemote, "close")
  239.         d.addCallback(lambda ign: self._pipeline.flush())
  240.         return d
  241. 
  242.     def abort(self):
  243.         return self._rref.callRemoteOnly("abort")
  244. 
  245. class WriteBucketProxy_v2(WriteBucketProxy):
  246.     fieldsize = 8
  247.     fieldstruct = ">Q"
  248. 
  249.     def _create_offsets(self, block_size, data_size):
  250.         if block_size >= 2**64 or data_size >= 2**64:
  251.             raise FileTooLargeError("This file is too large to be uploaded (data_size).")
  252. 
  253.         offsets = self._offsets = {}
  254.         x = 0x44
  255.         offsets['data'] = x
  256.         x += data_size
  257.         offsets['plaintext_hash_tree'] = x # UNUSED
  258.         x += self._segment_hash_size
  259.         offsets['crypttext_hash_tree'] = x
  260.         x += self._segment_hash_size
  261.         offsets['block_hashes'] = x
  262.         x += self._segment_hash_size
  263.         offsets['share_hashes'] = x
  264.         x += self._share_hashtree_size
  265.         offsets['uri_extension'] = x
  266. 
  267.         if x >= 2**64:
  268.             raise FileTooLargeError("This file is too large to be uploaded (offsets).")
  269. 
  270.         offset_data = struct.pack(">LQQQQQQQQ",
  271.                                   2, # version number
  272.                                   block_size,
  273.                                   data_size,
  274.                                   offsets['data'],
  275.                                   offsets['plaintext_hash_tree'], # UNUSED
  276.                                   offsets['crypttext_hash_tree'],
  277.                                   offsets['block_hashes'],
  278.                                   offsets['share_hashes'],
  279.                                   offsets['uri_extension'],
  280.                                   )
  281.         assert len(offset_data) == 0x44, len(offset_data)
  282.         self._offset_data = offset_data
  283. 
  284. class ReadBucketProxy:
  285.     implements(IStorageBucketReader)
  286. 
  287.     MAX_UEB_SIZE = 2000 # actual size is closer to 419, but varies by a few bytes
  288. 
  289.     def __init__(self, rref, peerid, storage_index):
  290.         self._rref = rref
  291.         self._peerid = peerid
  292.         peer_id_s = idlib.shortnodeid_b2a(peerid)
  293.         storage_index_s = si_b2a(storage_index)
  294.         self._reprstr = "<ReadBucketProxy %s to peer [%s] SI %s>" % (id(self), peer_id_s, storage_index_s)
  295.         self._started = False # sent request to server
  296.         self._ready = observer.OneShotObserverList() # got response from server
  297. 
  298.     def get_peerid(self):
  299.         return self._peerid
  300. 
  301.     def __repr__(self):
  302.         return self._reprstr
  303. 
  304.     def _start_if_needed(self):
  305.         """ Returns a deferred that will be fired when I'm ready to return
  306.         data, or errbacks if the starting (header reading and parsing)
  307.         process fails."""
  308.         if not self._started:
  309.             self._start()
  310.         return self._ready.when_fired()
  311. 
  312.     def _start(self):
  313.         self._started = True
  314.         # TODO: for small shares, read the whole bucket in _start()
  315.         d = self._fetch_header()
  316.         d.addCallback(self._parse_offsets)
  317.         # XXX The following two callbacks implement a slightly faster/nicer
  318.         # way to get the ueb and sharehashtree, but it requires that the
  319.         # storage server be >= v1.3.0.
  320.         # d.addCallback(self._fetch_sharehashtree_and_ueb)
  321.         # d.addCallback(self._parse_sharehashtree_and_ueb)
  322.         def _fail_waiters(f):
  323.             self._ready.fire(f)
  324.         def _notify_waiters(result):
  325.             self._ready.fire(result)
  326.         d.addCallbacks(_notify_waiters, _fail_waiters)
  327.         return d
  328. 
  329.     def _fetch_header(self):
  330.         return self._read(0, 0x44)
  331. 
  332.     def _parse_offsets(self, data):
  333.         precondition(len(data) >= 0x4)
  334.         self._offsets = {}
  335.         (version,) = struct.unpack(">L", data[0:4])
  336.         if version != 1 and version != 2:
  337.             raise ShareVersionIncompatible(version)
  338. 
  339.         if version == 1:
  340.             precondition(len(data) >= 0x24)
  341.             x = 0x0c
  342.             fieldsize = 0x4
  343.             fieldstruct = ">L"
  344.         else:
  345.             precondition(len(data) >= 0x44)
  346.             x = 0x14
  347.             fieldsize = 0x8
  348.             fieldstruct = ">Q"
  349. 
  350.         self._version = version
  351.         self._fieldsize = fieldsize
  352.         self._fieldstruct = fieldstruct
  353. 
  354.         for field in ( 'data',
  355.                        'plaintext_hash_tree', # UNUSED
  356.                        'crypttext_hash_tree',
  357.                        'block_hashes',
  358.                        'share_hashes',
  359.                        'uri_extension',
  360.                        ):
  361.             offset = struct.unpack(fieldstruct, data[x:x+fieldsize])[0]
  362.             x += fieldsize
  363.             self._offsets[field] = offset
  364.         return self._offsets
  365. 
  366.     def _fetch_sharehashtree_and_ueb(self, offsets):
  367.         sharehashtree_size = offsets['uri_extension'] - offsets['share_hashes']
  368.         return self._read(offsets['share_hashes'],
  369.                           self.MAX_UEB_SIZE+sharehashtree_size)
  370. 
  371.     def _parse_sharehashtree_and_ueb(self, data):
  372.         sharehashtree_size = self._offsets['uri_extension'] - self._offsets['share_hashes']
  373.         if len(data) < sharehashtree_size:
  374.             raise LayoutInvalid("share hash tree truncated -- should have at least %d bytes -- not %d" % (sharehashtree_size, len(data)))
  375.         if sharehashtree_size % (2+HASH_SIZE) != 0:
  376.             raise LayoutInvalid("share hash tree malformed -- should have an even multiple of %d bytes -- not %d" % (2+HASH_SIZE, sharehashtree_size))
  377.         self._share_hashes = []
  378.         for i in range(0, sharehashtree_size, 2+HASH_SIZE):
  379.             hashnum = struct.unpack(">H", data[i:i+2])[0]
  380.             hashvalue = data[i+2:i+2+HASH_SIZE]
  381.             self._share_hashes.append( (hashnum, hashvalue) )
  382. 
  383.         i = self._offsets['uri_extension']-self._offsets['share_hashes']
  384.         if len(data) < i+self._fieldsize:
  385.             raise LayoutInvalid("not enough bytes to encode URI length -- should be at least %d bytes long, not %d " % (i+self._fieldsize, len(data),))
  386.         length = struct.unpack(self._fieldstruct, data[i:i+self._fieldsize])[0]
  387.         self._ueb_data = data[i+self._fieldsize:i+self._fieldsize+length]
  388. 
  389.     def _get_block_data(self, unused, blocknum, blocksize, thisblocksize):
  390.         offset = self._offsets['data'] + blocknum * blocksize
  391.         return self._read(offset, thisblocksize)
  392. 
  393.     def get_block_data(self, blocknum, blocksize, thisblocksize):
  394.         d = self._start_if_needed()
  395.         d.addCallback(self._get_block_data, blocknum, blocksize, thisblocksize)
  396.         return d
  397. 
  398.     def _str2l(self, s):
  399.         """ split string (pulled from storage) into a list of blockids """
  400.         return [ s[i:i+HASH_SIZE]
  401.                  for i in range(0, len(s), HASH_SIZE) ]
  402. 
  403.     def _get_crypttext_hashes(self, unused=None):
  404.         offset = self._offsets['crypttext_hash_tree']
  405.         size = self._offsets['block_hashes'] - offset
  406.         d = self._read(offset, size)
  407.         d.addCallback(self._str2l)
  408.         return d
  409. 
  410.     def get_crypttext_hashes(self):
  411.         d = self._start_if_needed()
  412.         d.addCallback(self._get_crypttext_hashes)
  413.         return d
  414. 
  415.     def _get_block_hashes(self, unused=None, at_least_these=()):
  416.         # TODO: fetch only at_least_these instead of all of them.
  417.         offset = self._offsets['block_hashes']
  418.         size = self._offsets['share_hashes'] - offset
  419.         d = self._read(offset, size)
  420.         d.addCallback(self._str2l)
  421.         return d
  422. 
  423.     def get_block_hashes(self, at_least_these=()):
  424.         if at_least_these:
  425.             d = self._start_if_needed()
  426.             d.addCallback(self._get_block_hashes, at_least_these)
  427.             return d
  428.         else:
  429.             return defer.succeed([])
  430. 
  431.     def _get_share_hashes(self, unused=None):
  432.         if hasattr(self, '_share_hashes'):
  433.             return self._share_hashes
  434.         else:
  435.             return self._get_share_hashes_the_old_way()
  436.         return self._share_hashes
  437. 
  438.     def get_share_hashes(self):
  439.         d = self._start_if_needed()
  440.         d.addCallback(self._get_share_hashes)
  441.         return d
  442. 
  443.     def _get_share_hashes_the_old_way(self):
  444.         """ Tahoe storage servers < v1.3.0 would return an error if you tried
  445.         to read past the end of the share, so we need to use the offset and
  446.         read just that much."""
  447.         offset = self._offsets['share_hashes']
  448.         size = self._offsets['uri_extension'] - offset
  449.         if size % (2+HASH_SIZE) != 0:
  450.             raise LayoutInvalid("share hash tree corrupted -- should occupy a multiple of %d bytes, not %d bytes" % ((2+HASH_SIZE), size))
  451.         d = self._read(offset, size)
  452.         def _unpack_share_hashes(data):
  453.             if len(data) != size:
  454.                 raise LayoutInvalid("share hash tree corrupted -- got a short read of the share data -- should have gotten %d, not %d bytes" % (size, len(data)))
  455.             hashes = []
  456.             for i in range(0, size, 2+HASH_SIZE):
  457.                 hashnum = struct.unpack(">H", data[i:i+2])[0]
  458.                 hashvalue = data[i+2:i+2+HASH_SIZE]
  459.                 hashes.append( (hashnum, hashvalue) )
  460.             return hashes
  461.         d.addCallback(_unpack_share_hashes)
  462.         return d
  463. 
  464.     def _get_uri_extension_the_old_way(self, unused=None):
  465.         """ Tahoe storage servers < v1.3.0 would return an error if you tried
  466.         to read past the end of the share, so we need to fetch the UEB size
  467.         and then read just that much."""
  468.         offset = self._offsets['uri_extension']
  469.         d = self._read(offset, self._fieldsize)
  470.         def _got_length(data):
  471.             if len(data) != self._fieldsize:
  472.                 raise LayoutInvalid("not enough bytes to encode URI length -- should be %d bytes long, not %d " % (self._fieldsize, len(data),))
  473.             length = struct.unpack(self._fieldstruct, data)[0]
  474.             if length >= 2**31:
  475.                 # URI extension blocks are around 419 bytes long, so this
  476.                 # must be corrupted. Anyway, the foolscap interface schema
  477.                 # for "read" will not allow >= 2**31 bytes length.
  478.                 raise RidiculouslyLargeURIExtensionBlock(length)
  479. 
  480.             return self._read(offset+self._fieldsize, length)
  481.         d.addCallback(_got_length)
  482.         return d
  483. 
  484.     def _get_uri_extension(self, unused=None):
  485.         if hasattr(self, '_ueb_data'):
  486.             return self._ueb_data
  487.         else:
  488.             return self._get_uri_extension_the_old_way()
  489. 
  490.     def get_uri_extension(self):
  491.         d = self._start_if_needed()
  492.         d.addCallback(self._get_uri_extension)
  493.         return d
  494. 
  495.     def _read(self, offset, length):
  496.         return self._rref.callRemote("read", offset, length)