source file: /home/buildslave/tahoe/edgy/build/src/allmydata/storage.py
file stats: 1017 lines, 999 executed: 98.2% covered
   1. import os, re, weakref, stat, struct, time
   2. from distutils.version import LooseVersion
   3. 
   4. from foolscap import Referenceable
   5. from twisted.application import service
   6. from twisted.internet import defer
   7. 
   8. from zope.interface import implements
   9. from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
  10.      RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE, \
  11.      BadWriteEnablerError, IStatsProducer, FileTooLargeError
  12. from allmydata.util import base32, fileutil, idlib, mathutil, log
  13. from allmydata.util.assertutil import precondition, _assert
  14. import allmydata # for __version__
  15. 
  16. class DataTooLargeError(Exception):
  17.     pass
  18. 
  19. # storage/
  20. # storage/shares/incoming
  21. #   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
  22. #   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
  23. # storage/shares/$START/$STORAGEINDEX
  24. # storage/shares/$START/$STORAGEINDEX/$SHARENUM
  25. 
  26. # Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
  27. # base-32 chars).
  28. 
  29. # $SHARENUM matches this regex:
  30. NUM_RE=re.compile("^[0-9]+$")
  31. 
  32. # each share file (in storage/shares/$START/$SI/$SHNUM) contains lease information
  33. # and share data. The share data is accessed by RIBucketWriter.write and
  34. # RIBucketReader.read . The lease information is not accessible through these
  35. # interfaces.
  36. 
  37. # The share file has the following layout:
  38. #  0x00: share file version number, four bytes, current version is 1
  39. #  0x04: share data length, four bytes big-endian = A
  40. #  0x08: number of leases, four bytes big-endian
  41. #  0x0c: beginning of share data (described below, at WriteBucketProxy)
  42. #  A+0x0c = B: first lease. Lease format is:
  43. #   B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner
  44. #   B+0x04: renew secret, 32 bytes (SHA256)
  45. #   B+0x24: cancel secret, 32 bytes (SHA256)
  46. #   B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch
  47. #   B+0x48: next lease, or end of record
  48. 
  49. def si_b2a(storageindex):
  50.     return base32.b2a(storageindex)
  51. 
  52. def si_a2b(ascii_storageindex):
  53.     return base32.a2b(ascii_storageindex)
  54. 
  55. def storage_index_to_dir(storageindex):
  56.     sia = si_b2a(storageindex)
  57.     return os.path.join(sia[:2], sia)
  58. 
  59. class LeaseInfo:
  60.     def __init__(self, owner_num=None, renew_secret=None, cancel_secret=None,
  61.                  expiration_time=None, nodeid=None):
  62.         self.owner_num = owner_num
  63.         self.renew_secret = renew_secret
  64.         self.cancel_secret = cancel_secret
  65.         self.expiration_time = expiration_time
  66.         if nodeid is not None:
  67.             assert isinstance(nodeid, str)
  68.             assert len(nodeid) == 20
  69.         self.nodeid = nodeid
  70. 
  71.     def from_immutable_data(self, data):
  72.         (self.owner_num,
  73.          self.renew_secret,
  74.          self.cancel_secret,
  75.          self.expiration_time) = struct.unpack(">L32s32sL", data)
  76.         self.nodeid = None
  77.         return self
  78.     def to_immutable_data(self):
  79.         return struct.pack(">L32s32sL",
  80.                            self.owner_num,
  81.                            self.renew_secret, self.cancel_secret,
  82.                            int(self.expiration_time))
  83. 
  84.     def to_mutable_data(self):
  85.         return struct.pack(">LL32s32s20s",
  86.                            self.owner_num,
  87.                            int(self.expiration_time),
  88.                            self.renew_secret, self.cancel_secret,
  89.                            self.nodeid)
  90.     def from_mutable_data(self, data):
  91.         (self.owner_num,
  92.          self.expiration_time,
  93.          self.renew_secret, self.cancel_secret,
  94.          self.nodeid) = struct.unpack(">LL32s32s20s", data)
  95.         return self
  96. 
  97. 
  98. class ShareFile:
  99.     LEASE_SIZE = struct.calcsize(">L32s32sL")
 100. 
 101.     def __init__(self, filename):
 102.         self.home = filename
 103.         f = open(self.home, 'rb')
 104.         (version, size, num_leases) = struct.unpack(">LLL", f.read(0xc))
 105.         assert version == 1
 106.         self._size = size
 107.         self._num_leases = num_leases
 108.         self._data_offset = 0xc
 109.         self._lease_offset = 0xc + self._size
 110. 
 111.     def unlink(self):
 112.         os.unlink(self.home)
 113. 
 114.     def read_share_data(self, offset, length):
 115.         precondition(offset >= 0)
 116.         precondition(offset+length <= self._size)
 117.         f = open(self.home, 'rb')
 118.         f.seek(self._data_offset+offset)
 119.         return f.read(length)
 120. 
 121.     def write_share_data(self, offset, data):
 122.         length = len(data)
 123.         precondition(offset >= 0)
 124.         precondition(offset+length <= self._size)
 125.         f = open(self.home, 'rb+')
 126.         real_offset = self._data_offset+offset
 127.         f.seek(real_offset)
 128.         assert f.tell() == real_offset
 129.         f.write(data)
 130.         f.close()
 131. 
 132.     def _write_lease_record(self, f, lease_number, lease_info):
 133.         offset = self._lease_offset + lease_number * self.LEASE_SIZE
 134.         f.seek(offset)
 135.         assert f.tell() == offset
 136.         f.write(lease_info.to_immutable_data())
 137. 
 138.     def _read_num_leases(self, f):
 139.         f.seek(0x08)
 140.         (num_leases,) = struct.unpack(">L", f.read(4))
 141.         return num_leases
 142. 
 143.     def _write_num_leases(self, f, num_leases):
 144.         f.seek(0x08)
 145.         f.write(struct.pack(">L", num_leases))
 146. 
 147.     def _truncate_leases(self, f, num_leases):
 148.         f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE)
 149. 
 150.     def iter_leases(self):
 151.         """Yields (ownernum, renew_secret, cancel_secret, expiration_time)
 152.         for all leases."""
 153.         f = open(self.home, 'rb')
 154.         (version, size, num_leases) = struct.unpack(">LLL", f.read(0xc))
 155.         f.seek(self._lease_offset)
 156.         for i in range(num_leases):
 157.             data = f.read(self.LEASE_SIZE)
 158.             if data:
 159.                 yield LeaseInfo().from_immutable_data(data)
 160. 
 161.     def add_lease(self, lease_info):
 162.         f = open(self.home, 'rb+')
 163.         num_leases = self._read_num_leases(f)
 164.         self._write_lease_record(f, num_leases, lease_info)
 165.         self._write_num_leases(f, num_leases+1)
 166.         f.close()
 167. 
 168.     def renew_lease(self, renew_secret, new_expire_time):
 169.         for i,lease in enumerate(self.iter_leases()):
 170.             if lease.renew_secret == renew_secret:
 171.                 # yup. See if we need to update the owner time.
 172.                 if new_expire_time > lease.expiration_time:
 173.                     # yes
 174.                     lease.expiration_time = new_expire_time
 175.                     f = open(self.home, 'rb+')
 176.                     self._write_lease_record(f, i, lease)
 177.                     f.close()
 178.                 return
 179.         raise IndexError("unable to renew non-existent lease")
 180. 
 181.     def add_or_renew_lease(self, lease_info):
 182.         try:
 183.             self.renew_lease(lease_info.renew_secret,
 184.                              lease_info.expiration_time)
 185.         except IndexError:
 186.             self.add_lease(lease_info)
 187. 
 188. 
 189.     def cancel_lease(self, cancel_secret):
 190.         """Remove a lease with the given cancel_secret. If the last lease is
 191.         cancelled, the file will be removed. Return the number of bytes that
 192.         were freed (by truncating the list of leases, and possibly by
 193.         deleting the file. Raise IndexError if there was no lease with the
 194.         given cancel_secret.
 195.         """
 196. 
 197.         leases = list(self.iter_leases())
 198.         num_leases = len(leases)
 199.         num_leases_removed = 0
 200.         for i,lease in enumerate(leases[:]):
 201.             if lease.cancel_secret == cancel_secret:
 202.                 leases[i] = None
 203.                 num_leases_removed += 1
 204.         if not num_leases_removed:
 205.             raise IndexError("unable to find matching lease to cancel")
 206.         if num_leases_removed:
 207.             # pack and write out the remaining leases. We write these out in
 208.             # the same order as they were added, so that if we crash while
 209.             # doing this, we won't lose any non-cancelled leases.
 210.             leases = [l for l in leases if l] # remove the cancelled leases
 211.             f = open(self.home, 'rb+')
 212.             for i,lease in enumerate(leases):
 213.                 self._write_lease_record(f, i, lease)
 214.             self._write_num_leases(f, len(leases))
 215.             self._truncate_leases(f, len(leases))
 216.             f.close()
 217.         space_freed = self.LEASE_SIZE * num_leases_removed
 218.         if not len(leases):
 219.             space_freed += os.stat(self.home)[stat.ST_SIZE]
 220.             self.unlink()
 221.         return space_freed
 222. 
 223. 
 224. class BucketWriter(Referenceable):
 225.     implements(RIBucketWriter)
 226. 
 227.     def __init__(self, ss, incominghome, finalhome, size, lease_info, canary):
 228.         self.ss = ss
 229.         self.incominghome = incominghome
 230.         self.finalhome = finalhome
 231.         self._size = size
 232.         self._canary = canary
 233.         self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
 234.         self.closed = False
 235.         self.throw_out_all_data = False
 236.         # touch the file, so later callers will see that we're working on it.
 237.         assert not os.path.exists(incominghome)
 238.         fileutil.make_dirs(os.path.dirname(incominghome))
 239.         # Also construct the metadata.
 240.         f = open(incominghome, 'wb')
 241.         f.write(struct.pack(">LLL", 1, size, 0))
 242.         f.close()
 243.         self._sharefile = ShareFile(incominghome)
 244.         # also, add our lease to the file now, so that other ones can be
 245.         # added by simultaneous uploaders
 246.         self._sharefile.add_lease(lease_info)
 247. 
 248.     def allocated_size(self):
 249.         return self._size
 250. 
 251.     def remote_write(self, offset, data):
 252.         start = time.time()
 253.         precondition(not self.closed)
 254.         if self.throw_out_all_data:
 255.             return
 256.         self._sharefile.write_share_data(offset, data)
 257.         self.ss.add_latency("write", time.time() - start)
 258.         self.ss.count("write")
 259. 
 260.     def remote_close(self):
 261.         precondition(not self.closed)
 262.         start = time.time()
 263. 
 264.         fileutil.make_dirs(os.path.dirname(self.finalhome))
 265.         fileutil.rename(self.incominghome, self.finalhome)
 266.         try:
 267.             # self.incominghome is like storage/shares/incoming/ab/abcde/4 .
 268.             # We try to delete the parent (.../ab/abcde) to avoid leaving
 269.             # these directories lying around forever, but the delete might
 270.             # fail if we're working on another share for the same storage
 271.             # index (like ab/abcde/5). The alternative approach would be to
 272.             # use a hierarchy of objects (PrefixHolder, BucketHolder,
 273.             # ShareWriter), each of which is responsible for a single
 274.             # directory on disk, and have them use reference counting of
 275.             # their children to know when they should do the rmdir. This
 276.             # approach is simpler, but relies on os.rmdir refusing to delete
 277.             # a non-empty directory. Do *not* use fileutil.rm_dir() here!
 278.             os.rmdir(os.path.dirname(self.incominghome))
 279.             # we also delete the grandparent (prefix) directory, .../ab ,
 280.             # again to avoid leaving directories lying around. This might
 281.             # fail if there is another bucket open that shares a prefix (like
 282.             # ab/abfff).
 283.             os.rmdir(os.path.dirname(os.path.dirname(self.incominghome)))
 284.             # we leave the great-grandparent (incoming/) directory in place.
 285.         except EnvironmentError:
 286.             # ignore the "can't rmdir because the directory is not empty"
 287.             # exceptions, those are normal consequences of the
 288.             # above-mentioned conditions.
 289.             pass
 290.         self._sharefile = None
 291.         self.closed = True
 292.         self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
 293. 
 294.         filelen = os.stat(self.finalhome)[stat.ST_SIZE]
 295.         self.ss.bucket_writer_closed(self, filelen)
 296.         self.ss.add_latency("close", time.time() - start)
 297.         self.ss.count("close")
 298. 
 299.     def _disconnected(self):
 300.         if not self.closed:
 301.             self._abort()
 302. 
 303.     def remote_abort(self):
 304.         log.msg("storage: aborting sharefile %s" % self.incominghome,
 305.                 facility="tahoe.storage", level=log.UNUSUAL)
 306.         if not self.closed:
 307.             self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
 308.         self._abort()
 309.         self.ss.count("abort")
 310. 
 311.     def _abort(self):
 312.         if self.closed:
 313.             return
 314.         os.remove(self.incominghome)
 315.         # if we were the last share to be moved, remove the incoming/
 316.         # directory that was our parent
 317.         parentdir = os.path.split(self.incominghome)[0]
 318.         if not os.listdir(parentdir):
 319.             os.rmdir(parentdir)
 320. 
 321. 
 322. 
 323. class BucketReader(Referenceable):
 324.     implements(RIBucketReader)
 325. 
 326.     def __init__(self, ss, sharefname):
 327.         self.ss = ss
 328.         self._share_file = ShareFile(sharefname)
 329. 
 330.     def remote_read(self, offset, length):
 331.         start = time.time()
 332.         data = self._share_file.read_share_data(offset, length)
 333.         self.ss.add_latency("read", time.time() - start)
 334.         self.ss.count("read")
 335.         return data
 336. 
 337. 
 338. # the MutableShareFile is like the ShareFile, but used for mutable data. It
 339. # has a different layout. See docs/mutable.txt for more details.
 340. 
 341. # #   offset    size    name
 342. # 1   0         32      magic verstr "Tahoe mutable container v1\n" plus 5 binary bytes
 343. # 2   32        20      write enabler's nodeid
 344. # 3   52        32      write enabler
 345. # 4   84        8       data size (actual share data present) (a)
 346. # 5   92        8       offset of (8) count of extra leases (after data)
 347. # 6   100       368     four leases, 92 bytes each
 348. #                        0    4   ownerid (0 means "no lease here")
 349. #                        4    4   expiration timestamp
 350. #                        8   32   renewal token
 351. #                        40  32   cancel token
 352. #                        72  20   nodeid which accepted the tokens
 353. # 7   468       (a)     data
 354. # 8   ??        4       count of extra leases
 355. # 9   ??        n*92    extra leases
 356. 
 357. 
 358. assert struct.calcsize("L"), 4
 359. assert struct.calcsize("Q"), 8
 360. 
 361. class MutableShareFile:
 362. 
 363.     DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
 364.     EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
 365.     HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
 366.     LEASE_SIZE = struct.calcsize(">LL32s32s20s")
 367.     assert LEASE_SIZE == 92
 368.     DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
 369.     assert DATA_OFFSET == 468, DATA_OFFSET
 370.     # our sharefiles share with a recognizable string, plus some random
 371.     # binary data to reduce the chance that a regular text file will look
 372.     # like a sharefile.
 373.     MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
 374.     assert len(MAGIC) == 32
 375.     MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
 376.     # TODO: decide upon a policy for max share size
 377. 
 378.     def __init__(self, filename, parent=None):
 379.         self.home = filename
 380.         if os.path.exists(self.home):
 381.             # we don't cache anything, just check the magic
 382.             f = open(self.home, 'rb')
 383.             data = f.read(self.HEADER_SIZE)
 384.             (magic,
 385.              write_enabler_nodeid, write_enabler,
 386.              data_length, extra_least_offset) = \
 387.              struct.unpack(">32s20s32sQQ", data)
 388.             assert magic == self.MAGIC
 389.         self.parent = parent # for logging
 390. 
 391.     def log(self, *args, **kwargs):
 392.         return self.parent.log(*args, **kwargs)
 393. 
 394.     def create(self, my_nodeid, write_enabler):
 395.         assert not os.path.exists(self.home)
 396.         data_length = 0
 397.         extra_lease_offset = (self.HEADER_SIZE
 398.                               + 4 * self.LEASE_SIZE
 399.                               + data_length)
 400.         assert extra_lease_offset == self.DATA_OFFSET # true at creation
 401.         num_extra_leases = 0
 402.         f = open(self.home, 'wb')
 403.         header = struct.pack(">32s20s32sQQ",
 404.                              self.MAGIC, my_nodeid, write_enabler,
 405.                              data_length, extra_lease_offset,
 406.                              )
 407.         leases = ("\x00"*self.LEASE_SIZE) * 4
 408.         f.write(header + leases)
 409.         # data goes here, empty after creation
 410.         f.write(struct.pack(">L", num_extra_leases))
 411.         # extra leases go here, none at creation
 412.         f.close()
 413. 
 414.     def unlink(self):
 415.         os.unlink(self.home)
 416. 
 417.     def _read_data_length(self, f):
 418.         f.seek(self.DATA_LENGTH_OFFSET)
 419.         (data_length,) = struct.unpack(">Q", f.read(8))
 420.         return data_length
 421. 
 422.     def _write_data_length(self, f, data_length):
 423.         f.seek(self.DATA_LENGTH_OFFSET)
 424.         f.write(struct.pack(">Q", data_length))
 425. 
 426.     def _read_share_data(self, f, offset, length):
 427.         precondition(offset >= 0)
 428.         data_length = self._read_data_length(f)
 429.         if offset+length > data_length:
 430.             # reads beyond the end of the data are truncated. Reads that
 431.             # start beyond the end of the data return an empty string.
 432.             length = max(0, data_length-offset)
 433.         if length == 0:
 434.             return ""
 435.         precondition(offset+length <= data_length)
 436.         f.seek(self.DATA_OFFSET+offset)
 437.         data = f.read(length)
 438.         return data
 439. 
 440.     def _read_extra_lease_offset(self, f):
 441.         f.seek(self.EXTRA_LEASE_OFFSET)
 442.         (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
 443.         return extra_lease_offset
 444. 
 445.     def _write_extra_lease_offset(self, f, offset):
 446.         f.seek(self.EXTRA_LEASE_OFFSET)
 447.         f.write(struct.pack(">Q", offset))
 448. 
 449.     def _read_num_extra_leases(self, f):
 450.         offset = self._read_extra_lease_offset(f)
 451.         f.seek(offset)
 452.         (num_extra_leases,) = struct.unpack(">L", f.read(4))
 453.         return num_extra_leases
 454. 
 455.     def _write_num_extra_leases(self, f, num_leases):
 456.         extra_lease_offset = self._read_extra_lease_offset(f)
 457.         f.seek(extra_lease_offset)
 458.         f.write(struct.pack(">L", num_leases))
 459. 
 460.     def _change_container_size(self, f, new_container_size):
 461.         if new_container_size > self.MAX_SIZE:
 462.             raise DataTooLargeError()
 463.         old_extra_lease_offset = self._read_extra_lease_offset(f)
 464.         new_extra_lease_offset = self.DATA_OFFSET + new_container_size
 465.         if new_extra_lease_offset < old_extra_lease_offset:
 466.             # TODO: allow containers to shrink. For now they remain large.
 467.             return
 468.         num_extra_leases = self._read_num_extra_leases(f)
 469.         f.seek(old_extra_lease_offset)
 470.         extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE)
 471.         f.seek(new_extra_lease_offset)
 472.         f.write(extra_lease_data)
 473.         # an interrupt here will corrupt the leases, iff the move caused the
 474.         # extra leases to overlap.
 475.         self._write_extra_lease_offset(f, new_extra_lease_offset)
 476. 
 477.     def _write_share_data(self, f, offset, data):
 478.         length = len(data)
 479.         precondition(offset >= 0)
 480.         data_length = self._read_data_length(f)
 481.         extra_lease_offset = self._read_extra_lease_offset(f)
 482. 
 483.         if offset+length >= data_length:
 484.             # They are expanding their data size.
 485.             if self.DATA_OFFSET+offset+length > extra_lease_offset:
 486.                 # Their new data won't fit in the current container, so we
 487.                 # have to move the leases. With luck, they're expanding it
 488.                 # more than the size of the extra lease block, which will
 489.                 # minimize the corrupt-the-share window
 490.                 self._change_container_size(f, offset+length)
 491.                 extra_lease_offset = self._read_extra_lease_offset(f)
 492. 
 493.                 # an interrupt here is ok.. the container has been enlarged
 494.                 # but the data remains untouched
 495. 
 496.             assert self.DATA_OFFSET+offset+length <= extra_lease_offset
 497.             # Their data now fits in the current container. We must write
 498.             # their new data and modify the recorded data size.
 499.             new_data_length = offset+length
 500.             self._write_data_length(f, new_data_length)
 501.             # an interrupt here will result in a corrupted share
 502. 
 503.         # now all that's left to do is write out their data
 504.         f.seek(self.DATA_OFFSET+offset)
 505.         f.write(data)
 506.         return
 507. 
 508.     def _write_lease_record(self, f, lease_number, lease_info):
 509.         extra_lease_offset = self._read_extra_lease_offset(f)
 510.         num_extra_leases = self._read_num_extra_leases(f)
 511.         if lease_number < 4:
 512.             offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
 513.         elif (lease_number-4) < num_extra_leases:
 514.             offset = (extra_lease_offset
 515.                       + 4
 516.                       + (lease_number-4)*self.LEASE_SIZE)
 517.         else:
 518.             # must add an extra lease record
 519.             self._write_num_extra_leases(f, num_extra_leases+1)
 520.             offset = (extra_lease_offset
 521.                       + 4
 522.                       + (lease_number-4)*self.LEASE_SIZE)
 523.         f.seek(offset)
 524.         assert f.tell() == offset
 525.         f.write(lease_info.to_mutable_data())
 526. 
 527.     def _read_lease_record(self, f, lease_number):
 528.         # returns a LeaseInfo instance, or None
 529.         extra_lease_offset = self._read_extra_lease_offset(f)
 530.         num_extra_leases = self._read_num_extra_leases(f)
 531.         if lease_number < 4:
 532.             offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
 533.         elif (lease_number-4) < num_extra_leases:
 534.             offset = (extra_lease_offset
 535.                       + 4
 536.                       + (lease_number-4)*self.LEASE_SIZE)
 537.         else:
 538.             raise IndexError("No such lease number %d" % lease_number)
 539.         f.seek(offset)
 540.         assert f.tell() == offset
 541.         data = f.read(self.LEASE_SIZE)
 542.         lease_info = LeaseInfo().from_mutable_data(data)
 543.         if lease_info.owner_num == 0:
 544.             return None
 545.         return lease_info
 546. 
 547.     def _get_num_lease_slots(self, f):
 548.         # how many places do we have allocated for leases? Not all of them
 549.         # are filled.
 550.         num_extra_leases = self._read_num_extra_leases(f)
 551.         return 4+num_extra_leases
 552. 
 553.     def _get_first_empty_lease_slot(self, f):
 554.         # return an int with the index of an empty slot, or None if we do not
 555.         # currently have an empty slot
 556. 
 557.         for i in range(self._get_num_lease_slots(f)):
 558.             if self._read_lease_record(f, i) is None:
 559.                 return i
 560.         return None
 561. 
 562.     def _enumerate_leases(self, f):
 563.         """Yields (leasenum, (ownerid, expiration_time, renew_secret,
 564.         cancel_secret, accepting_nodeid)) for all leases."""
 565.         for i in range(self._get_num_lease_slots(f)):
 566.             try:
 567.                 data = self._read_lease_record(f, i)
 568.                 if data is not None:
 569.                     yield (i,data)
 570.             except IndexError:
 571.                 return
 572. 
 573.     def debug_get_leases(self):
 574.         f = open(self.home, 'rb')
 575.         leases = list(self._enumerate_leases(f))
 576.         f.close()
 577.         return leases
 578. 
 579.     def add_lease(self, lease_info):
 580.         f = open(self.home, 'rb+')
 581.         num_lease_slots = self._get_num_lease_slots(f)
 582.         empty_slot = self._get_first_empty_lease_slot(f)
 583.         if empty_slot is not None:
 584.             self._write_lease_record(f, empty_slot, lease_info)
 585.         else:
 586.             self._write_lease_record(f, num_lease_slots, lease_info)
 587.         f.close()
 588. 
 589.     def renew_lease(self, renew_secret, new_expire_time):
 590.         accepting_nodeids = set()
 591.         f = open(self.home, 'rb+')
 592.         for (leasenum,lease) in self._enumerate_leases(f):
 593.             if lease.renew_secret == renew_secret:
 594.                 # yup. See if we need to update the owner time.
 595.                 if new_expire_time > lease.expiration_time:
 596.                     # yes
 597.                     lease.expiration_time = new_expire_time
 598.                     self._write_lease_record(f, leasenum, lease)
 599.                 f.close()
 600.                 return
 601.             accepting_nodeids.add(lease.nodeid)
 602.         f.close()
 603.         # Return the accepting_nodeids set, to give the client a chance to
 604.         # update the leases on a share which has been migrated from its
 605.         # original server to a new one.
 606.         msg = ("Unable to renew non-existent lease. I have leases accepted by"
 607.                " nodeids: ")
 608.         msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
 609.                          for anid in accepting_nodeids])
 610.         msg += " ."
 611.         raise IndexError(msg)
 612. 
 613.     def add_or_renew_lease(self, lease_info):
 614.         try:
 615.             self.renew_lease(lease_info.renew_secret,
 616.                              lease_info.expiration_time)
 617.         except IndexError:
 618.             self.add_lease(lease_info)
 619. 
 620.     def cancel_lease(self, cancel_secret):
 621.         """Remove any leases with the given cancel_secret. If the last lease
 622.         is cancelled, the file will be removed. Return the number of bytes
 623.         that were freed (by truncating the list of leases, and possibly by
 624.         deleting the file. Raise IndexError if there was no lease with the
 625.         given cancel_secret."""
 626. 
 627.         accepting_nodeids = set()
 628.         modified = 0
 629.         remaining = 0
 630.         blank_lease = LeaseInfo(owner_num=0,
 631.                                 renew_secret="\x00"*32,
 632.                                 cancel_secret="\x00"*32,
 633.                                 expiration_time=0,
 634.                                 nodeid="\x00"*20)
 635.         f = open(self.home, 'rb+')
 636.         for (leasenum,lease) in self._enumerate_leases(f):
 637.             accepting_nodeids.add(lease.nodeid)
 638.             if lease.cancel_secret == cancel_secret:
 639.                 self._write_lease_record(f, leasenum, blank_lease)
 640.                 modified += 1
 641.             else:
 642.                 remaining += 1
 643.         if modified:
 644.             freed_space = self._pack_leases(f)
 645.             f.close()
 646.             if not remaining:
 647.                 freed_space += os.stat(self.home)[stat.ST_SIZE]
 648.                 self.unlink()
 649.             return freed_space
 650. 
 651.         msg = ("Unable to cancel non-existent lease. I have leases "
 652.                "accepted by nodeids: ")
 653.         msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
 654.                          for anid in accepting_nodeids])
 655.         msg += " ."
 656.         raise IndexError(msg)
 657. 
 658.     def _pack_leases(self, f):
 659.         # TODO: reclaim space from cancelled leases
 660.         return 0
 661. 
 662.     def _read_write_enabler_and_nodeid(self, f):
 663.         f.seek(0)
 664.         data = f.read(self.HEADER_SIZE)
 665.         (magic,
 666.          write_enabler_nodeid, write_enabler,
 667.          data_length, extra_least_offset) = \
 668.          struct.unpack(">32s20s32sQQ", data)
 669.         assert magic == self.MAGIC
 670.         return (write_enabler, write_enabler_nodeid)
 671. 
 672.     def readv(self, readv):
 673.         datav = []
 674.         f = open(self.home, 'rb')
 675.         for (offset, length) in readv:
 676.             datav.append(self._read_share_data(f, offset, length))
 677.         f.close()
 678.         return datav
 679. 
 680. #    def remote_get_length(self):
 681. #        f = open(self.home, 'rb')
 682. #        data_length = self._read_data_length(f)
 683. #        f.close()
 684. #        return data_length
 685. 
 686.     def check_write_enabler(self, write_enabler, si_s):
 687.         f = open(self.home, 'rb+')
 688.         (real_write_enabler, write_enabler_nodeid) = \
 689.                              self._read_write_enabler_and_nodeid(f)
 690.         f.close()
 691.         if write_enabler != real_write_enabler:
 692.             # accomodate share migration by reporting the nodeid used for the
 693.             # old write enabler.
 694.             self.log(format="bad write enabler on SI %(si)s,"
 695.                      " recorded by nodeid %(nodeid)s",
 696.                      facility="tahoe.storage",
 697.                      level=log.WEIRD, umid="cE1eBQ",
 698.                      si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
 699.             msg = "The write enabler was recorded by nodeid '%s'." % \
 700.                   (idlib.nodeid_b2a(write_enabler_nodeid),)
 701.             raise BadWriteEnablerError(msg)
 702. 
 703.     def check_testv(self, testv):
 704.         test_good = True
 705.         f = open(self.home, 'rb+')
 706.         for (offset, length, operator, specimen) in testv:
 707.             data = self._read_share_data(f, offset, length)
 708.             if not testv_compare(data, operator, specimen):
 709.                 test_good = False
 710.                 break
 711.         f.close()
 712.         return test_good
 713. 
 714.     def writev(self, datav, new_length):
 715.         f = open(self.home, 'rb+')
 716.         for (offset, data) in datav:
 717.             self._write_share_data(f, offset, data)
 718.         if new_length is not None:
 719.             self._change_container_size(f, new_length)
 720.             f.seek(self.DATA_LENGTH_OFFSET)
 721.             f.write(struct.pack(">Q", new_length))
 722.         f.close()
 723. 
 724. def testv_compare(a, op, b):
 725.     assert op in ("lt", "le", "eq", "ne", "ge", "gt")
 726.     if op == "lt":
 727.         return a < b
 728.     if op == "le":
 729.         return a <= b
 730.     if op == "eq":
 731.         return a == b
 732.     if op == "ne":
 733.         return a != b
 734.     if op == "ge":
 735.         return a >= b
 736.     if op == "gt":
 737.         return a > b
 738.     # never reached
 739. 
 740. class EmptyShare:
 741. 
 742.     def check_testv(self, testv):
 743.         test_good = True
 744.         for (offset, length, operator, specimen) in testv:
 745.             data = ""
 746.             if not testv_compare(data, operator, specimen):
 747.                 test_good = False
 748.                 break
 749.         return test_good
 750. 
 751. def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
 752.     ms = MutableShareFile(filename, parent)
 753.     ms.create(my_nodeid, write_enabler)
 754.     del ms
 755.     return MutableShareFile(filename, parent)
 756. 
 757. 
 758. class StorageServer(service.MultiService, Referenceable):
 759.     implements(RIStorageServer, IStatsProducer)
 760.     name = 'storage'
 761. 
 762.     # This means that if a client treats me as though I were a 1.0.0 storage server, they will
 763.     # not be disappointed.
 764.     OLDEST_SUPPORTED_VERSION = LooseVersion("1.0.0")
 765. 
 766.     def __init__(self, storedir, sizelimit=None,
 767.                  discard_storage=False, readonly_storage=False,
 768.                  stats_provider=None):
 769.         service.MultiService.__init__(self)
 770.         self.storedir = storedir
 771.         sharedir = os.path.join(storedir, "shares")
 772.         fileutil.make_dirs(sharedir)
 773.         self.sharedir = sharedir
 774.         self.sizelimit = sizelimit
 775.         self.no_storage = discard_storage
 776.         self.readonly_storage = readonly_storage
 777.         self.stats_provider = stats_provider
 778.         if self.stats_provider:
 779.             self.stats_provider.register_producer(self)
 780.         self.incomingdir = os.path.join(sharedir, 'incoming')
 781.         self._clean_incomplete()
 782.         fileutil.make_dirs(self.incomingdir)
 783.         self._active_writers = weakref.WeakKeyDictionary()
 784.         lp = log.msg("StorageServer created, now measuring space..",
 785.                      facility="tahoe.storage")
 786.         self.consumed = None
 787.         if self.sizelimit:
 788.             self.consumed = fileutil.du(self.sharedir)
 789.             log.msg(format="space measurement done, consumed=%(consumed)d bytes",
 790.                     consumed=self.consumed,
 791.                     parent=lp, facility="tahoe.storage")
 792.         self.latencies = {"allocate": [], # immutable
 793.                           "write": [],
 794.                           "close": [],
 795.                           "read": [],
 796.                           "renew": [],
 797.                           "cancel": [],
 798.                           "get": [],
 799.                           "writev": [], # mutable
 800.                           "readv": [],
 801.                           }
 802. 
 803.     def count(self, name, delta=1):
 804.         if self.stats_provider:
 805.             self.stats_provider.count("storage_server." + name, delta)
 806. 
 807.     def add_latency(self, category, latency):
 808.         a = self.latencies[category]
 809.         a.append(latency)
 810.         if len(a) > 1000:
 811.             self.latencies[category] = a[-1000:]
 812. 
 813.     def get_latencies(self):
 814.         """Return a dict, indexed by category, that contains a dict of
 815.         latency numbers for each category. Each dict will contain the
 816.         following keys: mean, 01_0_percentile, 10_0_percentile,
 817.         50_0_percentile (median), 90_0_percentile, 95_0_percentile,
 818.         99_0_percentile, 99_9_percentile. If no samples have been collected
 819.         for the given category, then that category name will not be present
 820.         in the return value."""
 821.         # note that Amazon's Dynamo paper says they use 99.9% percentile.
 822.         output = {}
 823.         for category in self.latencies:
 824.             if not self.latencies[category]:
 825.                 continue
 826.             stats = {}
 827.             samples = self.latencies[category][:]
 828.             samples.sort()
 829.             count = len(samples)
 830.             stats["mean"] = sum(samples) / count
 831.             stats["01_0_percentile"] = samples[int(0.01 * count)]
 832.             stats["10_0_percentile"] = samples[int(0.1 * count)]
 833.             stats["50_0_percentile"] = samples[int(0.5 * count)]
 834.             stats["90_0_percentile"] = samples[int(0.9 * count)]
 835.             stats["95_0_percentile"] = samples[int(0.95 * count)]
 836.             stats["99_0_percentile"] = samples[int(0.99 * count)]
 837.             stats["99_9_percentile"] = samples[int(0.999 * count)]
 838.             output[category] = stats
 839.         return output
 840. 
 841.     def log(self, *args, **kwargs):
 842.         if "facility" not in kwargs:
 843.             kwargs["facility"] = "tahoe.storage"
 844.         return log.msg(*args, **kwargs)
 845. 
 846.     def setNodeID(self, nodeid):
 847.         # somebody must set this before any slots can be created or leases
 848.         # added
 849.         self.my_nodeid = nodeid
 850. 
 851.     def startService(self):
 852.         service.MultiService.startService(self)
 853.         if self.parent:
 854.             nodeid = self.parent.nodeid # 20 bytes, binary
 855.             assert len(nodeid) == 20
 856.             self.setNodeID(nodeid)
 857. 
 858.     def _clean_incomplete(self):
 859.         fileutil.rm_dir(self.incomingdir)
 860. 
 861.     def get_stats(self):
 862.         stats = { 'storage_server.allocated': self.allocated_size(), }
 863.         if self.consumed is not None:
 864.             stats['storage_server.consumed'] = self.consumed
 865.         for category,ld in self.get_latencies().items():
 866.             for name,v in ld.items():
 867.                 stats['storage_server.latencies.%s.%s' % (category, name)] = v
 868.         try:
 869.             s = os.statvfs(self.storedir)
 870.             disk_total = s.f_bsize * s.f_blocks
 871.             disk_used = s.f_bsize * (s.f_blocks - s.f_bfree)
 872.             # spacetime predictors should look at the slope of disk_used.
 873.             disk_avail = s.f_bsize * s.f_bavail # available to non-root users
 874.             # TODO: include our local policy here: if we stop accepting
 875.             # shares when the available space drops below 1GB, then include
 876.             # that fact in disk_avail.
 877.             #
 878.             # spacetime predictors should use disk_avail / (d(disk_used)/dt)
 879.             stats["storage_server.disk_total"] = disk_total
 880.             stats["storage_server.disk_used"] = disk_used
 881.             stats["storage_server.disk_avail"] = disk_avail
 882.         except AttributeError:
 883.             # os.statvfs is only available on unix
 884.             pass
 885.         return stats
 886. 
 887.     def allocated_size(self):
 888.         space = self.consumed or 0
 889.         for bw in self._active_writers:
 890.             space += bw.allocated_size()
 891.         return space
 892. 
 893.     def remote_get_versions(self):
 894.         return (str(allmydata.__version__), str(self.OLDEST_SUPPORTED_VERSION))
 895. 
 896.     def remote_allocate_buckets(self, storage_index,
 897.                                 renew_secret, cancel_secret,
 898.                                 sharenums, allocated_size,
 899.                                 canary, owner_num=0):
 900.         # owner_num is not for clients to set, but rather it should be
 901.         # curried into the PersonalStorageServer instance that is dedicated
 902.         # to a particular owner.
 903.         start = time.time()
 904.         self.count("allocate")
 905.         alreadygot = set()
 906.         bucketwriters = {} # k: shnum, v: BucketWriter
 907.         si_dir = storage_index_to_dir(storage_index)
 908.         si_s = si_b2a(storage_index)
 909. 
 910.         log.msg("storage: allocate_buckets %s" % si_s)
 911. 
 912.         # in this implementation, the lease information (including secrets)
 913.         # goes into the share files themselves. It could also be put into a
 914.         # separate database. Note that the lease should not be added until
 915.         # the BucketWrite has been closed.
 916.         expire_time = time.time() + 31*24*60*60
 917.         lease_info = LeaseInfo(owner_num,
 918.                                renew_secret, cancel_secret,
 919.                                expire_time, self.my_nodeid)
 920. 
 921.         space_per_bucket = allocated_size
 922.         no_limits = self.sizelimit is None
 923.         yes_limits = not no_limits
 924.         if yes_limits:
 925.             remaining_space = self.sizelimit - self.allocated_size()
 926. 
 927.         # fill alreadygot with all shares that we have, not just the ones
 928.         # they asked about: this will save them a lot of work. Add or update
 929.         # leases for all of them: if they want us to hold shares for this
 930.         # file, they'll want us to hold leases for this file.
 931.         for (shnum, fn) in self._get_bucket_shares(storage_index):
 932.             alreadygot.add(shnum)
 933.             sf = ShareFile(fn)
 934.             sf.add_or_renew_lease(lease_info)
 935. 
 936.         if self.readonly_storage:
 937.             # we won't accept new shares
 938.             self.add_latency("allocate", time.time() - start)
 939.             return alreadygot, bucketwriters
 940. 
 941.         for shnum in sharenums:
 942.             incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
 943.             finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
 944.             if os.path.exists(finalhome):
 945.                 # great! we already have it. easy.
 946.                 pass
 947.             elif os.path.exists(incominghome):
 948.                 # Note that we don't create BucketWriters for shnums that
 949.                 # have a partial share (in incoming/), so if a second upload
 950.                 # occurs while the first is still in progress, the second
 951.                 # uploader will use different storage servers.
 952.                 pass
 953.             elif no_limits or remaining_space >= space_per_bucket:
 954.                 # ok! we need to create the new share file.
 955.                 bw = BucketWriter(self, incominghome, finalhome,
 956.                                   space_per_bucket, lease_info, canary)
 957.                 if self.no_storage:
 958.                     bw.throw_out_all_data = True
 959.                 bucketwriters[shnum] = bw
 960.                 self._active_writers[bw] = 1
 961.                 if yes_limits:
 962.                     remaining_space -= space_per_bucket
 963.             else:
 964.                 # bummer! not enough space to accept this bucket
 965.                 pass
 966. 
 967.         if bucketwriters:
 968.             fileutil.make_dirs(os.path.join(self.sharedir, si_dir))
 969. 
 970.         self.add_latency("allocate", time.time() - start)
 971.         return alreadygot, bucketwriters
 972. 
 973.     def _iter_share_files(self, storage_index):
 974.         for shnum, filename in self._get_bucket_shares(storage_index):
 975.             f = open(filename, 'rb')
 976.             header = f.read(32)
 977.             f.close()
 978.             if header[:32] == MutableShareFile.MAGIC:
 979.                 sf = MutableShareFile(filename, self)
 980.                 # note: if the share has been migrated, the renew_lease()
 981.                 # call will throw an exception, with information to help the
 982.                 # client update the lease.
 983.             elif header[:4] == struct.pack(">L", 1):
 984.                 sf = ShareFile(filename)
 985.             else:
 986.                 continue # non-sharefile
 987.             yield sf
 988. 
 989.     def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
 990.                          owner_num=0):
 991.         start = time.time()
 992.         self.count("add-lease")
 993.         new_expire_time = time.time() + 31*24*60*60
 994.         lease_info = LeaseInfo(owner_num,
 995.                                renew_secret, cancel_secret,
 996.                                new_expire_time, self.my_nodeid)
 997.         found_buckets = False
 998.         for sf in self._iter_share_files(storage_index):
 999.             found_buckets = True
