source file: /home/buildslave/tahoe/edgy/build/src/allmydata/immutable/checker.py
file stats: 184 lines, 172 executed: 93.5% covered
1.
2. """
3. Given a StorageIndex, count how many shares we can find (SimpleCHKFileChecker),
4. or download and hash-check the crypttext (SimpleCHKFileVerifier).
5. The share counter does no verification of the shares whatsoever: if a peer
6. claims to have a share, we believe it.
7. """
8.
9. from zope.interface import implements
10. from twisted.internet import defer
11. from twisted.python import log
12. from allmydata import storage
13. from allmydata.interfaces import ICheckerResults, IDeepCheckResults
14. from allmydata.immutable import download
15. from allmydata.util import hashutil, base32
16.
class Results:
    """Outcome of checking one immutable (or literal) file.

    Checkers create an instance, then set ``healthy`` and ``status_report``
    (and possibly other attributes such as ``stuff``) once they know the
    answer.
    """
    implements(ICheckerResults)

    def __init__(self, storage_index):
        # storage_index might be None for, say, LIT files
        self.storage_index = storage_index
        if storage_index is None:
            self.storage_index_s = "<none>"
        else:
            # short, human-readable abbreviation of the storage index
            self.storage_index_s = base32.b2a(storage_index)[:6]
        # Checkers overwrite this with their verdict. Defaulting to False
        # means a Results that was never filled in reports "not healthy"
        # instead of raising AttributeError from is_healthy()/to_string().
        self.healthy = False
        self.status_report = "[not generated yet]" # string

    def is_healthy(self):
        return self.healthy

    def get_storage_index(self):
        return self.storage_index

    def get_storage_index_string(self):
        return self.storage_index_s

    def get_mutability_string(self):
        # a file without a storage index is a literal (LIT) file
        if self.storage_index:
            return "immutable"
        return "literal"

    def to_string(self):
        """Return a multi-line, human-readable summary of this result."""
        if self.healthy:
            header = "Healthy!\n"
        else:
            header = "Not Healthy!\n"
        return "".join([header, "\n", self.status_report, "\n"])
53.
class DeepCheckResults:
    """Accumulate per-object checker results and repair outcomes for a
    traversal rooted at ``root_storage_index``."""
    implements(IDeepCheckResults)

    def __init__(self, root_storage_index):
        self.root_storage_index = root_storage_index
        self.root_storage_index_s = "<none>"
        if root_storage_index is not None:
            self.root_storage_index_s = base32.b2a(root_storage_index)[:6]

        # running tallies, bumped by add_check() / add_repair()
        self.objects_checked = self.objects_healthy = 0
        self.repairs_attempted = self.repairs_successful = 0
        # unhealthy results, in the order they were added
        self.problems = []
        # storage index -> result object
        self.all_results = {}
        self.server_problems = {}

    def get_root_storage_index_string(self):
        return self.root_storage_index_s

    def add_check(self, r):
        """Record one checker result ``r``."""
        self.objects_checked += 1
        if not r.is_healthy():
            self.problems.append(r)
        else:
            self.objects_healthy += 1
        self.all_results[r.get_storage_index()] = r

    def add_repair(self, is_successful):
        """Record one repair attempt and whether it succeeded."""
        self.repairs_attempted += 1
        if not is_successful:
            return
        self.repairs_successful += 1

    def count_objects_checked(self):
        return self.objects_checked

    def count_objects_healthy(self):
        return self.objects_healthy

    def count_repairs_attempted(self):
        return self.repairs_attempted

    def count_repairs_successful(self):
        return self.repairs_successful

    def get_server_problems(self):
        return self.server_problems

    def get_problems(self):
        return self.problems

    def get_all_results(self):
        return self.all_results
102.
class SimpleCHKFileChecker:
    """Count the shares of an immutable (CHK) file that servers claim to hold.

    No share contents are verified: if a server says it has a share, it is
    believed. start() returns a Deferred that fires with a Results instance.
    That instance's ``stuff`` attribute is the tuple
    (needed, total, found, sharemap), where ``found`` is the number of
    distinct share numbers reported and ``sharemap`` maps each share number
    to a list of the (binary) peerids that reported holding it.
    """

    def __init__(self, client, storage_index, needed_shares, total_shares):
        self.peer_getter = client.get_permuted_peers
        self.needed_shares = needed_shares
        self.total_shares = total_shares
        self.found_shares = set()
        self.storage_index = storage_index
        self.sharemap = {}

    def start(self):
        """Query every storage server; return a Deferred firing with Results."""
        d = self._get_all_shareholders(self.storage_index)
        d.addCallback(self._done)
        return d

    def _get_all_shareholders(self, storage_index):
        # Ask every permuted storage peer which buckets it holds. Per-peer
        # errors are absorbed by _got_error, so the DeferredList always
        # fires once every peer has answered or failed.
        dl = []
        for (peerid, ss) in self.peer_getter("storage", storage_index):
            d = ss.callRemote("get_buckets", storage_index)
            d.addCallbacks(self._got_response, self._got_error,
                           callbackArgs=(peerid,))
            dl.append(d)
        return defer.DeferredList(dl)

    def _got_response(self, buckets, peerid):
        # buckets is a dict: maps shnum to an rref of the server who holds it
        self.found_shares.update(buckets.keys())
        for shnum in buckets:
            self.sharemap.setdefault(shnum, []).append(peerid)

    def _got_error(self, f):
        # Best effort: log the failure and return None, which turns this
        # peer's failure into a success so the remaining peers still count.
        log.err(f)

    def _find_missing(self):
        """Return the set of share numbers in range(total_shares) not found.

        Share numbers outside that range (which a misbehaving server could
        report) are ignored, so they cannot mask a genuinely missing share.
        """
        return set(range(self.total_shares)) - self.found_shares

    def _done(self, res):
        r = Results(self.storage_index)
        missing = self._find_missing()
        r.healthy = not missing
        r.stuff = (self.needed_shares, self.total_shares,
                   len(self.found_shares), self.sharemap)
        report = []
        if missing:
            report.append("Missing shares: %s" %
                          ",".join(["sh%d" % shnum
                                    for shnum in sorted(missing)]))
        r.status_report = "\n".join(report) + "\n"
        return r
