source file: /home/buildslave/tahoe/edgy/build/src/allmydata/storage/mutable.py
file stats: 291 lines, 288 executed: 99.0% covered
coverage versus previous test: 0 lines added, 0 lines removed
    1. import os, stat, struct
    2. 
    3. from allmydata.interfaces import BadWriteEnablerError
    4. from allmydata.util import idlib, log
    5. from allmydata.util.assertutil import precondition
    6. from allmydata.util.hashutil import constant_time_compare
    7. from allmydata.storage.lease import LeaseInfo
    8. from allmydata.storage.common import UnknownMutableContainerVersionError, \
    9.      DataTooLargeError
   10. 
   11. # the MutableShareFile is like the ShareFile, but used for mutable data. It
   12. # has a different layout. See docs/mutable.txt for more details.
   13. 
   14. # #   offset    size    name
# 1   0         32      magic verstr "Tahoe mutable container v1\n" plus 5 binary bytes
   16. # 2   32        20      write enabler's nodeid
   17. # 3   52        32      write enabler
   18. # 4   84        8       data size (actual share data present) (a)
   19. # 5   92        8       offset of (8) count of extra leases (after data)
   20. # 6   100       368     four leases, 92 bytes each
   21. #                        0    4   ownerid (0 means "no lease here")
   22. #                        4    4   expiration timestamp
   23. #                        8   32   renewal token
   24. #                        40  32   cancel token
   25. #                        72  20   nodeid which accepted the tokens
   26. # 7   468       (a)     data
   27. # 8   ??        4       count of extra leases
   28. # 9   ??        n*92    extra leases
   29. 
   30. 
   31. assert struct.calcsize("L"), 4 # The struct module doc says that L's are 4 bytes in size.
   32. assert struct.calcsize("Q"), 8 # The struct module doc says that Q's are 8 bytes in size (at least with big-endian ordering).
   33. 
   34. class MutableShareFile:
   35. 
   36.     sharetype = "mutable"
   37.     DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
   38.     EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
   39.     HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
   40.     LEASE_SIZE = struct.calcsize(">LL32s32s20s")
   41.     assert LEASE_SIZE == 92
   42.     DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
   43.     assert DATA_OFFSET == 468, DATA_OFFSET
   44.     # our sharefiles share with a recognizable string, plus some random
   45.     # binary data to reduce the chance that a regular text file will look
   46.     # like a sharefile.
   47.     MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
   48.     assert len(MAGIC) == 32
   49.     MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
   50.     # TODO: decide upon a policy for max share size
   51. 
   52.     def __init__(self, filename, parent=None):
   53.         self.home = filename
   54.         if os.path.exists(self.home):
   55.             # we don't cache anything, just check the magic
   56.             f = open(self.home, 'rb')
   57.             data = f.read(self.HEADER_SIZE)
   58.             (magic,
   59.              write_enabler_nodeid, write_enabler,
   60.              data_length, extra_least_offset) = \
   61.              struct.unpack(">32s20s32sQQ", data)
   62.             if magic != self.MAGIC:
   63.                 msg = "sharefile %s had magic '%r' but we wanted '%r'" % \
   64.                       (filename, magic, self.MAGIC)
   65.                 raise UnknownMutableContainerVersionError(msg)
   66.         self.parent = parent # for logging
   67. 
   68.     def log(self, *args, **kwargs):
   69.         return self.parent.log(*args, **kwargs)
   70. 
   71.     def create(self, my_nodeid, write_enabler):
   72.         assert not os.path.exists(self.home)
   73.         data_length = 0
   74.         extra_lease_offset = (self.HEADER_SIZE
   75.                               + 4 * self.LEASE_SIZE
   76.                               + data_length)
   77.         assert extra_lease_offset == self.DATA_OFFSET # true at creation
   78.         num_extra_leases = 0
   79.         f = open(self.home, 'wb')
   80.         header = struct.pack(">32s20s32sQQ",
   81.                              self.MAGIC, my_nodeid, write_enabler,
   82.                              data_length, extra_lease_offset,
   83.                              )
   84.         leases = ("\x00"*self.LEASE_SIZE) * 4
   85.         f.write(header + leases)
   86.         # data goes here, empty after creation
   87.         f.write(struct.pack(">L", num_extra_leases))
   88.         # extra leases go here, none at creation
   89.         f.close()
   90. 
   91.     def unlink(self):
   92.         os.unlink(self.home)
   93. 
   94.     def _read_data_length(self, f):
   95.         f.seek(self.DATA_LENGTH_OFFSET)
   96.         (data_length,) = struct.unpack(">Q", f.read(8))
   97.         return data_length
   98. 
   99.     def _write_data_length(self, f, data_length):
  100.         f.seek(self.DATA_LENGTH_OFFSET)
  101.         f.write(struct.pack(">Q", data_length))
  102. 
  103.     def _read_share_data(self, f, offset, length):
  104.         precondition(offset >= 0)
  105.         data_length = self._read_data_length(f)
  106.         if offset+length > data_length:
  107.             # reads beyond the end of the data are truncated. Reads that
  108.             # start beyond the end of the data return an empty string.
  109.             length = max(0, data_length-offset)
  110.         if length == 0:
  111.             return ""
  112.         precondition(offset+length <= data_length)
  113.         f.seek(self.DATA_OFFSET+offset)
  114.         data = f.read(length)
  115.         return data
  116. 
  117.     def _read_extra_lease_offset(self, f):
  118.         f.seek(self.EXTRA_LEASE_OFFSET)
  119.         (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
  120.         return extra_lease_offset
  121. 
  122.     def _write_extra_lease_offset(self, f, offset):
  123.         f.seek(self.EXTRA_LEASE_OFFSET)
  124.         f.write(struct.pack(">Q", offset))
  125. 
  126.     def _read_num_extra_leases(self, f):
  127.         offset = self._read_extra_lease_offset(f)
  128.         f.seek(offset)
  129.         (num_extra_leases,) = struct.unpack(">L", f.read(4))
  130.         return num_extra_leases
  131. 
  132.     def _write_num_extra_leases(self, f, num_leases):
  133.         extra_lease_offset = self._read_extra_lease_offset(f)
  134.         f.seek(extra_lease_offset)
  135.         f.write(struct.pack(">L", num_leases))
  136. 
  137.     def _change_container_size(self, f, new_container_size):
  138.         if new_container_size > self.MAX_SIZE:
  139.             raise DataTooLargeError()
  140.         old_extra_lease_offset = self._read_extra_lease_offset(f)
  141.         new_extra_lease_offset = self.DATA_OFFSET + new_container_size
  142.         if new_extra_lease_offset < old_extra_lease_offset:
  143.             # TODO: allow containers to shrink. For now they remain large.
  144.             return
  145.         num_extra_leases = self._read_num_extra_leases(f)
  146.         f.seek(old_extra_lease_offset)
  147.         extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE)
  148.         f.seek(new_extra_lease_offset)
  149.         f.write(extra_lease_data)
  150.         # an interrupt here will corrupt the leases, iff the move caused the
  151.         # extra leases to overlap.
  152.         self._write_extra_lease_offset(f, new_extra_lease_offset)
  153. 
  154.     def _write_share_data(self, f, offset, data):
  155.         length = len(data)
  156.         precondition(offset >= 0)
  157.         data_length = self._read_data_length(f)
  158.         extra_lease_offset = self._read_extra_lease_offset(f)
  159. 
  160.         if offset+length >= data_length:
  161.             # They are expanding their data size.
  162.             if self.DATA_OFFSET+offset+length > extra_lease_offset:
  163.                 # Their new data won't fit in the current container, so we
  164.                 # have to move the leases. With luck, they're expanding it
  165.                 # more than the size of the extra lease block, which will
  166.                 # minimize the corrupt-the-share window
  167.                 self._change_container_size(f, offset+length)
  168.                 extra_lease_offset = self._read_extra_lease_offset(f)
  169. 
  170.                 # an interrupt here is ok.. the container has been enlarged
  171.                 # but the data remains untouched
  172. 
  173.             assert self.DATA_OFFSET+offset+length <= extra_lease_offset
  174.             # Their data now fits in the current container. We must write
  175.             # their new data and modify the recorded data size.
  176.             new_data_length = offset+length
  177.             self._write_data_length(f, new_data_length)
  178.             # an interrupt here will result in a corrupted share
  179. 
  180.         # now all that's left to do is write out their data
  181.         f.seek(self.DATA_OFFSET+offset)
  182.         f.write(data)
  183.         return
  184. 
  185.     def _write_lease_record(self, f, lease_number, lease_info):
  186.         extra_lease_offset = self._read_extra_lease_offset(f)
  187.         num_extra_leases = self._read_num_extra_leases(f)
  188.         if lease_number < 4:
  189.             offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
  190.         elif (lease_number-4) < num_extra_leases:
  191.             offset = (extra_lease_offset
  192.                       + 4
  193.                       + (lease_number-4)*self.LEASE_SIZE)
  194.         else:
  195.             # must add an extra lease record
  196.             self._write_num_extra_leases(f, num_extra_leases+1)
  197.             offset = (extra_lease_offset
  198.                       + 4
  199.                       + (lease_number-4)*self.LEASE_SIZE)
  200.         f.seek(offset)
  201.         assert f.tell() == offset
  202.         f.write(lease_info.to_mutable_data())
  203. 
  204.     def _read_lease_record(self, f, lease_number):
  205.         # returns a LeaseInfo instance, or None
  206.         extra_lease_offset = self._read_extra_lease_offset(f)
  207.         num_extra_leases = self._read_num_extra_leases(f)
  208.         if lease_number < 4:
  209.             offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
  210.         elif (lease_number-4) < num_extra_leases:
  211.             offset = (extra_lease_offset
  212.                       + 4
  213.                       + (lease_number-4)*self.LEASE_SIZE)
  214.         else:
  215.             raise IndexError("No such lease number %d" % lease_number)
  216.         f.seek(offset)
  217.         assert f.tell() == offset
  218.         data = f.read(self.LEASE_SIZE)
  219.         lease_info = LeaseInfo().from_mutable_data(data)
  220.         if lease_info.owner_num == 0:
  221.             return None
  222.         return lease_info
  223. 
  224.     def _get_num_lease_slots(self, f):
  225.         # how many places do we have allocated for leases? Not all of them
  226.         # are filled.
  227.         num_extra_leases = self._read_num_extra_leases(f)
  228.         return 4+num_extra_leases
  229. 
  230.     def _get_first_empty_lease_slot(self, f):
  231.         # return an int with the index of an empty slot, or None if we do not
  232.         # currently have an empty slot
  233. 
  234.         for i in range(self._get_num_lease_slots(f)):
  235.             if self._read_lease_record(f, i) is None:
  236.                 return i
  237.         return None
  238. 
  239.     def get_leases(self):
  240.         """Yields a LeaseInfo instance for all leases."""
  241.         f = open(self.home, 'rb')
  242.         for i, lease in self._enumerate_leases(f):
  243.             yield lease
  244.         f.close()
  245. 
  246.     def _enumerate_leases(self, f):
  247.         for i in range(self._get_num_lease_slots(f)):
  248.             try:
  249.                 data = self._read_lease_record(f, i)
  250.                 if data is not None:
  251.                     yield i,data
  252.             except IndexError:
  253.                 return
  254. 
  255.     def add_lease(self, lease_info):
  256.         precondition(lease_info.owner_num != 0) # 0 means "no lease here"
  257.         f = open(self.home, 'rb+')
  258.         num_lease_slots = self._get_num_lease_slots(f)
  259.         empty_slot = self._get_first_empty_lease_slot(f)
  260.         if empty_slot is not None:
  261.             self._write_lease_record(f, empty_slot, lease_info)
  262.         else:
  263.             self._write_lease_record(f, num_lease_slots, lease_info)
  264.         f.close()
  265. 
  266.     def renew_lease(self, renew_secret, new_expire_time):
  267.         accepting_nodeids = set()
  268.         f = open(self.home, 'rb+')
  269.         for (leasenum,lease) in self._enumerate_leases(f):
  270.             if constant_time_compare(lease.renew_secret, renew_secret):
  271.                 # yup. See if we need to update the owner time.
  272.                 if new_expire_time > lease.expiration_time:
  273.                     # yes
  274.                     lease.expiration_time = new_expire_time
  275.                     self._write_lease_record(f, leasenum, lease)
  276.                 f.close()
  277.                 return
  278.             accepting_nodeids.add(lease.nodeid)
  279.         f.close()
  280.         # Return the accepting_nodeids set, to give the client a chance to
  281.         # update the leases on a share which has been migrated from its
  282.         # original server to a new one.
  283.         msg = ("Unable to renew non-existent lease. I have leases accepted by"
  284.                " nodeids: ")
  285.         msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
  286.                          for anid in accepting_nodeids])
  287.         msg += " ."
  288.         raise IndexError(msg)
  289. 
  290.     def add_or_renew_lease(self, lease_info):
  291.         precondition(lease_info.owner_num != 0) # 0 means "no lease here"
  292.         try:
  293.             self.renew_lease(lease_info.renew_secret,
  294.                              lease_info.expiration_time)
  295.         except IndexError:
  296.             self.add_lease(lease_info)
  297. 
  298.     def cancel_lease(self, cancel_secret):
  299.         """Remove any leases with the given cancel_secret. If the last lease
  300.         is cancelled, the file will be removed. Return the number of bytes
  301.         that were freed (by truncating the list of leases, and possibly by
  302.         deleting the file. Raise IndexError if there was no lease with the
  303.         given cancel_secret."""
  304. 
  305.         accepting_nodeids = set()
  306.         modified = 0
  307.         remaining = 0
  308.         blank_lease = LeaseInfo(owner_num=0,
  309.                                 renew_secret="\x00"*32,
  310.                                 cancel_secret="\x00"*32,
  311.                                 expiration_time=0,
  312.                                 nodeid="\x00"*20)
  313.         f = open(self.home, 'rb+')
  314.         for (leasenum,lease) in self._enumerate_leases(f):
  315.             accepting_nodeids.add(lease.nodeid)
  316.             if constant_time_compare(lease.cancel_secret, cancel_secret):
  317.                 self._write_lease_record(f, leasenum, blank_lease)
  318.                 modified += 1
  319.             else:
  320.                 remaining += 1
  321.         if modified:
  322.             freed_space = self._pack_leases(f)
  323.             f.close()
  324.             if not remaining:
  325.                 freed_space += os.stat(self.home)[stat.ST_SIZE]
  326.                 self.unlink()
  327.             return freed_space
  328. 
  329.         msg = ("Unable to cancel non-existent lease. I have leases "
  330.                "accepted by nodeids: ")
  331.         msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
  332.                          for anid in accepting_nodeids])
  333.         msg += " ."
  334.         raise IndexError(msg)
  335. 
  336.     def _pack_leases(self, f):
  337.         # TODO: reclaim space from cancelled leases
  338.         return 0
  339. 
  340.     def _read_write_enabler_and_nodeid(self, f):
  341.         f.seek(0)
  342.         data = f.read(self.HEADER_SIZE)
  343.         (magic,
  344.          write_enabler_nodeid, write_enabler,
  345.          data_length, extra_least_offset) = \
  346.          struct.unpack(">32s20s32sQQ", data)
  347.         assert magic == self.MAGIC
  348.         return (write_enabler, write_enabler_nodeid)
  349. 
  350.     def readv(self, readv):
  351.         datav = []
  352.         f = open(self.home, 'rb')
  353.         for (offset, length) in readv:
  354.             datav.append(self._read_share_data(f, offset, length))
  355.         f.close()
  356.         return datav
  357. 
  358. #    def remote_get_length(self):
  359. #        f = open(self.home, 'rb')
  360. #        data_length = self._read_data_length(f)
  361. #        f.close()
  362. #        return data_length
  363. 
  364.     def check_write_enabler(self, write_enabler, si_s):
  365.         f = open(self.home, 'rb+')
  366.         (real_write_enabler, write_enabler_nodeid) = \
  367.                              self._read_write_enabler_and_nodeid(f)
  368.         f.close()
  369.         # avoid a timing attack
  370.         #if write_enabler != real_write_enabler:
  371.         if not constant_time_compare(write_enabler, real_write_enabler):
  372.             # accomodate share migration by reporting the nodeid used for the
  373.             # old write enabler.
  374.             self.log(format="bad write enabler on SI %(si)s,"
  375.                      " recorded by nodeid %(nodeid)s",
  376.                      facility="tahoe.storage",
  377.                      level=log.WEIRD, umid="cE1eBQ",
  378.                      si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
  379.             msg = "The write enabler was recorded by nodeid '%s'." % \
  380.                   (idlib.nodeid_b2a(write_enabler_nodeid),)
  381.             raise BadWriteEnablerError(msg)
  382. 
  383.     def check_testv(self, testv):
  384.         test_good = True
  385.         f = open(self.home, 'rb+')
  386.         for (offset, length, operator, specimen) in testv:
  387.             data = self._read_share_data(f, offset, length)
  388.             if not testv_compare(data, operator, specimen):
  389.                 test_good = False
  390.                 break
  391.         f.close()
  392.         return test_good
  393. 
  394.     def writev(self, datav, new_length):
  395.         f = open(self.home, 'rb+')
  396.         for (offset, data) in datav:
  397.             self._write_share_data(f, offset, data)
  398.         if new_length is not None:
  399.             self._change_container_size(f, new_length)
  400.             f.seek(self.DATA_LENGTH_OFFSET)
  401.             f.write(struct.pack(">Q", new_length))
  402.         f.close()
  403. 
  404. def testv_compare(a, op, b):
  405.     assert op in ("lt", "le", "eq", "ne", "ge", "gt")
  406.     if op == "lt":
  407.         return a < b
  408.     if op == "le":
  409.         return a <= b
  410.     if op == "eq":
  411.         return a == b
  412.     if op == "ne":
  413.         return a != b
  414.     if op == "ge":
  415.         return a >= b
  416.     if op == "gt":
  417.         return a > b
  418.     # never reached
  419. 
  420. class EmptyShare:
  421. 
  422.     def check_testv(self, testv):
  423.         test_good = True
  424.         for (offset, length, operator, specimen) in testv:
  425.             data = ""
  426.             if not testv_compare(data, operator, specimen):
  427.                 test_good = False
  428.                 break
  429.         return test_good
  430. 
  431. def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
  432.     ms = MutableShareFile(filename, parent)
  433.     ms.create(my_nodeid, write_enabler)
  434.     del ms
  435.     return MutableShareFile(filename, parent)
  436.