source file: /home/buildslave/tahoe/edgy/build/src/allmydata/storage/expirer.py
file stats: 232 lines, 232 executed: 100.0% covered
coverage versus previous test: 0 lines added, 0 lines removed
1. import time, os, pickle, struct
2. from crawler import ShareCrawler
3. from shares import get_share_file
4. from common import UnknownMutableContainerVersionError, \
5. UnknownImmutableContainerVersionError
6. from twisted.python import log as twlog
7.
8. class LeaseCheckingCrawler(ShareCrawler):
9. """I examine the leases on all shares, determining which are still valid
10. and which have expired. I can remove the expired leases (if so
11. configured), and the share will be deleted when the last lease is
12. removed.
13.
14. I collect statistics on the leases and make these available to a web
15. status page, including::
16.
17. Space recovered during this cycle-so-far:
18. actual (only if expiration_enabled=True):
19. num-buckets, num-shares, sum of share sizes, real disk usage
20. ('real disk usage' means we use stat(fn).st_blocks*512 and include any
21. space used by the directory)
22. what it would have been with the original lease expiration time
23. what it would have been with our configured expiration time
24.
25. Prediction of space that will be recovered during the rest of this cycle
26. Prediction of space that will be recovered by the entire current cycle.
27.
28. Space recovered during the last 10 cycles <-- saved in separate pickle
29.
30. Shares/buckets examined:
31. this cycle-so-far
32. prediction of rest of cycle
33. during last 10 cycles <-- separate pickle
34. start/finish time of last 10 cycles <-- separate pickle
35. expiration time used for last 10 cycles <-- separate pickle
36.
37. Histogram of leases-per-share:
38. this-cycle-to-date
39. last 10 cycles <-- separate pickle
40. Histogram of lease ages, buckets = 1day
41. cycle-to-date
42. last 10 cycles <-- separate pickle
43.
44. All cycle-to-date values remain valid until the start of the next cycle.
45.
46. """
47.
48. slow_start = 360 # wait 6 minutes after startup
49. minimum_cycle_time = 12*60*60 # not more than twice per day
50.
51. def __init__(self, server, statefile, historyfile,
52. expiration_enabled, mode,
53. override_lease_duration, # used if expiration_mode=="age"
54. cutoff_date, # used if expiration_mode=="cutoff-date"
55. sharetypes):
56. self.historyfile = historyfile
57. self.expiration_enabled = expiration_enabled
58. self.mode = mode
59. self.override_lease_duration = None
60. self.cutoff_date = None
61. if self.mode == "age":
62. assert isinstance(override_lease_duration, (int, type(None)))
63. self.override_lease_duration = override_lease_duration # seconds
64. elif self.mode == "cutoff-date":
65. assert isinstance(cutoff_date, int) # seconds-since-epoch
66. assert cutoff_date is not None
67. self.cutoff_date = cutoff_date
68. else:
69. raise ValueError("GC mode '%s' must be 'age' or 'cutoff-date'" % mode)
70. self.sharetypes_to_expire = sharetypes
71. ShareCrawler.__init__(self, server, statefile)
72.
73. def add_initial_state(self):
74. # we fill ["cycle-to-date"] here (even though they will be reset in
75. # self.started_cycle) just in case someone grabs our state before we
76. # get started: unit tests do this
77. so_far = self.create_empty_cycle_dict()
78. self.state.setdefault("cycle-to-date", so_far)
79. # in case we upgrade the code while a cycle is in progress, update
80. # the keys individually
81. for k in so_far:
82. self.state["cycle-to-date"].setdefault(k, so_far[k])
83.
84. # initialize history
85. if not os.path.exists(self.historyfile):
86. history = {} # cyclenum -> dict
87. f = open(self.historyfile, "wb")
88. pickle.dump(history, f)
89. f.close()
90.
91. def create_empty_cycle_dict(self):
92. recovered = self.create_empty_recovered_dict()
93. so_far = {"corrupt-shares": [],
94. "space-recovered": recovered,
95. "lease-age-histogram": {}, # (minage,maxage)->count
96. "leases-per-share-histogram": {}, # leasecount->numshares
97. }
98. return so_far
99.
100. def create_empty_recovered_dict(self):
101. recovered = {}
102. for a in ("actual", "original", "configured", "examined"):
103. for b in ("buckets", "shares", "sharebytes", "diskbytes"):
104. recovered[a+"-"+b] = 0
105. recovered[a+"-"+b+"-mutable"] = 0
106. recovered[a+"-"+b+"-immutable"] = 0
107. return recovered
108.
109. def started_cycle(self, cycle):
110. self.state["cycle-to-date"] = self.create_empty_cycle_dict()
111.
112. def stat(self, fn):
113. return os.stat(fn)
114.
115. def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
116. bucketdir = os.path.join(prefixdir, storage_index_b32)
117. s = self.stat(bucketdir)
118. would_keep_shares = []
119. wks = None
120.
121. for fn in os.listdir(bucketdir):
122. try:
123. shnum = int(fn)
124. except ValueError:
125. continue # non-numeric means not a sharefile
126. sharefile = os.path.join(bucketdir, fn)
127. try:
128. wks = self.process_share(sharefile)
129. except (UnknownMutableContainerVersionError,
130. UnknownImmutableContainerVersionError,
131. struct.error):
132. twlog.msg("lease-checker error processing %s" % sharefile)
133. twlog.err()
134. which = (storage_index_b32, shnum)
135. self.state["cycle-to-date"]["corrupt-shares"].append(which)
136. wks = (1, 1, 1, "unknown")
137. would_keep_shares.append(wks)
138.
139. sharetype = None
140. if wks:
141. # use the last share's sharetype as the buckettype
142. sharetype = wks[3]
143. rec = self.state["cycle-to-date"]["space-recovered"]
144. self.increment(rec, "examined-buckets", 1)
145. if sharetype:
146. self.increment(rec, "examined-buckets-"+sharetype, 1)
147.
148. try:
149. bucket_diskbytes = s.st_blocks * 512
150. except AttributeError:
151. bucket_diskbytes = 0 # no stat().st_blocks on windows
152. if sum([wks[0] for wks in would_keep_shares]) == 0:
153. self.increment_bucketspace("original", bucket_diskbytes, sharetype)
154. if sum([wks[1] for wks in would_keep_shares]) == 0:
155. self.increment_bucketspace("configured", bucket_diskbytes, sharetype)
156. if sum([wks[2] for wks in would_keep_shares]) == 0:
157. self.increment_bucketspace("actual", bucket_diskbytes, sharetype)
158.
159. def process_share(self, sharefilename):
160. # first, find out what kind of a share it is
161. sf = get_share_file(sharefilename)
162. sharetype = sf.sharetype
163. now = time.time()
164. s = self.stat(sharefilename)
165.
166. num_leases = 0
167. num_valid_leases_original = 0
168. num_valid_leases_configured = 0
169. expired_leases_configured = []
170.
171. for li in sf.get_leases():
172. num_leases += 1
173. original_expiration_time = li.get_expiration_time()
174. grant_renew_time = li.get_grant_renew_time_time()
175. age = li.get_age()
176. self.add_lease_age_to_histogram(age)
177.
178. # expired-or-not according to original expiration time
179. if original_expiration_time > now:
180. num_valid_leases_original += 1
181.
182. # expired-or-not according to our configured age limit
183. expired = False
184. if self.mode == "age":
185. age_limit = original_expiration_time
186. if self.override_lease_duration is not None:
187. age_limit = self.override_lease_duration
188. if age > age_limit:
189. expired = True
190. else:
191. assert self.mode == "cutoff-date"
192. if grant_renew_time < self.cutoff_date:
193. expired = True
194. if sharetype not in self.sharetypes_to_expire:
195. expired = False
196.
197. if expired:
198. expired_leases_configured.append(li)
199. else:
200. num_valid_leases_configured += 1
201.
202. so_far = self.state["cycle-to-date"]
203. self.increment(so_far["leases-per-share-histogram"], num_leases, 1)
204. self.increment_space("examined", s, sharetype)
205.
206. would_keep_share = [1, 1, 1, sharetype]
207.
208. if self.expiration_enabled:
209. for li in expired_leases_configured:
210. sf.cancel_lease(li.cancel_secret)
211.
212. if num_valid_leases_original == 0:
213. would_keep_share[0] = 0
214. self.increment_space("original", s, sharetype)
215.
216. if num_valid_leases_configured == 0:
217. would_keep_share[1] = 0
218. self.increment_space("configured", s, sharetype)
219. if self.expiration_enabled:
220. would_keep_share[2] = 0
221. self.increment_space("actual", s, sharetype)
222.
223. return would_keep_share
224.
225. def increment_space(self, a, s, sharetype):
226. sharebytes = s.st_size
227. try:
228. # note that stat(2) says that st_blocks is 512 bytes, and that
229. # st_blksize is "optimal file sys I/O ops blocksize", which is
230. # independent of the block-size that st_blocks uses.
231. diskbytes = s.st_blocks * 512
232. except AttributeError:
233. # the docs say that st_blocks is only on linux. I also see it on
234. # MacOS. But it isn't available on windows.
235. diskbytes = sharebytes
236. so_far_sr = self.state["cycle-to-date"]["space-recovered"]
237. self.increment(so_far_sr, a+"-shares", 1)
238. self.increment(so_far_sr, a+"-sharebytes", sharebytes)
239. self.increment(so_far_sr, a+"-diskbytes", diskbytes)
240. if sharetype:
241. self.increment(so_far_sr, a+"-shares-"+sharetype, 1)
242. self.increment(so_far_sr, a+"-sharebytes-"+sharetype, sharebytes)
243. self.increment(so_far_sr, a+"-diskbytes-"+sharetype, diskbytes)
244.
245. def increment_bucketspace(self, a, bucket_diskbytes, sharetype):
246. rec = self.state["cycle-to-date"]["space-recovered"]
247. self.increment(rec, a+"-diskbytes", bucket_diskbytes)
248. self.increment(rec, a+"-buckets", 1)
249. if sharetype:
250. self.increment(rec, a+"-diskbytes-"+sharetype, bucket_diskbytes)
251. self.increment(rec, a+"-buckets-"+sharetype, 1)
252.
253. def increment(self, d, k, delta=1):
254. if k not in d:
255. d[k] = 0
256. d[k] += delta
257.
258. def add_lease_age_to_histogram(self, age):
259. bucket_interval = 24*60*60
260. bucket_number = int(age/bucket_interval)
261. bucket_start = bucket_number * bucket_interval
262. bucket_end = bucket_start + bucket_interval
263. k = (bucket_start, bucket_end)
264. self.increment(self.state["cycle-to-date"]["lease-age-histogram"], k, 1)
265.
266. def convert_lease_age_histogram(self, lah):
267. # convert { (minage,maxage) : count } into [ (minage,maxage,count) ]
268. # since the former is not JSON-safe (JSON dictionaries must have
269. # string keys).
270. json_safe_lah = []
271. for k in sorted(lah):
272. (minage,maxage) = k
273. json_safe_lah.append( (minage, maxage, lah[k]) )
274. return json_safe_lah
275.
276. def finished_cycle(self, cycle):
277. # add to our history state, prune old history
278. h = {}
279.
280. start = self.state["current-cycle-start-time"]
281. now = time.time()
282. h["cycle-start-finish-times"] = (start, now)
283. h["expiration-enabled"] = self.expiration_enabled
284. h["configured-expiration-mode"] = (self.mode,
285. self.override_lease_duration,
286. self.cutoff_date,
287. self.sharetypes_to_expire)
288.
289. s = self.state["cycle-to-date"]
290.
291. # state["lease-age-histogram"] is a dictionary (mapping
292. # (minage,maxage) tuple to a sharecount), but we report
293. # self.get_state()["lease-age-histogram"] as a list of
294. # (min,max,sharecount) tuples, because JSON can handle that better.
295. # We record the list-of-tuples form into the history for the same
296. # reason.
297. lah = self.convert_lease_age_histogram(s["lease-age-histogram"])
298. h["lease-age-histogram"] = lah
299. h["leases-per-share-histogram"] = s["leases-per-share-histogram"].copy()
300. h["corrupt-shares"] = s["corrupt-shares"][:]
301. # note: if ["shares-recovered"] ever acquires an internal dict, this
302. # copy() needs to become a deepcopy
303. h["space-recovered"] = s["space-recovered"].copy()
304.
305. history = pickle.load(open(self.historyfile, "rb"))
306. history[cycle] = h
307. while len(history) > 10:
308. oldcycles = sorted(history.keys())
309. del history[oldcycles[0]]
310. f = open(self.historyfile, "wb")
311. pickle.dump(history, f)
312. f.close()
313.
314. def get_state(self):
315. """In addition to the crawler state described in
316. ShareCrawler.get_state(), I return the following keys which are
317. specific to the lease-checker/expirer. Note that the non-history keys
318. (with 'cycle' in their names) are only present if a cycle is
319. currently running. If the crawler is between cycles, it appropriate
320. to show the latest item in the 'history' key instead. Also note that
321. each history item has all the data in the 'cycle-to-date' value, plus
322. cycle-start-finish-times.
323.
324. cycle-to-date:
325. expiration-enabled
326. configured-expiration-mode
327. lease-age-histogram (list of (minage,maxage,sharecount) tuples)
328. leases-per-share-histogram
329. corrupt-shares (list of (si_b32,shnum) tuples, minimal verification)
330. space-recovered
331.
332. estimated-remaining-cycle:
333. # Values may be None if not enough data has been gathered to
334. # produce an estimate.
335. space-recovered
336.
337. estimated-current-cycle:
338. # cycle-to-date plus estimated-remaining. Values may be None if
339. # not enough data has been gathered to produce an estimate.
340. space-recovered
341.
342. history: maps cyclenum to a dict with the following keys:
343. cycle-start-finish-times
344. expiration-enabled
345. configured-expiration-mode
346. lease-age-histogram
347. leases-per-share-histogram
348. corrupt-shares
349. space-recovered
350.
351. The 'space-recovered' structure is a dictionary with the following
352. keys:
353. # 'examined' is what was looked at
354. examined-buckets, examined-buckets-mutable, examined-buckets-immutable
355. examined-shares, -mutable, -immutable
356. examined-sharebytes, -mutable, -immutable
357. examined-diskbytes, -mutable, -immutable
358.
359. # 'actual' is what was actually deleted
360. actual-buckets, -mutable, -immutable
361. actual-shares, -mutable, -immutable
362. actual-sharebytes, -mutable, -immutable
363. actual-diskbytes, -mutable, -immutable
364.
365. # would have been deleted, if the original lease timer was used
366. original-buckets, -mutable, -immutable
367. original-shares, -mutable, -immutable
368. original-sharebytes, -mutable, -immutable
369. original-diskbytes, -mutable, -immutable
370.
371. # would have been deleted, if our configured max_age was used
372. configured-buckets, -mutable, -immutable
373. configured-shares, -mutable, -immutable
374. configured-sharebytes, -mutable, -immutable
375. configured-diskbytes, -mutable, -immutable
376.
377. """
378. progress = self.get_progress()
379.
380. state = ShareCrawler.get_state(self) # does a shallow copy
381. history = pickle.load(open(self.historyfile, "rb"))
382. state["history"] = history
383.
384. if not progress["cycle-in-progress"]:
385. del state["cycle-to-date"]
386. return state
387.
388. so_far = state["cycle-to-date"].copy()
389. state["cycle-to-date"] = so_far
390.
391. lah = so_far["lease-age-histogram"]
392. so_far["lease-age-histogram"] = self.convert_lease_age_histogram(lah)
393. so_far["expiration-enabled"] = self.expiration_enabled
394. so_far["configured-expiration-mode"] = (self.mode,
395. self.override_lease_duration,
396. self.cutoff_date,
397. self.sharetypes_to_expire)
398.
399. so_far_sr = so_far["space-recovered"]
400. remaining_sr = {}
401. remaining = {"space-recovered": remaining_sr}
402. cycle_sr = {}
403. cycle = {"space-recovered": cycle_sr}
404.
405. if progress["cycle-complete-percentage"] > 0.0:
406. pc = progress["cycle-complete-percentage"] / 100.0
407. m = (1-pc)/pc
408. for a in ("actual", "original", "configured", "examined"):
409. for b in ("buckets", "shares", "sharebytes", "diskbytes"):
410. for c in ("", "-mutable", "-immutable"):
411. k = a+"-"+b+c
412. remaining_sr[k] = m * so_far_sr[k]
413. cycle_sr[k] = so_far_sr[k] + remaining_sr[k]
414. else:
415. for a in ("actual", "original", "configured", "examined"):
416. for b in ("buckets", "shares", "sharebytes", "diskbytes"):
417. for c in ("", "-mutable", "-immutable"):
418. k = a+"-"+b+c
419. remaining_sr[k] = None
420. cycle_sr[k] = None
421.
422. state["estimated-remaining-cycle"] = remaining
423. state["estimated-current-cycle"] = cycle
424. return state