168.
class VerifyingOutput:
    """Output sink that hashes the crypttext of a download instead of
    storing it.

    Every segment passed to write_segment() is fed into a running crypttext
    hash. Once setup_hashtrees() has supplied a crypttext hash tree, each
    segment's hash is also recorded there as a leaf. The associated results
    object is marked unhealthy on construction and healthy by finish().
    """

    def __init__(self, total_length, results):
        self._crypttext_hasher = hashutil.crypttext_hasher()
        self._crypttext_hash_tree = None
        self._results = results
        self.total_length = total_length
        self.length = 0
        self._segment_number = 0
        self._opened = False
        # pessimistic until finish() is reached
        results.healthy = False

    def setup_hashtrees(self, plaintext_hashtree, crypttext_hashtree):
        # only the crypttext tree is used; the plaintext tree is ignored
        self._crypttext_hash_tree = crypttext_hashtree

    def write_segment(self, crypttext):
        self.length = self.length + len(crypttext)
        self._crypttext_hasher.update(crypttext)

        tree = self._crypttext_hash_tree
        if tree:
            seg_hasher = hashutil.crypttext_segment_hasher()
            seg_hasher.update(crypttext)
            tree.set_hashes(leaves={self._segment_number: seg_hasher.digest()})

        self._segment_number = self._segment_number + 1

    def close(self):
        # digest of every crypttext byte written so far
        self.crypttext_hash = self._crypttext_hasher.digest()

    def finish(self):
        results = self._results
        results.healthy = True
        return results
201.
202.
class SimpleCHKFileVerifier(download.FileDownloader):
    """Verify an immutable (CHK) file by reconstructing its crypttext.

    A successful run shows that at least 'k' of the shareholders are
    reachable and hold valid data. The remaining shareholders are not
    examined, and the plaintext cannot be verified (check_plaintext_hash is
    False). Output is routed into a VerifyingOutput instead of being stored.
    """
    check_plaintext_hash = False

    def __init__(self, client, storage_index, k, N, size, ueb_hash):
        self._client = client

        self._storage_index = storage_index
        self._uri_extension_hash = ueb_hash
        self._total_shares = N
        self._size = size
        self._num_needed_shares = k

        self._si_s = storage.si_b2a(self._storage_index)
        self.init_logging()

        # The sink owns the Results object: it sets healthy=False now and
        # healthy=True in its finish().
        self._output = VerifyingOutput(self._size,
                                       Results(self._storage_index))
        self._paused = False
        self._stopped = False

        # FileDownloader.__init__ is not called, so the download state it
        # would presumably set up is initialized here instead -- keep these
        # names in sync with the base class.
        self._results = None
        self.active_buckets = {} # k: shnum, v: bucket
        self._share_buckets = [] # list of (sharenum, bucket) tuples
        self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
        self._uri_extension_sources = []
        self._uri_extension_data = None

        # per-item fetch failure counters, all starting at zero
        self._fetch_failures = dict.fromkeys(["uri_extension",
                                              "plaintext_hashroot",
                                              "plaintext_hashtree",
                                              "crypttext_hashroot",
                                              "crypttext_hashtree"], 0)

    def init_logging(self):
        prefix = storage.si_b2a(self._storage_index)[:5]
        self._log_prefix = prefix
        self._log_number = self._client.log(
            "SimpleCHKFileVerifier(%s): starting" % prefix)

    def log(self, msg, *args, **kwargs):
        # attach messages to this verifier's "starting" log entry unless the
        # caller supplied its own parent
        if not kwargs.get('parent'):
            kwargs['parent'] = self._log_number
        text = "SimpleCHKFileVerifier(%s): %s" % (self._log_prefix, msg)
        return self._client.log(text, *args, **kwargs)


    def start(self):
        log.msg("starting download [%s]" % storage.si_b2a(self._storage_index)[:5])

        d = defer.maybeDeferred(self._get_all_shareholders)
        # Pipeline: find shareholders, fetch and validate the uri_extension
        # block, fetch hash trees, wrap buckets in validators, then pull
        # every segment through them and finish.
        pipeline = (self._got_all_shareholders,
                    self._obtain_uri_extension,
                    self._got_uri_extension,
                    self._get_hashtrees,
                    self._create_validated_buckets,
                    self._download_all_segments,
                    self._done)
        for step in pipeline:
            d.addCallback(step)
        return d
269.