source file: /home/buildslave/tahoe/edgy/build/src/allmydata/mutable/checker.py
file stats: 193 lines, 186 executed: 96.4% covered
1.
2. from zope.interface import implements
3. from twisted.internet import defer
4. from twisted.python import failure
5. from allmydata import hashtree
6. from allmydata.util import hashutil, base32, idlib, log
7. from allmydata.interfaces import ICheckerResults
8.
9. from common import MODE_CHECK, CorruptShareError
10. from servermap import ServerMap, ServermapUpdater
11. from layout import unpack_share, SIGNED_PREFIX_LENGTH
12.
class MutableChecker:
    """Check, and optionally verify and repair, one mutable file.

    Typical use is ``MutableChecker(node).check(verify=..., repair=...)``.
    The outcome is accumulated on ``self.results`` (a Results instance),
    which is also what the Deferred returned by check() fires with.
    """

    def __init__(self, node):
        """Set up a checker for `node`.

        `node` must provide get_storage_index(). The verify path also calls
        is_readonly(), get_writekey() and _decrypt_privkey() on it, and the
        repair path calls repair().
        """
        self._node = node
        self.bad_shares = [] # list of (nodeid,shnum,failure)
        self._storage_index = self._node.get_storage_index()
        self.results = Results(self._storage_index)
        self.need_repair = False
        # Replaced by _got_mapupdate_results; defined up front so the later
        # stages can always test it.
        self.best_version = None

    def check(self, verify=False, repair=False):
        """Run the check and return a Deferred that fires with self.results.

        The servermap is always updated first. If `verify` is true, every
        share of the best recoverable version is then read and validated.
        If `repair` is true and a problem was noticed, the node is asked to
        repair the file after the report has been generated.
        """
        servermap = ServerMap()
        self.results.servermap = servermap
        updater = ServermapUpdater(self._node, servermap, MODE_CHECK)
        d = updater.update()
        d.addCallback(self._got_mapupdate_results)
        if verify:
            d.addCallback(self._verify_all_shares)
        d.addCallback(self._generate_results)
        if repair:
            d.addCallback(self._maybe_do_repair)
        d.addCallback(self._return_results)
        return d

    def _got_mapupdate_results(self, servermap):
        """Record the best version and decide whether repair is needed.

        Returns `servermap` unchanged so the next callback receives it.
        """
        # the file is healthy if there is exactly one recoverable version, it
        # has at least N distinct shares, and there are no unrecoverable
        # versions: all existing shares will be for the same version.
        self.best_version = None
        num_recoverable = len(servermap.recoverable_versions())
        if num_recoverable:
            self.best_version = servermap.best_recoverable_version()

        if servermap.unrecoverable_versions():
            self.need_repair = True
        if num_recoverable != 1:
            self.need_repair = True
        if self.best_version:
            available_shares = servermap.shares_available()
            (num_distinct_shares, k, N) = available_shares[self.best_version]
            if num_distinct_shares < N:
                self.need_repair = True

        return servermap

    def _verify_all_shares(self, servermap):
        """Read every byte of each share of the best version and validate it.

        Returns None when there is no recoverable version, otherwise a
        DeferredList over one read per share.
        """
        if not self.best_version:
            return
        versionmap = servermap.make_versionmap()
        shares = versionmap[self.best_version]
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.best_version
        offsets = dict(offsets_tuple)
        # a single read vector covering the share from byte 0 to "EOF"
        readv = [ (0, offsets["EOF"]) ]
        dl = []
        for (shnum, peerid, timestamp) in shares:
            ss = servermap.connections[peerid]
            d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
            d.addCallback(self._got_answer, peerid)
            dl.append(d)
        # consumeErrors=True: the first failure is already delivered through
        # the DeferredList (fireOnOneErrback), so it must not also be left
        # unhandled on the individual Deferred, where Twisted would log it
        # as "Unhandled error in Deferred".
        return defer.DeferredList(dl, fireOnOneErrback=True,
                                  consumeErrors=True)

    def _do_read(self, ss, peerid, storage_index, shnums, readv):
        """Issue the remote slot_readv call and return its Deferred."""
        # isolate the callRemote to a separate method, so tests can subclass
        # MutableChecker and override it
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        return d

    def _got_answer(self, datavs, peerid):
        """Validate each share returned by one server.

        `datavs` maps shnum to a list of data strings, one per read vector.
        Only one vector is requested, so only element 0 is examined.
        Corrupt shares are recorded in self.bad_shares and marked bad in
        the servermap; any other exception propagates.
        """
        for shnum,datav in datavs.items():
            data = datav[0]
            try:
                self._got_results_one_share(shnum, peerid, data)
            except CorruptShareError:
                # Failure() with no arguments captures the exception that is
                # currently being handled
                f = failure.Failure()
                self.need_repair = True
                self.bad_shares.append( (peerid, shnum, f) )
                prefix = data[:SIGNED_PREFIX_LENGTH]
                self.results.servermap.mark_bad_share(peerid, shnum, prefix)

    def check_prefix(self, peerid, shnum, data):
        """Raise CorruptShareError unless `data` starts with the signed
        prefix recorded for the best version."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.best_version
        got_prefix = data[:SIGNED_PREFIX_LENGTH]
        if got_prefix != prefix:
            raise CorruptShareError(peerid, shnum,
                                    "prefix mismatch: share changed while we were reading it")

    def _got_results_one_share(self, shnum, peerid, data):
        """Validate one complete share; raise CorruptShareError if it is bad."""
        # check_prefix compares the [seqnum:signature] pieces against the
        # prefix recorded for the best version.
        # NOTE(review): this presumes the servermap update already validated
        # that prefix's signature against the file's pubkey -- confirm.
        self.check_prefix(peerid, shnum, data)

        (seqnum, root_hash, IV, k, N, segsize, datalen, pubkey, signature,
         share_hash_chain, block_hash_tree, share_data,
         enc_privkey) = unpack_share(data)

        # validate [share_hash_chain,block_hash_tree,share_data]

        leaves = [hashutil.block_hash(share_data)]
        t = hashtree.HashTree(leaves)
        if list(t) != block_hash_tree:
            raise CorruptShareError(peerid, shnum, "block hash tree failure")
        share_hash_leaf = t[0]
        t2 = hashtree.IncompleteHashTree(N)
        # root_hash was checked by the signature
        t2.set_hashes({0: root_hash})
        try:
            t2.set_hashes(hashes=share_hash_chain,
                          leaves={shnum: share_hash_leaf})
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError,
                IndexError) as e:
            msg = "corrupt hashes: %s" % (e,)
            raise CorruptShareError(peerid, shnum, msg)

        # validate enc_privkey: only possible if we have a write-cap
        if not self._node.is_readonly():
            alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
            alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
            if alleged_writekey != self._node.get_writekey():
                raise CorruptShareError(peerid, shnum, "invalid privkey")

    def _generate_results(self, res):
        """Fill in results.healthy, results.problems and
        results.status_report from the servermap and self.bad_shares.

        `res` (the previous callback's result) is ignored.
        """
        self.results.healthy = True
        smap = self.results.servermap
        report = []
        vmap = smap.make_versionmap()
        recoverable = smap.recoverable_versions()
        unrecoverable = smap.unrecoverable_versions()

        def _describe(versions):
            # "<sharecount>*<summary>" for each version, joined with "/"
            return "/".join(["%d*%s" % (len(vmap[v]),
                                        smap.summarize_version(v))
                             for v in versions])

        if recoverable:
            report.append("Recoverable Versions: " + _describe(recoverable))
        if unrecoverable:
            report.append("Unrecoverable Versions: " +
                          _describe(unrecoverable))
            self.results.healthy = False
            report.append("Unhealthy: some versions are unrecoverable")
        if not recoverable:
            self.results.healthy = False
            report.append("Unhealthy: no versions are recoverable")
        if len(recoverable) > 1:
            self.results.healthy = False
            report.append("Unhealthy: there are multiple recoverable versions")
        if self.best_version:
            report.append("Best Recoverable Version: " +
                          smap.summarize_version(self.best_version))
            available_shares = smap.shares_available()
            (num_distinct_shares, k, N) = available_shares[self.best_version]
            if num_distinct_shares < N:
                self.results.healthy = False
                report.append("Unhealthy: best recoverable version has only %d shares (encoding is %d-of-%d)"
                              % (num_distinct_shares, k, N))
        if self.bad_shares:
            report.append("Corrupt Shares:")
        # Order by (peerid, shnum) only, so two entries for the same share
        # never fall through to comparing Failure objects.
        for (peerid, shnum, f) in sorted(self.bad_shares,
                                         key=lambda bad: bad[:2]):
            s = "%s-sh%d" % (idlib.shortnodeid_b2a(peerid), shnum)
            if f.check(CorruptShareError):
                ft = f.value.reason
            else:
                ft = str(f)
            report.append(" %s: %s" % (s, ft))
            p = (peerid, self._storage_index, shnum, f)
            self.results.problems.append(p)
            msg = ("CorruptShareError during mutable verify, "
                   "peerid=%(peerid)s, si=%(si)s, shnum=%(shnum)d, "
                   "where=%(where)s")
            log.msg(format=msg, peerid=idlib.nodeid_b2a(peerid),
                    si=base32.b2a(self._storage_index),
                    shnum=shnum,
                    where=ft,
                    level=log.WEIRD, umid="EkK8QA")

        self.results.status_report = "\n".join(report) + "\n"

    def _maybe_do_repair(self, res):
        """Ask the node to repair the file if a problem was noticed.

        Returns None when no repair is needed, otherwise the repair
        Deferred. A repair failure is recorded on the results and then
        passed through to the caller.
        """
        if not self.need_repair:
            return
        self.results.repair_attempted = True
        d = self._node.repair(self.results)
        def _repair_finished(repair_results):
            self.results.repair_succeeded = True
            self.results.repair_results = repair_results
        def _repair_error(f):
            # I'm not sure if I want to pass through a failure or not.
            self.results.repair_succeeded = False
            self.results.repair_failure = f
            return f
        d.addCallbacks(_repair_finished, _repair_error)
        return d

    def _return_results(self, res):
        """Final callback: discard `res` and hand back the Results object."""
        return self.results
212.
213.
class Results:
    """Outcome of checking one mutable file (declared to implement
    ICheckerResults).

    The checker that owns this object fills in `healthy`, `servermap`,
    `status_report` and `problems`, plus the repair_* attributes when a
    repair is attempted. `healthy` and `servermap` do not exist until the
    checker assigns them.
    """
    implements(ICheckerResults)

    def __init__(self, storage_index):
        self.storage_index = storage_index
        # the first six characters of the base32 form serve as the short
        # printable identifier
        self.storage_index_s = base32.b2a(storage_index)[:6]
        self.repair_attempted = False
        # Repair outcome. Defined up front so that to_string() cannot hit a
        # missing attribute when a repair was attempted but did not finish
        # successfully.
        self.repair_succeeded = False
        self.repair_results = None # set when a repair succeeds
        self.repair_failure = None # set when a repair fails
        self.status_report = "[not generated yet]" # string
        self.repair_report = None
        self.problems = [] # list of (peerid, storage_index, shnum, failure)

    def is_healthy(self):
        """Return the health flag assigned by the checker."""
        return self.healthy

    def get_storage_index(self):
        """Return the storage index exactly as passed to the constructor."""
        return self.storage_index
    def get_storage_index_string(self):
        """Return the abbreviated base32 form of the storage index."""
        return self.storage_index_s

    def get_mutability_string(self):
        """Return the constant string "mutable"."""
        return "mutable"

    def to_string(self):
        """Render the health verdict, the status report and, if a repair
        was attempted, its outcome as one multi-line string."""
        if self.healthy:
            parts = ["Healthy!\n"]
        else:
            parts = ["Not Healthy!\n"]
        parts.extend(["\n", self.status_report, "\n"])
        if self.repair_attempted:
            if self.repair_succeeded:
                parts.append("Repair attempted and successful\n")
            else:
                parts.append("Repair attempted and failed\n")
            parts.append("\n")
            # a failed repair leaves repair_results unset, so there is
            # nothing further to render in that case
            if self.repair_results is not None:
                parts.append(self.repair_results.to_string())
                parts.append("\n")
        return "".join(parts)
255.