1000.             # note: if the share has been migrated, the renew_lease()
1001.             # call will throw an exception, with information to help the
1002.             # client update the lease.
1003.             sf.add_or_renew_lease(lease_info)
1004.         self.add_latency("add-lease", time.time() - start)
1005.         if not found_buckets:
1006.             raise IndexError("no such storage index to do add-lease")
1007. 
1008. 
1009.     def remote_renew_lease(self, storage_index, renew_secret):
1010.         start = time.time()
1011.         self.count("renew")
1012.         new_expire_time = time.time() + 31*24*60*60
1013.         found_buckets = False
1014.         for sf in self._iter_share_files(storage_index):
1015.             found_buckets = True
1016.             sf.renew_lease(renew_secret, new_expire_time)
1017.         self.add_latency("renew", time.time() - start)
1018.         if not found_buckets:
1019.             raise IndexError("no such lease to renew")
1020. 
1021.     def remote_cancel_lease(self, storage_index, cancel_secret):
1022.         start = time.time()
1023.         self.count("cancel")
1024. 
1025.         total_space_freed = 0
1026.         found_buckets = False
1027.         for sf in self._iter_share_files(storage_index):
1028.             # note: if we can't find a lease on one share, we won't bother
1029.             # looking in the others. Unless something broke internally
1030.             # (perhaps we ran out of disk space while adding a lease), the
1031.             # leases on all shares will be identical.
1032.             found_buckets = True
1033.             # this raises IndexError if the lease wasn't present XXXX
1034.             total_space_freed += sf.cancel_lease(cancel_secret)
1035. 
1036.         if found_buckets:
1037.             storagedir = os.path.join(self.sharedir,
1038.                                       storage_index_to_dir(storage_index))
1039.             if not os.listdir(storagedir):
1040.                 os.rmdir(storagedir)
1041. 
1042.         if self.consumed is not None:
1043.             self.consumed -= total_space_freed
1044.         if self.stats_provider:
1045.             self.stats_provider.count('storage_server.bytes_freed',
1046.                                       total_space_freed)
1047.         self.add_latency("cancel", time.time() - start)
1048.         if not found_buckets:
1049.             raise IndexError("no such storage index")
1050. 
1051.     def bucket_writer_closed(self, bw, consumed_size):
1052.         if self.consumed is not None:
1053.             self.consumed += consumed_size
1054.         if self.stats_provider:
1055.             self.stats_provider.count('storage_server.bytes_added', consumed_size)
1056.         del self._active_writers[bw]
1057. 
1058.     def _get_bucket_shares(self, storage_index):
1059.         """Return a list of (shnum, pathname) tuples for files that hold
1060.         shares for this storage_index. In each tuple, 'shnum' will always be
1061.         the integer form of the last component of 'pathname'."""
1062.         storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
1063.         try:
1064.             for f in os.listdir(storagedir):
1065.                 if NUM_RE.match(f):
1066.                     filename = os.path.join(storagedir, f)
1067.                     yield (int(f), filename)
1068.         except OSError:
1069.             # Commonly caused by there being no buckets at all.
1070.             pass
1071. 
1072.     def remote_get_buckets(self, storage_index):
1073.         start = time.time()
1074.         self.count("get")
1075.         si_s = si_b2a(storage_index)
1076.         log.msg("storage: get_buckets %s" % si_s)
1077.         bucketreaders = {} # k: sharenum, v: BucketReader
1078.         for shnum, filename in self._get_bucket_shares(storage_index):
1079.             bucketreaders[shnum] = BucketReader(self, filename)
1080.         self.add_latency("get", time.time() - start)
1081.         return bucketreaders
1082. 
1083.     def get_leases(self, storage_index):
1084.         """Provide an iterator that yields all of the leases attached to this
1085.         bucket. Each lease is returned as a tuple of (owner_num,
1086.         renew_secret, cancel_secret, expiration_time).
1087. 
1088.         This method is not for client use.
1089.         """
1090. 
1091.         # since all shares get the same lease data, we just grab the leases
1092.         # from the first share
1093.         try:
1094.             shnum, filename = self._get_bucket_shares(storage_index).next()
1095.             sf = ShareFile(filename)
1096.             return sf.iter_leases()
1097.         except StopIteration:
1098.             return iter([])
1099. 
1100.     def remote_slot_testv_and_readv_and_writev(self, storage_index,
1101.                                                secrets,
1102.                                                test_and_write_vectors,
1103.                                                read_vector):
1104.         start = time.time()
1105.         self.count("writev")
1106.         si_s = si_b2a(storage_index)
1107.         lp = log.msg("storage: slot_writev %s" % si_s)
1108.         si_dir = storage_index_to_dir(storage_index)
1109.         (write_enabler, renew_secret, cancel_secret) = secrets
1110.         # shares exist if there is a file for them
1111.         bucketdir = os.path.join(self.sharedir, si_dir)
1112.         shares = {}
1113.         if os.path.isdir(bucketdir):
1114.             for sharenum_s in os.listdir(bucketdir):
1115.                 try:
1116.                     sharenum = int(sharenum_s)
1117.                 except ValueError:
1118.                     continue
1119.                 filename = os.path.join(bucketdir, sharenum_s)
1120.                 msf = MutableShareFile(filename, self)
1121.                 msf.check_write_enabler(write_enabler, si_s)
1122.                 shares[sharenum] = msf
1123.         # write_enabler is good for all existing shares.
1124. 
1125.         # Now evaluate test vectors.
1126.         testv_is_good = True
1127.         for sharenum in test_and_write_vectors:
1128.             (testv, datav, new_length) = test_and_write_vectors[sharenum]
1129.             if sharenum in shares:
1130.                 if not shares[sharenum].check_testv(testv):
1131.                     self.log("testv failed: [%d]: %r" % (sharenum, testv))
1132.                     testv_is_good = False
1133.                     break
1134.             else:
1135.                 # compare the vectors against an empty share, in which all
1136.                 # reads return empty strings.
1137.                 if not EmptyShare().check_testv(testv):
1138.                     self.log("testv failed (empty): [%d] %r" % (sharenum,
1139.                                                                 testv))
1140.                     testv_is_good = False
1141.                     break
1142. 
1143.         # now gather the read vectors, before we do any writes
1144.         read_data = {}
1145.         for sharenum, share in shares.items():
1146.             read_data[sharenum] = share.readv(read_vector)
1147. 
1148.         if testv_is_good:
1149.             # now apply the write vectors
1150.             for sharenum in test_and_write_vectors:
1151.                 (testv, datav, new_length) = test_and_write_vectors[sharenum]
1152.                 if sharenum not in shares:
1153.                     # allocate a new share
1154.                     allocated_size = 2000 # arbitrary, really
1155.                     share = self._allocate_slot_share(bucketdir, secrets,
1156.                                                       sharenum,
1157.                                                       allocated_size,
1158.                                                       owner_num=0)
1159.                     shares[sharenum] = share
1160.                 shares[sharenum].writev(datav, new_length)
1161.             # and update the leases on all shares
1162.             ownerid = 1 # TODO
1163.             expire_time = time.time() + 31*24*60*60   # one month
1164.             lease_info = LeaseInfo(ownerid,
1165.                                    renew_secret, cancel_secret,
1166.                                    expire_time, self.my_nodeid)
1167.             for share in shares.values():
1168.                 share.add_or_renew_lease(lease_info)
1169. 
1170.         # all done
1171.         self.add_latency("writev", time.time() - start)
1172.         return (testv_is_good, read_data)
1173. 
1174.     def _allocate_slot_share(self, bucketdir, secrets, sharenum,
1175.                              allocated_size, owner_num=0):
1176.         (write_enabler, renew_secret, cancel_secret) = secrets
1177.         my_nodeid = self.my_nodeid
1178.         fileutil.make_dirs(bucketdir)
1179.         filename = os.path.join(bucketdir, "%d" % sharenum)
1180.         share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
1181.                                          self)
1182.         return share
1183. 
1184.     def remote_slot_readv(self, storage_index, shares, readv):
1185.         start = time.time()
1186.         self.count("readv")
1187.         si_s = si_b2a(storage_index)
1188.         lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
1189.                      facility="tahoe.storage", level=log.OPERATIONAL)
1190.         si_dir = storage_index_to_dir(storage_index)
1191.         # shares exist if there is a file for them
1192.         bucketdir = os.path.join(self.sharedir, si_dir)
1193.         if not os.path.isdir(bucketdir):
1194.             self.add_latency("readv", time.time() - start)
1195.             return {}
1196.         datavs = {}
1197.         for sharenum_s in os.listdir(bucketdir):
1198.             try:
1199.                 sharenum = int(sharenum_s)
1200.             except ValueError:
1201.                 continue
1202.             if sharenum in shares or not shares:
1203.                 filename = os.path.join(bucketdir, sharenum_s)
1204.                 msf = MutableShareFile(filename, self)
1205.                 datavs[sharenum] = msf.readv(readv)
1206.         log.msg("returning shares %s" % (datavs.keys(),),
1207.                 facility="tahoe.storage", level=log.NOISY, parent=lp)
1208.         self.add_latency("readv", time.time() - start)
1209.         return datavs
1210. 
1211. 
1212. # the code before here runs on the storage server, not the client
1213. # the code beyond here runs on the client, not the storage server
1214. 
1215. """
1216. Share data is written into a single file. At the start of the file, there is
1217. a series of four-byte big-endian offset values, which indicate where each
1218. section starts. Each offset is measured from the beginning of the file.
1219. 
1220. 0x00: version number (=00 00 00 01)
1221. 0x04: segment size
1222. 0x08: data size
1223. 0x0c: offset of data (=00 00 00 24)
1224. 0x10: offset of plaintext_hash_tree
1225. 0x14: offset of crypttext_hash_tree
1226. 0x18: offset of block_hashes
1227. 0x1c: offset of share_hashes
1228. 0x20: offset of uri_extension_length + uri_extension
1229. 0x24: start of data
1230. ?   : start of plaintext_hash_tree
1231. ?   : start of crypttext_hash_tree
1232. ?   : start of block_hashes
1233. ?   : start of share_hashes
1234.        each share_hash is written as a two-byte (big-endian) hashnum
1235.        followed by the 32-byte SHA-256 hash. We only store the hashes
1236.        necessary to validate the share hash root
1237. ?   : start of uri_extension_length (four-byte big-endian value)
1238. ?   : start of uri_extension
1239. """
1240. 
def allocated_size(data_size, num_segments, num_share_hashes,
                   uri_extension_size):
    """Return the number of bytes an immutable share with these parameters
    occupies on a storage server.

    The section layout is computed by a throwaway WriteBucketProxy (no
    remote reference, segment size 0, no nodeid). The total is the offset
    of the URI extension, plus its four-byte length prefix, plus the URI
    extension itself.
    """
    layout = WriteBucketProxy(None, data_size, 0, num_segments,
                              num_share_hashes, uri_extension_size, None)
    length_prefix_size = 4 # the ">L" length written before the extension
    return (layout._offsets['uri_extension'] + length_prefix_size
            + uri_extension_size)
