source file: /home/buildslave/tahoe/edgy/build/src/allmydata/mutable/checker.py
file stats: 193 lines, 186 executed: 96.4% covered
   1. 
   2. from zope.interface import implements
   3. from twisted.internet import defer
   4. from twisted.python import failure
   5. from allmydata import hashtree
   6. from allmydata.util import hashutil, base32, idlib, log
   7. from allmydata.interfaces import ICheckerResults
   8. 
   9. from common import MODE_CHECK, CorruptShareError
  10. from servermap import ServerMap, ServermapUpdater
  11. from layout import unpack_share, SIGNED_PREFIX_LENGTH
  12. 
  13. class MutableChecker:
  14. 
  15.     def __init__(self, node):
  16.         self._node = node
  17.         self.bad_shares = [] # list of (nodeid,shnum,failure)
  18.         self._storage_index = self._node.get_storage_index()
  19.         self.results = Results(self._storage_index)
  20.         self.need_repair = False
  21. 
  22.     def check(self, verify=False, repair=False):
  23.         servermap = ServerMap()
  24.         self.results.servermap = servermap
  25.         u = ServermapUpdater(self._node, servermap, MODE_CHECK)
  26.         d = u.update()
  27.         d.addCallback(self._got_mapupdate_results)
  28.         if verify:
  29.             d.addCallback(self._verify_all_shares)
  30.         d.addCallback(self._generate_results)
  31.         if repair:
  32.             d.addCallback(self._maybe_do_repair)
  33.         d.addCallback(self._return_results)
  34.         return d
  35. 
  36.     def _got_mapupdate_results(self, servermap):
  37.         # the file is healthy if there is exactly one recoverable version, it
  38.         # has at least N distinct shares, and there are no unrecoverable
  39.         # versions: all existing shares will be for the same version.
  40.         self.best_version = None
  41.         num_recoverable = len(servermap.recoverable_versions())
  42.         if num_recoverable:
  43.             self.best_version = servermap.best_recoverable_version()
  44. 
  45.         if servermap.unrecoverable_versions():
  46.             self.need_repair = True
  47.         if num_recoverable != 1:
  48.             self.need_repair = True
  49.         if self.best_version:
  50.             available_shares = servermap.shares_available()
  51.             (num_distinct_shares, k, N) = available_shares[self.best_version]
  52.             if num_distinct_shares < N:
  53.                 self.need_repair = True
  54. 
  55.         return servermap
  56. 
  57.     def _verify_all_shares(self, servermap):
  58.         # read every byte of each share
  59.         if not self.best_version:
  60.             return
  61.         versionmap = servermap.make_versionmap()
  62.         shares = versionmap[self.best_version]
  63.         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
  64.          offsets_tuple) = self.best_version
  65.         offsets = dict(offsets_tuple)
  66.         readv = [ (0, offsets["EOF"]) ]
  67.         dl = []
  68.         for (shnum, peerid, timestamp) in shares:
  69.             ss = servermap.connections[peerid]
  70.             d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
  71.             d.addCallback(self._got_answer, peerid)
  72.             dl.append(d)
  73.         return defer.DeferredList(dl, fireOnOneErrback=True)
  74. 
  75.     def _do_read(self, ss, peerid, storage_index, shnums, readv):
  76.         # isolate the callRemote to a separate method, so tests can subclass
  77.         # Publish and override it
  78.         d = ss.callRemote("slot_readv", storage_index, shnums, readv)
  79.         return d
  80. 
  81.     def _got_answer(self, datavs, peerid):
  82.         for shnum,datav in datavs.items():
  83.             data = datav[0]
  84.             try:
  85.                 self._got_results_one_share(shnum, peerid, data)
  86.             except CorruptShareError:
  87.                 f = failure.Failure()
  88.                 self.need_repair = True
  89.                 self.bad_shares.append( (peerid, shnum, f) )
  90.                 prefix = data[:SIGNED_PREFIX_LENGTH]
  91.                 self.results.servermap.mark_bad_share(peerid, shnum, prefix)
  92. 
  93.     def check_prefix(self, peerid, shnum, data):
  94.         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
  95.          offsets_tuple) = self.best_version
  96.         got_prefix = data[:SIGNED_PREFIX_LENGTH]
  97.         if got_prefix != prefix:
  98.             raise CorruptShareError(peerid, shnum,
  99.                                     "prefix mismatch: share changed while we were reading it")
 100. 
 101.     def _got_results_one_share(self, shnum, peerid, data):
 102.         self.check_prefix(peerid, shnum, data)
 103. 
 104.         # the [seqnum:signature] pieces are validated by _compare_prefix,
 105.         # which checks their signature against the pubkey known to be
 106.         # associated with this file.
 107. 
 108.         (seqnum, root_hash, IV, k, N, segsize, datalen, pubkey, signature,
 109.          share_hash_chain, block_hash_tree, share_data,
 110.          enc_privkey) = unpack_share(data)
 111. 
 112.         # validate [share_hash_chain,block_hash_tree,share_data]
 113. 
 114.         leaves = [hashutil.block_hash(share_data)]
 115.         t = hashtree.HashTree(leaves)
 116.         if list(t) != block_hash_tree:
 117.             raise CorruptShareError(peerid, shnum, "block hash tree failure")
 118.         share_hash_leaf = t[0]
 119.         t2 = hashtree.IncompleteHashTree(N)
 120.         # root_hash was checked by the signature
 121.         t2.set_hashes({0: root_hash})
 122.         try:
 123.             t2.set_hashes(hashes=share_hash_chain,
 124.                           leaves={shnum: share_hash_leaf})
 125.         except (hashtree.BadHashError, hashtree.NotEnoughHashesError,
 126.                 IndexError), e:
 127.             msg = "corrupt hashes: %s" % (e,)
 128.             raise CorruptShareError(peerid, shnum, msg)
 129. 
 130.         # validate enc_privkey: only possible if we have a write-cap
 131.         if not self._node.is_readonly():
 132.             alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
 133.             alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
 134.             if alleged_writekey != self._node.get_writekey():
 135.                 raise CorruptShareError(peerid, shnum, "invalid privkey")
 136. 
 137.     def _generate_results(self, res):
 138.         self.results.healthy = True
 139.         smap = self.results.servermap
 140.         report = []
 141.         vmap = smap.make_versionmap()
 142.         recoverable = smap.recoverable_versions()
 143.         unrecoverable = smap.unrecoverable_versions()
 144.         if recoverable:
 145.             report.append("Recoverable Versions: " +
 146.                           "/".join(["%d*%s" % (len(vmap[v]),
 147.                                                smap.summarize_version(v))
 148.                                     for v in recoverable]))
 149.         if unrecoverable:
 150.             report.append("Unrecoverable Versions: " +
 151.                           "/".join(["%d*%s" % (len(vmap[v]),
 152.                                                smap.summarize_version(v))
 153.                                     for v in unrecoverable]))
 154.         if smap.unrecoverable_versions():
 155.             self.results.healthy = False
 156.             report.append("Unhealthy: some versions are unrecoverable")
 157.         if len(recoverable) == 0:
 158.             self.results.healthy = False
 159.             report.append("Unhealthy: no versions are recoverable")
 160.         if len(recoverable) > 1:
 161.             self.results.healthy = False
 162.             report.append("Unhealthy: there are multiple recoverable versions")
 163.         if self.best_version:
 164.             report.append("Best Recoverable Version: " +
 165.                           smap.summarize_version(self.best_version))
 166.             available_shares = smap.shares_available()
 167.             (num_distinct_shares, k, N) = available_shares[self.best_version]
 168.             if num_distinct_shares < N:
 169.                 self.results.healthy = False
 170.                 report.append("Unhealthy: best recoverable version has only %d shares (encoding is %d-of-%d)"
 171.                               % (num_distinct_shares, k, N))
 172.         if self.bad_shares:
 173.             report.append("Corrupt Shares:")
 174.             for (peerid, shnum, f) in sorted(self.bad_shares):
 175.                 s = "%s-sh%d" % (idlib.shortnodeid_b2a(peerid), shnum)
 176.                 if f.check(CorruptShareError):
 177.                     ft = f.value.reason
 178.                 else:
 179.                     ft = str(f)
 180.                 report.append(" %s: %s" % (s, ft))
 181.                 p = (peerid, self._storage_index, shnum, f)
 182.                 self.results.problems.append(p)
 183.                 msg = ("CorruptShareError during mutable verify, "
 184.                        "peerid=%(peerid)s, si=%(si)s, shnum=%(shnum)d, "
 185.                        "where=%(where)s")
 186.                 log.msg(format=msg, peerid=idlib.nodeid_b2a(peerid),
 187.                         si=base32.b2a(self._storage_index),
 188.                         shnum=shnum,
 189.                         where=ft,
 190.                         level=log.WEIRD, umid="EkK8QA")
 191. 
 192.         self.results.status_report = "\n".join(report) + "\n"
 193. 
 194.     def _maybe_do_repair(self, res):
 195.         if not self.need_repair:
 196.             return
 197.         self.results.repair_attempted = True
 198.         d = self._node.repair(self.results)
 199.         def _repair_finished(repair_results):
 200.             self.results.repair_succeeded = True
 201.             self.results.repair_results = repair_results
 202.         def _repair_error(f):
 203.             # I'm not sure if I want to pass through a failure or not.
 204.             self.results.repair_succeeded = False
 205.             self.results.repair_failure = f
 206.             return f
 207.         d.addCallbacks(_repair_finished, _repair_error)
 208.         return d
 209. 
 210.     def _return_results(self, res):
 211.         return self.results
 212. 
 213. 
 214. class Results:
 215.     implements(ICheckerResults)
 216. 
 217.     def __init__(self, storage_index):
 218.         self.storage_index = storage_index
 219.         self.storage_index_s = base32.b2a(storage_index)[:6]
 220.         self.repair_attempted = False
 221.         self.status_report = "[not generated yet]" # string
 222.         self.repair_report = None
 223.         self.problems = [] # list of (peerid, storage_index, shnum, failure)
 224. 
 225.     def is_healthy(self):
 226.         return self.healthy
 227. 
 228.     def get_storage_index(self):
 229.         return self.storage_index
 230.     def get_storage_index_string(self):
 231.         return self.storage_index_s
 232. 
 233.     def get_mutability_string(self):
 234.         return "mutable"
 235. 
 236.     def to_string(self):
 237.         s = ""
 238.         if self.healthy:
 239.             s += "Healthy!\n"
 240.         else:
 241.             s += "Not Healthy!\n"
 242.         s += "\n"
 243.         s += self.status_report
 244.         s += "\n"
 245.         if self.repair_attempted:
 246.             s += "Repair attempted "
 247.             if self.repair_succeeded:
 248.                 s += "and successful\n"
 249.             else:
 250.                 s += "and failed\n"
 251.             s += "\n"
 252.             s += self.repair_results.to_string()
 253.             s += "\n"
 254.         return s
 255.