source file: /home/buildslave/tahoe/edgy/build/src/allmydata/immutable/checker.py
file stats: 184 lines, 172 executed: 93.5% covered
   1. 
   2. """
   3. Given a StorageIndex, count how many shares we can find.
   4. 
   5. This does no verification of the shares whatsoever. If the peer claims to
   6. have the share, we believe them.
   7. """
   8. 
   9. from zope.interface import implements
  10. from twisted.internet import defer
  11. from twisted.python import log
  12. from allmydata import storage
  13. from allmydata.interfaces import ICheckerResults, IDeepCheckResults
  14. from allmydata.immutable import download
  15. from allmydata.util import hashutil, base32
  16. 
  17. class Results:
  18.     implements(ICheckerResults)
  19. 
  20.     def __init__(self, storage_index):
  21.         # storage_index might be None for, say, LIT files
  22.         self.storage_index = storage_index
  23.         if storage_index is None:
  24.             self.storage_index_s = "<none>"
  25.         else:
  26.             self.storage_index_s = base32.b2a(storage_index)[:6]
  27.         self.status_report = "[not generated yet]" # string
  28. 
  29.     def is_healthy(self):
  30.         return self.healthy
  31. 
  32.     def get_storage_index(self):
  33.         return self.storage_index
  34. 
  35.     def get_storage_index_string(self):
  36.         return self.storage_index_s
  37. 
  38.     def get_mutability_string(self):
  39.         if self.storage_index:
  40.             return "immutable"
  41.         return "literal"
  42. 
  43.     def to_string(self):
  44.         s = ""
  45.         if self.healthy:
  46.             s += "Healthy!\n"
  47.         else:
  48.             s += "Not Healthy!\n"
  49.         s += "\n"
  50.         s += self.status_report
  51.         s += "\n"
  52.         return s
  53. 
  54. class DeepCheckResults:
  55.     implements(IDeepCheckResults)
  56. 
  57.     def __init__(self, root_storage_index):
  58.         self.root_storage_index = root_storage_index
  59.         if root_storage_index is None:
  60.             self.root_storage_index_s = "<none>"
  61.         else:
  62.             self.root_storage_index_s = base32.b2a(root_storage_index)[:6]
  63. 
  64.         self.objects_checked = 0
  65.         self.objects_healthy = 0
  66.         self.repairs_attempted = 0
  67.         self.repairs_successful = 0
  68.         self.problems = []
  69.         self.all_results = {}
  70.         self.server_problems = {}
  71. 
  72.     def get_root_storage_index_string(self):
  73.         return self.root_storage_index_s
  74. 
  75.     def add_check(self, r):
  76.         self.objects_checked += 1
  77.         if r.is_healthy():
  78.             self.objects_healthy += 1
  79.         else:
  80.             self.problems.append(r)
  81.         self.all_results[r.get_storage_index()] = r
  82. 
  83.     def add_repair(self, is_successful):
  84.         self.repairs_attempted += 1
  85.         if is_successful:
  86.             self.repairs_successful += 1
  87. 
  88.     def count_objects_checked(self):
  89.         return self.objects_checked
  90.     def count_objects_healthy(self):
  91.         return self.objects_healthy
  92.     def count_repairs_attempted(self):
  93.         return self.repairs_attempted
  94.     def count_repairs_successful(self):
  95.         return self.repairs_successful
  96.     def get_server_problems(self):
  97.         return self.server_problems
  98.     def get_problems(self):
  99.         return self.problems
 100.     def get_all_results(self):
 101.         return self.all_results
 102. 
 103. class SimpleCHKFileChecker:
 104.     """Return a list of (needed, total, found, sharemap), where sharemap maps
 105.     share number to a list of (binary) nodeids of the shareholders."""
 106. 
 107.     def __init__(self, client, storage_index, needed_shares, total_shares):
 108.         self.peer_getter = client.get_permuted_peers
 109.         self.needed_shares = needed_shares
 110.         self.total_shares = total_shares
 111.         self.found_shares = set()
 112.         self.storage_index = storage_index
 113.         self.sharemap = {}
 114. 
 115.     '''
 116.     def check_synchronously(self, si):
 117.         # this is how we would write this class if we were using synchronous
 118.         # messages (or if we used promises).
 119.         found = set()
 120.         for (pmpeerid, peerid, connection) in self.peer_getter(storage_index):
 121.             buckets = connection.get_buckets(si)
 122.             found.update(buckets.keys())
 123.         return len(found)
 124.     '''
 125. 
 126.     def start(self):
 127.         d = self._get_all_shareholders(self.storage_index)
 128.         d.addCallback(self._done)
 129.         return d
 130. 
 131.     def _get_all_shareholders(self, storage_index):
 132.         dl = []
 133.         for (peerid, ss) in self.peer_getter("storage", storage_index):
 134.             d = ss.callRemote("get_buckets", storage_index)
 135.             d.addCallbacks(self._got_response, self._got_error,
 136.                            callbackArgs=(peerid,))
 137.             dl.append(d)
 138.         return defer.DeferredList(dl)
 139. 
 140.     def _got_response(self, buckets, peerid):
 141.         # buckets is a dict: maps shum to an rref of the server who holds it
 142.         self.found_shares.update(buckets.keys())
 143.         for k in buckets:
 144.             if k not in self.sharemap:
 145.                 self.sharemap[k] = []
 146.             self.sharemap[k].append(peerid)
 147. 
 148.     def _got_error(self, f):
 149.         if f.check(KeyError):
 150.             pass
 151.         log.err(f)
 152.         pass
 153. 
 154.     def _done(self, res):
 155.         r = Results(self.storage_index)
 156.         report = []
 157.         r.healthy = bool(len(self.found_shares) >= self.total_shares)
 158.         r.stuff = (self.needed_shares, self.total_shares,
 159.                    len(self.found_shares), self.sharemap)
 160.         if len(self.found_shares) < self.total_shares:
 161.             wanted = set(range(self.total_shares))
 162.             missing = wanted - self.found_shares
 163.             report.append("Missing shares: %s" %
 164.                           ",".join(["sh%d" % shnum
 165.                                     for shnum in sorted(missing)]))
 166.         r.status_report = "\n".join(report) + "\n"
 167.         return r
 168. 
 169. class VerifyingOutput:
 170.     def __init__(self, total_length, results):
 171.         self._crypttext_hasher = hashutil.crypttext_hasher()
 172.         self.length = 0
 173.         self.total_length = total_length
 174.         self._segment_number = 0
 175.         self._crypttext_hash_tree = None
 176.         self._opened = False
 177.         self._results = results
 178.         results.healthy = False
 179. 
 180.     def setup_hashtrees(self, plaintext_hashtree, crypttext_hashtree):
 181.         self._crypttext_hash_tree = crypttext_hashtree
 182. 
 183.     def write_segment(self, crypttext):
 184.         self.length += len(crypttext)
 185. 
 186.         self._crypttext_hasher.update(crypttext)
 187.         if self._crypttext_hash_tree:
 188.             ch = hashutil.crypttext_segment_hasher()
 189.             ch.update(crypttext)
 190.             crypttext_leaves = {self._segment_number: ch.digest()}
 191.             self._crypttext_hash_tree.set_hashes(leaves=crypttext_leaves)
 192. 
 193.         self._segment_number += 1
 194. 
 195.     def close(self):
 196.         self.crypttext_hash = self._crypttext_hasher.digest()
 197. 
 198.     def finish(self):
 199.         self._results.healthy = True
 200.         return self._results
 201. 
 202. 
 203. class SimpleCHKFileVerifier(download.FileDownloader):
 204.     # this reconstructs the crypttext, which verifies that at least 'k' of
 205.     # the shareholders are around and have valid data. It does not check the
 206.     # remaining shareholders, and it cannot verify the plaintext.
 207.     check_plaintext_hash = False
 208. 
 209.     def __init__(self, client, storage_index, k, N, size, ueb_hash):
 210.         self._client = client
 211. 
 212.         self._storage_index = storage_index
 213.         self._uri_extension_hash = ueb_hash
 214.         self._total_shares = N
 215.         self._size = size
 216.         self._num_needed_shares = k
 217. 
 218.         self._si_s = storage.si_b2a(self._storage_index)
 219.         self.init_logging()
 220. 
 221.         r = Results(self._storage_index)
 222.         self._output = VerifyingOutput(self._size, r)
 223.         self._paused = False
 224.         self._stopped = False
 225. 
 226.         self._results = None
 227.         self.active_buckets = {} # k: shnum, v: bucket
 228.         self._share_buckets = [] # list of (sharenum, bucket) tuples
 229.         self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
 230.         self._uri_extension_sources = []
 231. 
 232.         self._uri_extension_data = None
 233. 
 234.         self._fetch_failures = {"uri_extension": 0,
 235.                                 "plaintext_hashroot": 0,
 236.                                 "plaintext_hashtree": 0,
 237.                                 "crypttext_hashroot": 0,
 238.                                 "crypttext_hashtree": 0,
 239.                                 }
 240. 
 241.     def init_logging(self):
 242.         self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
 243.         num = self._client.log("SimpleCHKFileVerifier(%s): starting" % prefix)
 244.         self._log_number = num
 245. 
 246.     def log(self, msg, *args, **kwargs):
 247.         if not kwargs.get('parent'):
 248.             kwargs['parent'] = self._log_number
 249.         return self._client.log("SimpleCHKFileVerifier(%s): %s"
 250.                                 % (self._log_prefix, msg),
 251.                                 *args, **kwargs)
 252. 
 253. 
 254.     def start(self):
 255.         log.msg("starting download [%s]" % storage.si_b2a(self._storage_index)[:5])
 256. 
 257.         # first step: who should we download from?
 258.         d = defer.maybeDeferred(self._get_all_shareholders)
 259.         d.addCallback(self._got_all_shareholders)
 260.         # now get the uri_extension block from somebody and validate it
 261.         d.addCallback(self._obtain_uri_extension)
 262.         d.addCallback(self._got_uri_extension)
 263.         d.addCallback(self._get_hashtrees)
 264.         d.addCallback(self._create_validated_buckets)
 265.         # once we know that, we can download blocks from everybody
 266.         d.addCallback(self._download_all_segments)
 267.         d.addCallback(self._done)
 268.         return d
 269.