source file: /home/buildslave/tahoe/edgy/build/src/allmydata/mutable/publish.py
file stats: 442 lines, 440 executed: 99.5% covered
   1. 
   2. 
   3. import os, struct, time
   4. from itertools import count
   5. from zope.interface import implements
   6. from twisted.internet import defer
   7. from twisted.python import failure
   8. from allmydata.interfaces import IPublishStatus, FileTooLargeError
   9. from allmydata.util import base32, hashutil, mathutil, idlib, log
  10. from allmydata import hashtree, codec, storage
  11. from pycryptopp.cipher.aes import AES
  12. from foolscap.eventual import eventually
  13. 
  14. from common import MODE_WRITE, MODE_CHECK, DictOfSets, \
  15.      UncoordinatedWriteError, NotEnoughServersError
  16. from servermap import ServerMap
  17. from layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \
  18.      unpack_checkstring, SIGNED_PREFIX
  19. 
  20. class PublishStatus:
  21.     implements(IPublishStatus)
  22.     statusid_counter = count(0)
  23.     def __init__(self):
  24.         self.timings = {}
  25.         self.timings["send_per_server"] = {}
  26.         self.servermap = None
  27.         self.problems = {}
  28.         self.active = True
  29.         self.storage_index = None
  30.         self.helper = False
  31.         self.encoding = ("?", "?")
  32.         self.size = None
  33.         self.status = "Not started"
  34.         self.progress = 0.0
  35.         self.counter = self.statusid_counter.next()
  36.         self.started = time.time()
  37. 
  38.     def add_per_server_time(self, peerid, elapsed):
  39.         if peerid not in self.timings["send_per_server"]:
  40.             self.timings["send_per_server"][peerid] = []
  41.         self.timings["send_per_server"][peerid].append(elapsed)
  42. 
  43.     def get_started(self):
  44.         return self.started
  45.     def get_storage_index(self):
  46.         return self.storage_index
  47.     def get_encoding(self):
  48.         return self.encoding
  49.     def using_helper(self):
  50.         return self.helper
  51.     def get_servermap(self):
  52.         return self.servermap
  53.     def get_size(self):
  54.         return self.size
  55.     def get_status(self):
  56.         return self.status
  57.     def get_progress(self):
  58.         return self.progress
  59.     def get_active(self):
  60.         return self.active
  61.     def get_counter(self):
  62.         return self.counter
  63. 
  64.     def set_storage_index(self, si):
  65.         self.storage_index = si
  66.     def set_helper(self, helper):
  67.         self.helper = helper
  68.     def set_servermap(self, servermap):
  69.         self.servermap = servermap
  70.     def set_encoding(self, k, n):
  71.         self.encoding = (k, n)
  72.     def set_size(self, size):
  73.         self.size = size
  74.     def set_status(self, status):
  75.         self.status = status
  76.     def set_progress(self, value):
  77.         self.progress = value
  78.     def set_active(self, value):
  79.         self.active = value
  80. 
  81. class Publish:
  82.     """I represent a single act of publishing the mutable file to the grid. I
  83.     will only publish my data if the servermap I am using still represents
  84.     the current state of the world.
  85. 
  86.     To make the initial publish, set servermap to None.
  87.     """
  88. 
  89.     # we limit the segment size as usual to constrain our memory footprint.
  90.     # The max segsize is higher for mutable files, because we want to support
  91.     # dirnodes with up to 10k children, and each child uses about 330 bytes.
  92.     # If you actually put that much into a directory you'll be using a
  93.     # footprint of around 14MB, which is higher than we'd like, but it is
  94.     # more important right now to support large directories than to make
  95.     # memory usage small when you use them. Once we implement MDMF (with
  96.     # multiple segments), we will drop this back down, probably to 128KiB.
  97.     MAX_SEGMENT_SIZE = 3500000
  98. 
  99.     def __init__(self, filenode, servermap):
 100.         self._node = filenode
 101.         self._servermap = servermap
 102.         self._storage_index = self._node.get_storage_index()
 103.         self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
 104.         num = self._node._client.log("Publish(%s): starting" % prefix)
 105.         self._log_number = num
 106.         self._running = True
 107. 
 108.         self._status = PublishStatus()
 109.         self._status.set_storage_index(self._storage_index)
 110.         self._status.set_helper(False)
 111.         self._status.set_progress(0.0)
 112.         self._status.set_active(True)
 113. 
 114.     def get_status(self):
 115.         return self._status
 116. 
 117.     def log(self, *args, **kwargs):
 118.         if 'parent' not in kwargs:
 119.             kwargs['parent'] = self._log_number
 120.         if "facility" not in kwargs:
 121.             kwargs["facility"] = "tahoe.mutable.publish"
 122.         return log.msg(*args, **kwargs)
 123. 
    def publish(self, newdata):
        """Publish `newdata` as the new contents of the filenode.

        newdata: the complete new contents. If its length exceeds
                 MAX_SEGMENT_SIZE, FileTooLargeError is raised
                 synchronously, before any Deferred has been created.

        Returns self.done_deferred, a Deferred created in this method. As
        originally documented, it fires (with None) when the publish has
        done as much work as it's ever going to do.

        NOTE(review): the original text said the Deferred errbacks with
        ConsistencyError on a simultaneous write, but the comment on
        self.surprised further down names UncoordinatedWriteError. The code
        that actually fires the Deferred is outside this method -- confirm
        which exception is delivered.

        Any failure in the encrypt/encode/generate/loop chain set up at the
        end of this method is routed to self._fatal_error.
        """

        # 1: generate shares (SDMF: files are small, so we can do it in RAM)
        # 2: perform peer selection, get candidate servers
        #  2a: send queries to n+epsilon servers, to determine current shares
        #  2b: based upon responses, create target map
        # 3: send slot_testv_and_readv_and_writev messages
        # 4: as responses return, update share-dispatch table
        # 4a: may need to run recovery algorithm
        # 5: when enough responses are back, we're done

        self.log("starting publish, datalen is %s" % len(newdata))
        if len(newdata) > self.MAX_SEGMENT_SIZE:
            raise FileTooLargeError("SDMF is limited to one segment, and "
                                    "%d > %d" % (len(newdata),
                                                 self.MAX_SEGMENT_SIZE))
        self._status.set_size(len(newdata))
        self._status.set_status("Started")
        # reference point for the "setup" timing recorded below
        self._started = time.time()

        self.done_deferred = defer.Deferred()

        self._writekey = self._node.get_writekey()
        assert self._writekey, "need write capability to publish"

        # first, which servers will we publish to? We require that the
        # servermap was updated in MODE_WRITE, so we can depend upon the
        # peerlist computed by that process instead of computing our own.
        # (Note that the assertion below accepts MODE_CHECK as well.)
        if self._servermap:
            assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
            # we will push a version that is one larger than anything present
            # in the grid, according to the servermap.
            self._new_seqnum = self._servermap.highest_seqnum() + 1
        else:
            # If we don't have a servermap, that's because we're doing the
            # initial publish
            self._new_seqnum = 1
            self._servermap = ServerMap()
        self._status.set_servermap(self._servermap)

        self.log(format="new seqnum will be %(seqnum)d",
                 seqnum=self._new_seqnum, level=log.NOISY)

        # having an up-to-date servermap (or using a filenode that was just
        # created for the first time) also guarantees that the following
        # fields are available
        self.readkey = self._node.get_readkey()
        self.required_shares = self._node.get_required_shares()
        assert self.required_shares is not None
        self.total_shares = self._node.get_total_shares()
        assert self.total_shares is not None
        self._status.set_encoding(self.required_shares, self.total_shares)

        self._pubkey = self._node.get_pubkey()
        assert self._pubkey
        self._privkey = self._node.get_privkey()
        assert self._privkey
        # unlike the keys above, the encrypted private key is not asserted
        self._encprivkey = self._node.get_encprivkey()

        client = self._node._client
        full_peerlist = client.get_permuted_peers("storage",
                                                  self._storage_index)
        self.full_peerlist = full_peerlist # for use later, immutable
        self.bad_peers = set() # peerids who have errbacked/refused requests

        self.newdata = newdata
        # 16 random bytes; used for key derivation in _encrypt_and_encode and
        # packed into the share prefix in _generate_shares
        self.salt = os.urandom(16)

        self.setup_encoding_parameters()

        # if we experience any surprises (writes which were rejected because
        # our test vector did not match, or shares which we didn't expect to
        # see), we set this flag and report an UncoordinatedWriteError at the
        # end of the publish process.
        self.surprised = False

        # as a failsafe, refuse to iterate through self.loop more than a
        # thousand times.
        self.looplimit = 1000

        # we keep track of three tables. The first is our goal: which share
        # we want to see on which servers. This is initially populated by the
        # existing servermap.
        self.goal = set() # pairs of (peerid, shnum) tuples

        # the second table is our list of outstanding queries: those which
        # are in flight and may or may not be delivered, accepted, or
        # acknowledged. Items are added to this table when the request is
        # sent, and removed when the response returns (or errbacks).
        self.outstanding = set() # (peerid, shnum) tuples

        # the third is a table of successes: share which have actually been
        # placed. These are populated when responses come back with success.
        # When self.placed == self.goal, we're done.
        self.placed = set() # (peerid, shnum) tuples

        # we also keep a mapping from peerid to RemoteReference. Each time we
        # pull a connection out of the full peerlist, we add it to this for
        # use later.
        self.connections = {}

        # maps (peerid, shnum) to the old checkstring of a share that the
        # servermap flagged as bad; used to build test vectors in
        # _send_shares
        self.bad_share_checkstrings = {}

        # we use the servermap to populate the initial goal: this way we will
        # try to update each existing share in place.
        for (peerid, shnum) in self._servermap.servermap:
            self.goal.add( (peerid, shnum) )
            self.connections[peerid] = self._servermap.connections[peerid]
        # then we add in all the shares that were bad (corrupted, bad
        # signatures, etc). We want to replace these.
        for key, old_checkstring in self._servermap.bad_shares.items():
            (peerid, shnum) = key
            self.goal.add(key)
            self.bad_share_checkstrings[key] = old_checkstring
            self.connections[peerid] = self._servermap.connections[peerid]

        # create the shares. We'll discard these as they are delivered. SDMF:
        # we're allowed to hold everything in memory.

        self._status.timings["setup"] = time.time() - self._started
        d = self._encrypt_and_encode()
        d.addCallback(self._generate_shares)
        def _start_pushing(res):
            # reference point for the "push" timing recorded by loop()
            self._started_pushing = time.time()
            return res
        d.addCallback(_start_pushing)
        d.addCallback(self.loop) # trigger delivery
        d.addErrback(self._fatal_error)

        # note: this is done_deferred, not the chain `d` built above
        return self.done_deferred
 259. 
 260.     def setup_encoding_parameters(self):
 261.         segment_size = min(self.MAX_SEGMENT_SIZE, len(self.newdata))
 262.         # this must be a multiple of self.required_shares
 263.         segment_size = mathutil.next_multiple(segment_size,
 264.                                               self.required_shares)
 265.         self.segment_size = segment_size
 266.         if segment_size:
 267.             self.num_segments = mathutil.div_ceil(len(self.newdata),
 268.                                                   segment_size)
 269.         else:
 270.             self.num_segments = 0
 271.         assert self.num_segments in [0, 1,] # SDMF restrictions
 272. 
 273.     def _fatal_error(self, f):
 274.         self.log("error during loop", failure=f, level=log.UNUSUAL)
 275.         self._done(f)
 276. 
 277.     def _update_status(self):
 278.         self._status.set_status("Sending Shares: %d placed out of %d, "
 279.                                 "%d messages outstanding" %
 280.                                 (len(self.placed),
 281.                                  len(self.goal),
 282.                                  len(self.outstanding)))
 283.         self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
 284. 
 285.     def loop(self, ignored=None):
 286.         self.log("entering loop", level=log.NOISY)
 287.         if not self._running:
 288.             return
 289. 
 290.         self.looplimit -= 1
 291.         if self.looplimit <= 0:
 292.             raise RuntimeError("loop limit exceeded")
 293. 
 294.         if self.surprised:
 295.             # don't send out any new shares, just wait for the outstanding
 296.             # ones to be retired.
 297.             self.log("currently surprised, so don't send any new shares",
 298.                      level=log.NOISY)
 299.         else:
 300.             self.update_goal()
 301.             # how far are we from our goal?
 302.             needed = self.goal - self.placed - self.outstanding
 303.             self._update_status()
 304. 
 305.             if needed:
 306.                 # we need to send out new shares
 307.                 self.log(format="need to send %(needed)d new shares",
 308.                          needed=len(needed), level=log.NOISY)
 309.                 self._send_shares(needed)
 310.                 return
 311. 
 312.         if self.outstanding:
 313.             # queries are still pending, keep waiting
 314.             self.log(format="%(outstanding)d queries still outstanding",
 315.                      outstanding=len(self.outstanding),
 316.                      level=log.NOISY)
 317.             return
 318. 
 319.         # no queries outstanding, no placements needed: we're done
 320.         self.log("no queries outstanding, no placements needed: done",
 321.                  level=log.OPERATIONAL)
 322.         now = time.time()
 323.         elapsed = now - self._started_pushing
 324.         self._status.timings["push"] = elapsed
 325.         return self._done(None)
 326. 
 327.     def log_goal(self, goal, message=""):
 328.         logmsg = [message]
 329.         for (shnum, peerid) in sorted([(s,p) for (p,s) in goal]):
 330.             logmsg.append("sh%d to [%s]" % (shnum,
 331.                                             idlib.shortnodeid_b2a(peerid)))
 332.         self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
 333.         self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
 334.                  level=log.NOISY)
 335. 
 336.     def update_goal(self):
 337.         # if log.recording_noisy
 338.         if True:
 339.             self.log_goal(self.goal, "before update: ")
 340. 
 341.         # first, remove any bad peers from our goal
 342.         self.goal = set([ (peerid, shnum)
 343.                           for (peerid, shnum) in self.goal
 344.                           if peerid not in self.bad_peers ])
 345. 
 346.         # find the homeless shares:
 347.         homefull_shares = set([shnum for (peerid, shnum) in self.goal])
 348.         homeless_shares = set(range(self.total_shares)) - homefull_shares
 349.         homeless_shares = sorted(list(homeless_shares))
 350.         # place them somewhere. We prefer unused servers at the beginning of
 351.         # the available peer list.
 352. 
 353.         if not homeless_shares:
 354.             return
 355. 
 356.         # if an old share X is on a node, put the new share X there too.
 357.         # TODO: 1: redistribute shares to achieve one-per-peer, by copying
 358.         #       shares from existing peers to new (less-crowded) ones. The
 359.         #       old shares must still be updated.
 360.         # TODO: 2: move those shares instead of copying them, to reduce future
 361.         #       update work
 362. 
 363.         # this is a bit CPU intensive but easy to analyze. We create a sort
 364.         # order for each peerid. If the peerid is marked as bad, we don't
 365.         # even put them in the list. Then we care about the number of shares
 366.         # which have already been assigned to them. After that we care about
 367.         # their permutation order.
 368.         old_assignments = DictOfSets()
 369.         for (peerid, shnum) in self.goal:
 370.             old_assignments.add(peerid, shnum)
 371. 
 372.         peerlist = []
 373.         for i, (peerid, ss) in enumerate(self.full_peerlist):
 374.             if peerid in self.bad_peers:
 375.                 continue
 376.             entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
 377.             peerlist.append(entry)
 378.         peerlist.sort()
 379. 
 380.         if not peerlist:
 381.             raise NotEnoughServersError("Ran out of non-bad servers")
 382. 
 383.         new_assignments = []
 384.         # we then index this peerlist with an integer, because we may have to
 385.         # wrap. We update the goal as we go.
 386.         i = 0
 387.         for shnum in homeless_shares:
 388.             (ignored1, ignored2, peerid, ss) = peerlist[i]
 389.             # TODO: if we are forced to send a share to a server that already
 390.             # has one, we may have two write requests in flight, and the
 391.             # servermap (which was computed before either request was sent)
 392.             # won't reflect the new shares, so the second response will cause
 393.             # us to be surprised ("unexpected share on peer"), causing the
 394.             # publish to fail with an UncoordinatedWriteError. This is
 395.             # troublesome but not really a bit problem. Fix it at some point.
 396.             self.goal.add( (peerid, shnum) )
 397.             self.connections[peerid] = ss
 398.             i += 1
 399.             if i >= len(peerlist):
 400.                 i = 0
 401.         if True:
 402.             self.log_goal(self.goal, "after update: ")
 403. 
 404. 
 405. 
 406.     def _encrypt_and_encode(self):
 407.         # this returns a Deferred that fires with a list of (sharedata,
 408.         # sharenum) tuples. TODO: cache the ciphertext, only produce the
 409.         # shares that we care about.
 410.         self.log("_encrypt_and_encode")
 411. 
 412.         self._status.set_status("Encrypting")
 413.         started = time.time()
 414. 
 415.         key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey)
 416.         enc = AES(key)
 417.         crypttext = enc.process(self.newdata)
 418.         assert len(crypttext) == len(self.newdata)
 419. 
 420.         now = time.time()
 421.         self._status.timings["encrypt"] = now - started
 422.         started = now
 423. 
 424.         # now apply FEC
 425. 
 426.         self._status.set_status("Encoding")
 427.         fec = codec.CRSEncoder()
 428.         fec.set_params(self.segment_size,
 429.                        self.required_shares, self.total_shares)
 430.         piece_size = fec.get_block_size()
 431.         crypttext_pieces = [None] * self.required_shares
 432.         for i in range(len(crypttext_pieces)):
 433.             offset = i * piece_size
 434.             piece = crypttext[offset:offset+piece_size]
 435.             piece = piece + "\x00"*(piece_size - len(piece)) # padding
 436.             crypttext_pieces[i] = piece
 437.             assert len(piece) == piece_size
 438. 
 439.         d = fec.encode(crypttext_pieces)
 440.         def _done_encoding(res):
 441.             elapsed = time.time() - started
 442.             self._status.timings["encode"] = elapsed
 443.             return res
 444.         d.addCallback(_done_encoding)
 445.         return d
 446. 
 447.     def _generate_shares(self, shares_and_shareids):
 448.         # this sets self.shares and self.root_hash
 449.         self.log("_generate_shares")
 450.         self._status.set_status("Generating Shares")
 451.         started = time.time()
 452. 
 453.         # we should know these by now
 454.         privkey = self._privkey
 455.         encprivkey = self._encprivkey
 456.         pubkey = self._pubkey
 457. 
 458.         (shares, share_ids) = shares_and_shareids
 459. 
 460.         assert len(shares) == len(share_ids)
 461.         assert len(shares) == self.total_shares
 462.         all_shares = {}
 463.         block_hash_trees = {}
 464.         share_hash_leaves = [None] * len(shares)
 465.         for i in range(len(shares)):
 466.             share_data = shares[i]
 467.             shnum = share_ids[i]
 468.             all_shares[shnum] = share_data
 469. 
 470.             # build the block hash tree. SDMF has only one leaf.
 471.             leaves = [hashutil.block_hash(share_data)]
 472.             t = hashtree.HashTree(leaves)
 473.             block_hash_trees[shnum] = block_hash_tree = list(t)
 474.             share_hash_leaves[shnum] = t[0]
 475.         for leaf in share_hash_leaves:
 476.             assert leaf is not None
 477.         share_hash_tree = hashtree.HashTree(share_hash_leaves)
 478.         share_hash_chain = {}
 479.         for shnum in range(self.total_shares):
 480.             needed_hashes = share_hash_tree.needed_hashes(shnum)
 481.             share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
 482.                                               for i in needed_hashes ] )
 483.         root_hash = share_hash_tree[0]
 484.         assert len(root_hash) == 32
 485.         self.log("my new root_hash is %s" % base32.b2a(root_hash))
 486. 
 487.         prefix = pack_prefix(self._new_seqnum, root_hash, self.salt,
 488.                              self.required_shares, self.total_shares,
 489.                              self.segment_size, len(self.newdata))
 490. 
 491.         # now pack the beginning of the share. All shares are the same up
 492.         # to the signature, then they have divergent share hash chains,
 493.         # then completely different block hash trees + salt + share data,
 494.         # then they all share the same encprivkey at the end. The sizes
 495.         # of everything are the same for all shares.
 496. 
 497.         sign_started = time.time()
 498.         signature = privkey.sign(prefix)
 499.         self._status.timings["sign"] = time.time() - sign_started
 500. 
 501.         verification_key = pubkey.serialize()
 502. 
 503.         final_shares = {}
 504.         for shnum in range(self.total_shares):
 505.             final_share = pack_share(prefix,
 506.                                      verification_key,
 507.                                      signature,
 508.                                      share_hash_chain[shnum],
 509.                                      block_hash_trees[shnum],
 510.                                      all_shares[shnum],
 511.                                      encprivkey)
 512.             final_shares[shnum] = final_share
 513.         elapsed = time.time() - started
 514.         self._status.timings["pack"] = elapsed
 515.         self.shares = final_shares
 516.         self.root_hash = root_hash
 517. 
 518.         # we also need to build up the version identifier for what we're
 519.         # pushing. Extract the offsets from one of our shares.
 520.         assert final_shares
 521.         offsets = unpack_header(final_shares.values()[0])[-1]
 522.         offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
 523.         verinfo = (self._new_seqnum, root_hash, self.salt,
 524.                    self.segment_size, len(self.newdata),
 525.                    self.required_shares, self.total_shares,
 526.                    prefix, offsets_tuple)
 527.         self.versioninfo = verinfo
 528. 
 529. 
 530. 
 531.     def _send_shares(self, needed):
 532.         self.log("_send_shares")
 533. 
 534.         # we're finally ready to send out our shares. If we encounter any
 535.         # surprises here, it's because somebody else is writing at the same
 536.         # time. (Note: in the future, when we remove the _query_peers() step
 537.         # and instead speculate about [or remember] which shares are where,
 538.         # surprises here are *not* indications of UncoordinatedWriteError,
 539.         # and we'll need to respond to them more gracefully.)
 540. 
 541.         # needed is a set of (peerid, shnum) tuples. The first thing we do is
 542.         # organize it by peerid.
 543. 
 544.         peermap = DictOfSets()
 545.         for (peerid, shnum) in needed:
 546.             peermap.add(peerid, shnum)
 547. 
 548.         # the next thing is to build up a bunch of test vectors. The
 549.         # semantics of Publish are that we perform the operation if the world
 550.         # hasn't changed since the ServerMap was constructed (more or less).
 551.         # For every share we're trying to place, we create a test vector that
 552.         # tests to see if the server*share still corresponds to the
 553.         # map.
 554. 
 555.         all_tw_vectors = {} # maps peerid to tw_vectors
 556.         sm = self._servermap.servermap
 557. 
 558.         for key in needed:
 559.             (peerid, shnum) = key
 560. 
 561.             if key in sm:
 562.                 # an old version of that share already exists on the
 563.                 # server, according to our servermap. We will create a
 564.                 # request that attempts to replace it.
 565.                 old_versionid, old_timestamp = sm[key]
 566.                 (old_seqnum, old_root_hash, old_salt, old_segsize,
 567.                  old_datalength, old_k, old_N, old_prefix,
 568.                  old_offsets_tuple) = old_versionid
 569.                 old_checkstring = pack_checkstring(old_seqnum,
 570.                                                    old_root_hash,
 571.                                                    old_salt)
 572.                 testv = (0, len(old_checkstring), "eq", old_checkstring)
 573. 
 574.             elif key in self.bad_share_checkstrings:
 575.                 old_checkstring = self.bad_share_checkstrings[key]
 576.                 testv = (0, len(old_checkstring), "eq", old_checkstring)
 577. 
 578.             else:
 579.                 # add a testv that requires the share not exist
 580.                 testv = (0, 1, 'eq', "")
 581. 
 582.             testvs = [testv]
 583.             # the write vector is simply the share
 584.             writev = [(0, self.shares[shnum])]
 585. 
 586.             if peerid not in all_tw_vectors:
 587.                 all_tw_vectors[peerid] = {}
 588.                 # maps shnum to (testvs, writevs, new_length)
 589.             assert shnum not in all_tw_vectors[peerid]
 590. 
 591.             all_tw_vectors[peerid][shnum] = (testvs, writev, None)
 592. 
 593.         # we read the checkstring back from each share, however we only use
 594.         # it to detect whether there was a new share that we didn't know
 595.         # about. The success or failure of the write will tell us whether
 596.         # there was a collision or not. If there is a collision, the first
 597.         # thing we'll do is update the servermap, which will find out what
 598.         # happened. We could conceivably reduce a roundtrip by using the
 599.         # readv checkstring to populate the servermap, but really we'd have
 600.         # to read enough data to validate the signatures too, so it wouldn't
 601.         # be an overall win.
 602.         read_vector = [(0, struct.calcsize(SIGNED_PREFIX))]
 603. 
 604.         # ok, send the messages!
 605.         self.log("sending %d shares" % len(all_tw_vectors), level=log.NOISY)
 606.         started = time.time()
 607.         for (peerid, tw_vectors) in all_tw_vectors.items():
 608. 
 609.             write_enabler = self._node.get_write_enabler(peerid)
 610.             renew_secret = self._node.get_renewal_secret(peerid)
 611.             cancel_secret = self._node.get_cancel_secret(peerid)
 612.             secrets = (write_enabler, renew_secret, cancel_secret)
 613.             shnums = tw_vectors.keys()
 614. 
 615.             for shnum in shnums:
 616.                 self.outstanding.add( (peerid, shnum) )
 617. 
 618.             d = self._do_testreadwrite(peerid, secrets,
 619.                                        tw_vectors, read_vector)
 620.             d.addCallbacks(self._got_write_answer, self._got_write_error,
 621.                            callbackArgs=(peerid, shnums, started),
 622.                            errbackArgs=(peerid, shnums, started))
 623.             d.addCallback(self.loop)
 624.             d.addErrback(self._fatal_error)
 625. 
 626.         self._update_status()
 627.         self.log("%d shares sent" % len(all_tw_vectors), level=log.NOISY)
 628. 
 629.     def _do_testreadwrite(self, peerid, secrets,
 630.                           tw_vectors, read_vector):
 631.         storage_index = self._storage_index
 632.         ss = self.connections[peerid]
 633. 
 634.         #print "SS[%s] is %s" % (idlib.shortnodeid_b2a(peerid), ss), ss.tracker.interfaceName
 635.         d = ss.callRemote("slot_testv_and_readv_and_writev",
 636.                           storage_index,
 637.                           secrets,
 638.                           tw_vectors,
 639.                           read_vector)
 640.         return d
 641. 
 642.     def _got_write_answer(self, answer, peerid, shnums, started):
 643.         lp = self.log("_got_write_answer from %s" %
 644.                       idlib.shortnodeid_b2a(peerid))
 645.         for shnum in shnums:
 646.             self.outstanding.discard( (peerid, shnum) )
 647. 
 648.         now = time.time()
 649.         elapsed = now - started
 650.         self._status.add_per_server_time(peerid, elapsed)
 651. 
 652.         wrote, read_data = answer
 653. 
 654.         surprise_shares = set(read_data.keys()) - set(shnums)
 655.         if surprise_shares:
 656.             self.log("they had shares %s that we didn't know about" %
 657.                      (list(surprise_shares),),
 658.                      parent=lp, level=log.WEIRD, umid="un9CSQ")
 659.             self.surprised = True
 660. 
 661.         if not wrote:
 662.             # TODO: there are two possibilities. The first is that the server
 663.             # is full (or just doesn't want to give us any room), which means
 664.             # we shouldn't ask them again, but is *not* an indication of an
 665.             # uncoordinated write. The second is that our testv failed, which
 666.             # *does* indicate an uncoordinated write. We currently don't have
 667.             # a way to tell these two apart (in fact, the storage server code
 668.             # doesn't have the option of refusing our share).
 669.             #
 670.             # If the server is full, mark the peer as bad (so we don't ask
 671.             # them again), but don't set self.surprised. The loop() will find
 672.             # a new server.
 673.             #
 674.             # If the testv failed, log it, set self.surprised, but don't
 675.             # bother adding to self.bad_peers .
 676. 
 677.             self.log("our testv failed, so the write did not happen",
 678.                      parent=lp, level=log.WEIRD, umid="8sc26g")
 679.             self.surprised = True
 680.             self.bad_peers.add(peerid) # don't ask them again
 681.             # use the checkstring to add information to the log message
 682.             for (shnum,readv) in read_data.items():
 683.                 checkstring = readv[0]
 684.                 (other_seqnum,
 685.                  other_roothash,
 686.                  other_salt) = unpack_checkstring(checkstring)
 687.                 expected_version = self._servermap.version_on_peer(peerid,
 688.                                                                    shnum)
 689.                 if expected_version:
 690.                     (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
 691.                      offsets_tuple) = expected_version
 692.                     self.log("somebody modified the share on us:"
 693.                              " shnum=%d: I thought they had #%d:R=%s,"
 694.                              " but testv reported #%d:R=%s" %
 695.                              (shnum,
 696.                               seqnum, base32.b2a(root_hash)[:4],
 697.                               other_seqnum, base32.b2a(other_roothash)[:4]),
 698.                              parent=lp, level=log.NOISY)
 699.                 # if expected_version==None, then we didn't expect to see a
 700.                 # share on that peer, and the 'surprise_shares' clause above
 701.                 # will have logged it.
 702.             # self.loop() will take care of finding new homes
 703.             return
 704. 
 705.         for shnum in shnums:
 706.             self.placed.add( (peerid, shnum) )
 707.             # and update the servermap
 708.             self._servermap.add_new_share(peerid, shnum,
 709.                                           self.versioninfo, started)
 710. 
 711.         # self.loop() will take care of checking to see if we're done
 712.         return
 713. 
 714.     def _got_write_error(self, f, peerid, shnums, started):
 715.         for shnum in shnums:
 716.             self.outstanding.discard( (peerid, shnum) )
 717.         self.bad_peers.add(peerid)
 718.         self.log(format="error while writing shares %(shnums)s to peerid %(peerid)s",
 719.                  shnums=list(shnums), peerid=idlib.shortnodeid_b2a(peerid),
 720.                  failure=f,
 721.                  level=log.UNUSUAL)
 722.         # self.loop() will take care of checking to see if we're done
 723.         return
 724. 
 725. 
 726.     def _done(self, res):
 727.         if not self._running:
 728.             return
 729.         self._running = False
 730.         now = time.time()
 731.         self._status.timings["total"] = now - self._started
 732.         self._status.set_active(False)
 733.         if isinstance(res, failure.Failure):
 734.             self.log("Publish done, with failure", failure=res,
 735.                      level=log.WEIRD, umid="nRsR9Q")
 736.             self._status.set_status("Failed")
 737.         elif self.surprised:
 738.             self.log("Publish done, UncoordinatedWriteError", level=log.UNUSUAL)
 739.             self._status.set_status("UncoordinatedWriteError")
 740.             # deliver a failure
 741.             res = failure.Failure(UncoordinatedWriteError())
 742.             # TODO: recovery
 743.         else:
 744.             self.log("Publish done, success")
 745.             self._status.set_status("Done")
 746.             self._status.set_progress(1.0)
 747.         eventually(self.done_deferred.callback, res)
 748. 
 749.