1247. 
class WriteBucketProxy:
    """Client-side helper that writes one immutable share to a remote
    bucket writer.

    The constructor computes where each section of the share lives (data,
    plaintext and crypttext hash trees, block hashes, share hashes, URI
    extension) and packs the 0x24-byte header that records those offsets.
    Each put_* method checks the size of its section and sends it with
    ``rref.callRemote("write", offset, data)``, returning whatever
    callRemote returns.
    """
    implements(IStorageBucketWriter)
    def __init__(self, rref, data_size, segment_size, num_segments,
                 num_share_hashes, uri_extension_size, nodeid):
        self._rref = rref
        self._data_size = data_size
        self._segment_size = segment_size
        self._num_segments = num_segments
        self._nodeid = nodeid

        # the header stores these values as four-byte (">L") fields
        if segment_size >= 2**32 or data_size >= 2**32:
            raise FileTooLargeError("This file is too large to be uploaded (data_size).")

        # a complete binary hash tree over effective_segments leaves has
        # 2*effective_segments - 1 nodes
        effective_segments = mathutil.next_power_of_k(num_segments,2)
        self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
        # how many share hashes are included in each share? This will be
        # about log2(num_shares).
        self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
        # we commit to not sending a uri extension larger than this
        self._uri_extension_size = uri_extension_size

        offsets = self._offsets = {}
        x = 0x24
        offsets['data'] = x
        x += data_size
        offsets['plaintext_hash_tree'] = x
        x += self._segment_hash_size
        offsets['crypttext_hash_tree'] = x
        x += self._segment_hash_size
        offsets['block_hashes'] = x
        x += self._segment_hash_size
        offsets['share_hashes'] = x
        x += self._share_hash_size
        offsets['uri_extension'] = x

        # offsets are also stored as four-byte fields
        if x >= 2**32:
            raise FileTooLargeError("This file is too large to be uploaded (offsets).")

        offset_data = struct.pack(">LLLLLLLLL",
                                  1, # version number
                                  segment_size,
                                  data_size,
                                  offsets['data'],
                                  offsets['plaintext_hash_tree'],
                                  offsets['crypttext_hash_tree'],
                                  offsets['block_hashes'],
                                  offsets['share_hashes'],
                                  offsets['uri_extension'],
                                  )
        assert len(offset_data) == 0x24
        self._offset_data = offset_data

    def __repr__(self):
        if self._nodeid:
            nodeid_s = idlib.nodeid_b2a(self._nodeid)
        else:
            nodeid_s = "[None]"
        return "<allmydata.storage.WriteBucketProxy for node %s>" % nodeid_s

    def start(self):
        """Write the offset header at the start of the share."""
        return self._write(0, self._offset_data)

    def put_block(self, segmentnum, data):
        """Write the block for segment `segmentnum` into the data section.

        Every block except the last must be exactly segment_size bytes. The
        last block must hold exactly the remaining
        data_size - segment_size*(num_segments-1) bytes.
        """
        offset = self._offsets['data'] + segmentnum * self._segment_size
        # A block must stay inside the data section. Checking against the
        # end of the whole share would let an out-of-range segmentnum
        # overwrite the hash trees that follow the data.
        assert offset + len(data) <= self._offsets['plaintext_hash_tree']
        assert isinstance(data, str)
        if segmentnum < self._num_segments-1:
            precondition(len(data) == self._segment_size,
                         len(data), self._segment_size)
        else:
            last_block_size = (self._data_size -
                               (self._segment_size *
                                (self._num_segments - 1)))
            precondition(len(data) == last_block_size,
                         len(data), last_block_size)
        return self._write(offset, data)

    def put_plaintext_hashes(self, hashes):
        """Write the plaintext hash tree (a list of HASH_SIZE strings)."""
        offset = self._offsets['plaintext_hash_tree']
        assert isinstance(hashes, list)
        data = "".join(hashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        precondition(offset+len(data) <= self._offsets['crypttext_hash_tree'],
                     offset, len(data), offset+len(data),
                     self._offsets['crypttext_hash_tree'])
        return self._write(offset, data)

    def put_crypttext_hashes(self, hashes):
        """Write the crypttext hash tree (a list of HASH_SIZE strings)."""
        offset = self._offsets['crypttext_hash_tree']
        assert isinstance(hashes, list)
        data = "".join(hashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        precondition(offset + len(data) <= self._offsets['block_hashes'],
                     offset, len(data), offset+len(data),
                     self._offsets['block_hashes'])
        return self._write(offset, data)

    def put_block_hashes(self, blockhashes):
        """Write the block hash tree (a list of HASH_SIZE strings)."""
        offset = self._offsets['block_hashes']
        assert isinstance(blockhashes, list)
        data = "".join(blockhashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        precondition(offset + len(data) <= self._offsets['share_hashes'],
                     offset, len(data), offset+len(data),
                     self._offsets['share_hashes'])
        return self._write(offset, data)

    def put_share_hashes(self, sharehashes):
        """Write the share hashes, a list of (hashnum, hashvalue) tuples."""
        # sharehashes is a list of (index, hash) tuples, so they get stored
        # as 2+32=34 bytes each
        offset = self._offsets['share_hashes']
        assert isinstance(sharehashes, list)
        data = "".join([struct.pack(">H", hashnum) + hashvalue
                        for hashnum,hashvalue in sharehashes])
        precondition(len(data) == self._share_hash_size,
                     len(data), self._share_hash_size)
        precondition(offset + len(data) <= self._offsets['uri_extension'],
                     offset, len(data), offset+len(data),
                     self._offsets['uri_extension'])
        return self._write(offset, data)

    def put_uri_extension(self, data):
        """Write the URI extension, prefixed by its four-byte length."""
        offset = self._offsets['uri_extension']
        assert isinstance(data, str)
        precondition(len(data) <= self._uri_extension_size,
                     len(data), self._uri_extension_size)
        length = struct.pack(">L", len(data))
        return self._write(offset, length+data)

    def _write(self, offset, data):
        # TODO: for small shares, buffer the writes and do just a single call
        return self._rref.callRemote("write", offset, data)

    def close(self):
        return self._rref.callRemote("close")

    def abort(self):
        return self._rref.callRemoteOnly("abort")
1388. 
class ReadBucketProxy:
    """Read the pieces of one share from a remote bucket.

    All data is fetched through the remote reference's "read" method with
    (offset, length) arguments.

    The share begins with a 0x24-byte header:
      - three big-endian 4-byte fields: version, segment size, data size
      - six big-endian 4-byte offsets: data, plaintext_hash_tree,
        crypttext_hash_tree, block_hashes, share_hashes, uri_extension

    start() (or startIfNecessary()) must complete before any of the get_*
    accessors are used, because they rely on the parsed offsets.
    """
    implements(IStorageBucketReader)

    def __init__(self, rref, peerid=None, storage_index_s=None):
        # rref: remote reference used by _read() for "read" calls
        # peerid: optional node id of the serving peer (may be None)
        # storage_index_s: optional printable storage index, used by __repr__
        self._rref = rref
        self._peerid = peerid
        self._si_s = storage_index_s
        # becomes True once start() has fetched and parsed the header
        self._started = False

    def get_peerid(self):
        """Return the peerid given to the constructor (possibly None)."""
        return self._peerid

    def __repr__(self):
        # peerid is optional and defaults to None. idlib.shortnodeid_b2a
        # expects a binary node id, so encoding None would make repr() itself
        # raise (e.g. while logging). Use a placeholder instead.
        if self._peerid is None:
            peerid_s = "?"
        else:
            peerid_s = idlib.shortnodeid_b2a(self._peerid)
        return "<ReadBucketProxy to peer [%s] SI %s>" % (peerid_s,
                                                         self._si_s)

    def startIfNecessary(self):
        """Return a Deferred that fires with self once the header is parsed.

        If start() has already completed, this fires immediately. Otherwise
        it calls start().
        """
        # NOTE: calls made while a previous start() is still in flight will
        # each issue their own header read.
        if self._started:
            return defer.succeed(self)
        d = self.start()
        d.addCallback(lambda res: self)
        return d

    def start(self):
        """Fetch and parse the 0x24-byte share header.

        Returns a Deferred that fires with the offsets dict produced by
        _parse_offsets(), after marking this proxy as started.
        """
        # TODO: for small shares, read the whole bucket in start()
        d = self._read(0, 0x24)
        d.addCallback(self._parse_offsets)
        def _started(res):
            self._started = True
            return res
        d.addCallback(_started)
        return d

    def _parse_offsets(self, data):
        """Decode the share header.

        Stores the segment size and data size on self, then builds and
        returns a dict mapping each section name to its byte offset. Only
        version 1 headers are accepted.
        """
        precondition(len(data) == 0x24)
        self._offsets = {}
        (version, self._segment_size, self._data_size) = \
                  struct.unpack(">LLL", data[0:0xc])
        _assert(version == 1)
        x = 0x0c
        for field in ( 'data',
                       'plaintext_hash_tree',
                       'crypttext_hash_tree',
                       'block_hashes',
                       'share_hashes',
                       'uri_extension',
                       ):
            offset = struct.unpack(">L", data[x:x+4])[0]
            x += 4
            self._offsets[field] = offset
        return self._offsets

    def get_block(self, blocknum):
        """Read block number 'blocknum' from the share data region.

        Every block except the last is self._segment_size bytes long. The
        last block holds the remainder of self._data_size, or a full segment
        when data_size is an exact multiple of the segment size.
        """
        num_segments = mathutil.div_ceil(self._data_size, self._segment_size)
        if blocknum < num_segments-1:
            size = self._segment_size
        else:
            size = self._data_size % self._segment_size
            if size == 0:
                size = self._segment_size
        offset = self._offsets['data'] + blocknum * self._segment_size
        return self._read(offset, size)

    def _str2l(self, s):
        """ split string (pulled from storage) into a list of blockids """
        return [ s[i:i+HASH_SIZE]
                 for i in range(0, len(s), HASH_SIZE) ]

    def get_plaintext_hashes(self):
        """Read the plaintext hash tree as a list of HASH_SIZE strings."""
        offset = self._offsets['plaintext_hash_tree']
        size = self._offsets['crypttext_hash_tree'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_crypttext_hashes(self):
        """Read the crypttext hash tree as a list of HASH_SIZE strings."""
        offset = self._offsets['crypttext_hash_tree']
        size = self._offsets['block_hashes'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_block_hashes(self):
        """Read the block hash tree as a list of HASH_SIZE strings."""
        offset = self._offsets['block_hashes']
        size = self._offsets['share_hashes'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_share_hashes(self):
        """Read the share hash chain as a list of (hashnum, hashvalue).

        Each entry is stored as a 2-byte big-endian index followed by a
        HASH_SIZE-byte hash value.
        """
        offset = self._offsets['share_hashes']
        size = self._offsets['uri_extension'] - offset
        assert size % (2+HASH_SIZE) == 0
        d = self._read(offset, size)
        def _unpack_share_hashes(data):
            assert len(data) == size
            hashes = []
            for i in range(0, size, 2+HASH_SIZE):
                hashnum = struct.unpack(">H", data[i:i+2])[0]
                hashvalue = data[i+2:i+2+HASH_SIZE]
                hashes.append( (hashnum, hashvalue) )
            return hashes
        d.addCallback(_unpack_share_hashes)
        return d

    def get_uri_extension(self):
        """Read the URI extension block.

        First reads the 4-byte big-endian length prefix, then reads that
        many bytes following it.
        """
        offset = self._offsets['uri_extension']
        d = self._read(offset, 4)
        def _got_length(data):
            length = struct.unpack(">L", data)[0]
            return self._read(offset+4, length)
        d.addCallback(_got_length)
        return d

    def _read(self, offset, length):
        """Issue one remote "read" of 'length' bytes at byte 'offset'."""
        return self._rref.callRemote("read", offset, length)