source file: /home/buildslave/tahoe/edgy/build/src/allmydata/storage/mutable.py
file stats: 291 lines, 288 executed: 99.0% covered
coverage versus previous test: 0 lines added, 0 lines removed
1. import os, stat, struct
2.
3. from allmydata.interfaces import BadWriteEnablerError
4. from allmydata.util import idlib, log
5. from allmydata.util.assertutil import precondition
6. from allmydata.util.hashutil import constant_time_compare
7. from allmydata.storage.lease import LeaseInfo
8. from allmydata.storage.common import UnknownMutableContainerVersionError, \
9. DataTooLargeError
10.
# the MutableShareFile is like the ShareFile, but used for mutable data. It
# has a different layout. See docs/mutable.txt for more details.

# #   offset    size    name
# 1   0         32      magic verstr "Tahoe mutable container v1\n" plus binary
# 2   32        20      write enabler's nodeid
# 3   52        32      write enabler
# 4   84        8       data size (actual share data present) (a)
# 5   92        8       offset of (8) count of extra leases (after data)
# 6   100       368     four leases, 92 bytes each
#                         0   4   ownerid (0 means "no lease here")
#                         4   4   expiration timestamp
#                         8   32  renewal token
#                        40   32  cancel token
#                        72   20  nodeid which accepted the tokens
# 7   468       (a)     data
# 8   ??        4       count of extra leases
# 9   ??        n*92    extra leases
#
# (a): the length of the data (7) is the value stored in the data size
#      field (4). "offset of (8)" means field 5 holds the file offset at
#      which field 8 begins; fields 8 and 9 move when the data area grows.
29.
30.
# The on-disk layout above is packed with standard-size, big-endian (">")
# struct codes, so check those sizes rather than the platform-native ones
# (a native "L" is 8 bytes on most 64-bit platforms). Each assert compares
# the size explicitly; a bare `assert expr, msg` would only test truthiness.
assert struct.calcsize(">L") == 4 # The struct module doc says that L's are 4 bytes in size.
assert struct.calcsize(">Q") == 8 # The struct module doc says that Q's are 8 bytes in size (at least with big-endian ordering).
33.
class MutableShareFile:
    """One mutable share, stored as a single container file on local disk.

    The container starts with a header (magic, write enabler's nodeid,
    write enabler, data length, offset of the extra-lease block), followed
    by four fixed lease slots, the share data, and finally a count of extra
    leases plus the extra lease records themselves. Nothing about the
    contents is cached on the instance: the public methods open self.home
    on every call, and the private _read_*/_write_* helpers operate on an
    already-open file object.
    """

    sharetype = "mutable"
    DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
    EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
    HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
    LEASE_SIZE = struct.calcsize(">LL32s32s20s")
    assert LEASE_SIZE == 92
    DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
    assert DATA_OFFSET == 468, DATA_OFFSET
    # our sharefiles start with a recognizable string, plus some random
    # binary data to reduce the chance that a regular text file will look
    # like a sharefile.
    MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
    assert len(MAGIC) == 32
    MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
    # TODO: decide upon a policy for max share size

    # File handles below are released with explicit try/finally blocks
    # rather than `with`, presumably because this code must still run on
    # Python versions that lack the `with` statement.

    def __init__(self, filename, parent=None):
        """Wrap the container file at `filename`.

        If the file already exists, only its magic string is checked;
        UnknownMutableContainerVersionError is raised when it does not
        match MAGIC. A missing file is accepted so that create() can be
        called afterwards. `parent` is only used by log().
        """
        self.home = filename
        if os.path.exists(self.home):
            # we don't cache anything, just check the magic
            f = open(self.home, 'rb')
            try:
                data = f.read(self.HEADER_SIZE)
            finally:
                f.close()
            (magic,
             write_enabler_nodeid, write_enabler,
             data_length, extra_lease_offset) = \
             struct.unpack(">32s20s32sQQ", data)
            if magic != self.MAGIC:
                msg = "sharefile %s had magic '%r' but we wanted '%r'" % \
                      (filename, magic, self.MAGIC)
                raise UnknownMutableContainerVersionError(msg)
        self.parent = parent # for logging

    def log(self, *args, **kwargs):
        """Forward a log event to the parent given to __init__.

        Requires a non-None parent; with parent=None this raises
        AttributeError.
        """
        return self.parent.log(*args, **kwargs)

    def create(self, my_nodeid, write_enabler):
        """Create a new, empty container at self.home.

        The header records `my_nodeid` as the write enabler's nodeid along
        with `write_enabler` itself. All four lease slots are zeroed, the
        data length is 0, and the extra-lease block (count 0) sits directly
        at DATA_OFFSET. The file must not exist yet.
        """
        assert not os.path.exists(self.home)
        data_length = 0
        extra_lease_offset = (self.HEADER_SIZE
                              + 4 * self.LEASE_SIZE
                              + data_length)
        assert extra_lease_offset == self.DATA_OFFSET # true at creation
        num_extra_leases = 0
        # pack everything before opening the file, so that a packing error
        # does not leave an empty file behind
        header = struct.pack(">32s20s32sQQ",
                             self.MAGIC, my_nodeid, write_enabler,
                             data_length, extra_lease_offset,
                             )
        leases = ("\x00"*self.LEASE_SIZE) * 4
        f = open(self.home, 'wb')
        try:
            f.write(header + leases)
            # data goes here, empty after creation
            f.write(struct.pack(">L", num_extra_leases))
            # extra leases go here, none at creation
        finally:
            f.close()

    def unlink(self):
        """Delete the container file."""
        os.unlink(self.home)

    def _read_data_length(self, f):
        """Return the data-length header field (bytes of share data)."""
        f.seek(self.DATA_LENGTH_OFFSET)
        (data_length,) = struct.unpack(">Q", f.read(8))
        return data_length

    def _write_data_length(self, f, data_length):
        """Overwrite the data-length header field."""
        f.seek(self.DATA_LENGTH_OFFSET)
        f.write(struct.pack(">Q", data_length))

    def _read_share_data(self, f, offset, length):
        """Read up to `length` bytes of share data starting at `offset`."""
        precondition(offset >= 0)
        data_length = self._read_data_length(f)
        if offset+length > data_length:
            # reads beyond the end of the data are truncated. Reads that
            # start beyond the end of the data return an empty string.
            length = max(0, data_length-offset)
        if length == 0:
            return ""
        precondition(offset+length <= data_length)
        f.seek(self.DATA_OFFSET+offset)
        data = f.read(length)
        return data

    def _read_extra_lease_offset(self, f):
        """Return the file offset of the extra-lease block."""
        f.seek(self.EXTRA_LEASE_OFFSET)
        (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
        return extra_lease_offset

    def _write_extra_lease_offset(self, f, offset):
        """Overwrite the header field holding the extra-lease offset."""
        f.seek(self.EXTRA_LEASE_OFFSET)
        f.write(struct.pack(">Q", offset))

    def _read_num_extra_leases(self, f):
        """Return the count stored at the start of the extra-lease block."""
        offset = self._read_extra_lease_offset(f)
        f.seek(offset)
        (num_extra_leases,) = struct.unpack(">L", f.read(4))
        return num_extra_leases

    def _write_num_extra_leases(self, f, num_leases):
        """Overwrite the count at the start of the extra-lease block."""
        extra_lease_offset = self._read_extra_lease_offset(f)
        f.seek(extra_lease_offset)
        f.write(struct.pack(">L", num_leases))

    def _change_container_size(self, f, new_container_size):
        """Grow the data area so it can hold `new_container_size` bytes.

        The extra-lease block (count plus records) is copied to just past
        the enlarged data area, and then the header's extra-lease offset is
        updated. Requests that would not grow the container are ignored.

        Raises DataTooLargeError if new_container_size exceeds MAX_SIZE.
        """
        if new_container_size > self.MAX_SIZE:
            raise DataTooLargeError()
        old_extra_lease_offset = self._read_extra_lease_offset(f)
        new_extra_lease_offset = self.DATA_OFFSET + new_container_size
        if new_extra_lease_offset <= old_extra_lease_offset:
            # TODO: allow containers to shrink. For now they remain large.
            # (An unchanged size needs no move at all.)
            return
        num_extra_leases = self._read_num_extra_leases(f)
        f.seek(old_extra_lease_offset)
        extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE)
        f.seek(new_extra_lease_offset)
        f.write(extra_lease_data)
        # an interrupt here will corrupt the leases, iff the move caused the
        # extra leases to overlap.
        self._write_extra_lease_offset(f, new_extra_lease_offset)

    def _write_share_data(self, f, offset, data):
        """Write `data` at `offset` within the share data.

        If the write reaches or passes the current end of the data, the
        container is grown first (when needed), and the data-length field is
        set to offset+len(data) before the data itself is written.
        """
        length = len(data)
        precondition(offset >= 0)
        data_length = self._read_data_length(f)
        extra_lease_offset = self._read_extra_lease_offset(f)

        if offset+length >= data_length:
            # They are expanding their data size.
            if self.DATA_OFFSET+offset+length > extra_lease_offset:
                # Their new data won't fit in the current container, so we
                # have to move the leases. With luck, they're expanding it
                # more than the size of the extra lease block, which will
                # minimize the corrupt-the-share window
                self._change_container_size(f, offset+length)
                extra_lease_offset = self._read_extra_lease_offset(f)

                # an interrupt here is ok.. the container has been enlarged
                # but the data remains untouched

            assert self.DATA_OFFSET+offset+length <= extra_lease_offset
            # Their data now fits in the current container. We must write
            # their new data and modify the recorded data size.
            new_data_length = offset+length
            self._write_data_length(f, new_data_length)
            # an interrupt here will result in a corrupted share

        # now all that's left to do is write out their data
        f.seek(self.DATA_OFFSET+offset)
        f.write(data)

    def _write_lease_record(self, f, lease_number, lease_info):
        """Store `lease_info` in lease slot `lease_number`.

        Slots 0-3 are the fixed slots in the header area; higher numbers
        index the extra-lease block. A number past the existing extra
        leases appends a new extra record and bumps the count by one.
        """
        # NOTE(review): the count is only incremented by one, so appending
        # assumes lease_number == 4 + num_extra_leases (add_lease() passes
        # exactly that).
        extra_lease_offset = self._read_extra_lease_offset(f)
        num_extra_leases = self._read_num_extra_leases(f)
        if lease_number < 4:
            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
        elif (lease_number-4) < num_extra_leases:
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        else:
            # must add an extra lease record
            self._write_num_extra_leases(f, num_extra_leases+1)
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        f.seek(offset)
        assert f.tell() == offset
        f.write(lease_info.to_mutable_data())

    def _read_lease_record(self, f, lease_number):
        """Return the LeaseInfo in slot `lease_number`, or None if empty.

        A slot whose owner number is 0 is empty. Raises IndexError for a
        slot number beyond the allocated slots.
        """
        extra_lease_offset = self._read_extra_lease_offset(f)
        num_extra_leases = self._read_num_extra_leases(f)
        if lease_number < 4:
            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
        elif (lease_number-4) < num_extra_leases:
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        else:
            raise IndexError("No such lease number %d" % lease_number)
        f.seek(offset)
        assert f.tell() == offset
        data = f.read(self.LEASE_SIZE)
        lease_info = LeaseInfo().from_mutable_data(data)
        if lease_info.owner_num == 0:
            return None
        return lease_info

    def _get_num_lease_slots(self, f):
        """Return how many lease slots are allocated (filled or not)."""
        num_extra_leases = self._read_num_extra_leases(f)
        return 4+num_extra_leases

    def _get_first_empty_lease_slot(self, f):
        """Return the index of the first empty lease slot, or None."""
        for i in range(self._get_num_lease_slots(f)):
            if self._read_lease_record(f, i) is None:
                return i
        return None

    def get_leases(self):
        """Yields a LeaseInfo instance for all leases."""
        # Read every lease up front so the file is closed deterministically,
        # even if the caller abandons the iteration early.
        f = open(self.home, 'rb')
        try:
            leases = [lease for (i, lease) in self._enumerate_leases(f)]
        finally:
            f.close()
        for lease in leases:
            yield lease

    def _enumerate_leases(self, f):
        """Yield (slot_number, LeaseInfo) for each non-empty lease slot."""
        for i in range(self._get_num_lease_slots(f)):
            try:
                data = self._read_lease_record(f, i)
                if data is not None:
                    yield i,data
            except IndexError:
                return

    def add_lease(self, lease_info):
        """Store `lease_info` in the first empty slot, or append a new one."""
        precondition(lease_info.owner_num != 0) # 0 means "no lease here"
        f = open(self.home, 'rb+')
        try:
            num_lease_slots = self._get_num_lease_slots(f)
            empty_slot = self._get_first_empty_lease_slot(f)
            if empty_slot is not None:
                self._write_lease_record(f, empty_slot, lease_info)
            else:
                self._write_lease_record(f, num_lease_slots, lease_info)
        finally:
            f.close()

    def renew_lease(self, renew_secret, new_expire_time):
        """Extend the lease whose renew secret matches `renew_secret`.

        The expiration time is only ever moved later. Raises IndexError,
        naming the nodeids that accepted the existing leases, when no lease
        matches.
        """
        accepting_nodeids = set()
        f = open(self.home, 'rb+')
        try:
            for (leasenum,lease) in self._enumerate_leases(f):
                if constant_time_compare(lease.renew_secret, renew_secret):
                    # yup. See if we need to update the owner time.
                    if new_expire_time > lease.expiration_time:
                        # yes
                        lease.expiration_time = new_expire_time
                        self._write_lease_record(f, leasenum, lease)
                    return
                accepting_nodeids.add(lease.nodeid)
        finally:
            f.close()
        # Return the accepting_nodeids set, to give the client a chance to
        # update the leases on a share which has been migrated from its
        # original server to a new one.
        msg = ("Unable to renew non-existent lease. I have leases accepted by"
               " nodeids: ")
        msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
                         for anid in accepting_nodeids])
        msg += " ."
        raise IndexError(msg)

    def add_or_renew_lease(self, lease_info):
        """Renew the matching lease if there is one, else add `lease_info`."""
        precondition(lease_info.owner_num != 0) # 0 means "no lease here"
        try:
            self.renew_lease(lease_info.renew_secret,
                             lease_info.expiration_time)
        except IndexError:
            self.add_lease(lease_info)

    def cancel_lease(self, cancel_secret):
        """Remove any leases with the given cancel_secret. If the last lease
        is cancelled, the file will be removed. Return the number of bytes
        that were freed (by truncating the list of leases, and possibly by
        deleting the file). Raise IndexError if there was no lease with the
        given cancel_secret."""

        accepting_nodeids = set()
        modified = 0
        remaining = 0
        blank_lease = LeaseInfo(owner_num=0,
                                renew_secret="\x00"*32,
                                cancel_secret="\x00"*32,
                                expiration_time=0,
                                nodeid="\x00"*20)
        f = open(self.home, 'rb+')
        try:
            for (leasenum,lease) in self._enumerate_leases(f):
                accepting_nodeids.add(lease.nodeid)
                if constant_time_compare(lease.cancel_secret, cancel_secret):
                    self._write_lease_record(f, leasenum, blank_lease)
                    modified += 1
                else:
                    remaining += 1
            if modified:
                freed_space = self._pack_leases(f)
        finally:
            # close on every path, including the no-match IndexError below
            f.close()
        if modified:
            if not remaining:
                freed_space += os.stat(self.home)[stat.ST_SIZE]
                self.unlink()
            return freed_space

        msg = ("Unable to cancel non-existent lease. I have leases "
               "accepted by nodeids: ")
        msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
                         for anid in accepting_nodeids])
        msg += " ."
        raise IndexError(msg)

    def _pack_leases(self, f):
        """Return the number of bytes reclaimed by packing (currently 0)."""
        # TODO: reclaim space from cancelled leases
        return 0

    def _read_write_enabler_and_nodeid(self, f):
        """Return (write_enabler, write_enabler_nodeid) from the header."""
        f.seek(0)
        data = f.read(self.HEADER_SIZE)
        (magic,
         write_enabler_nodeid, write_enabler,
         data_length, extra_lease_offset) = \
         struct.unpack(">32s20s32sQQ", data)
        assert magic == self.MAGIC
        return (write_enabler, write_enabler_nodeid)

    def readv(self, readv):
        """Return a list of share-data strings, one per (offset, length)."""
        datav = []
        f = open(self.home, 'rb')
        try:
            for (offset, length) in readv:
                datav.append(self._read_share_data(f, offset, length))
        finally:
            f.close()
        return datav

    def check_write_enabler(self, write_enabler, si_s):
        """Raise BadWriteEnablerError unless `write_enabler` matches.

        `si_s` is only used in the log message. On a mismatch, the nodeid
        that recorded the stored write enabler is logged and included in
        the error message.
        """
        # only reading the header, so no write access is needed
        f = open(self.home, 'rb')
        try:
            (real_write_enabler, write_enabler_nodeid) = \
                self._read_write_enabler_and_nodeid(f)
        finally:
            f.close()
        # avoid a timing attack
        if not constant_time_compare(write_enabler, real_write_enabler):
            # accommodate share migration by reporting the nodeid used for
            # the old write enabler.
            self.log(format="bad write enabler on SI %(si)s,"
                     " recorded by nodeid %(nodeid)s",
                     facility="tahoe.storage",
                     level=log.WEIRD, umid="cE1eBQ",
                     si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
            msg = "The write enabler was recorded by nodeid '%s'." % \
                  (idlib.nodeid_b2a(write_enabler_nodeid),)
            raise BadWriteEnablerError(msg)

    def check_testv(self, testv):
        """Return True iff every (offset, length, operator, specimen) passes.

        Each test reads share data with _read_share_data and compares it to
        the specimen via testv_compare; evaluation stops at the first
        failure.
        """
        test_good = True
        # only reading share data, so no write access is needed
        f = open(self.home, 'rb')
        try:
            for (offset, length, operator, specimen) in testv:
                data = self._read_share_data(f, offset, length)
                if not testv_compare(data, operator, specimen):
                    test_good = False
                    break
        finally:
            f.close()
        return test_good

    def writev(self, datav, new_length):
        """Apply the (offset, data) writes in `datav`, then maybe resize.

        When `new_length` is not None, the container is grown to hold it if
        necessary (it never shrinks) and the data-length field is set to
        `new_length`.
        """
        f = open(self.home, 'rb+')
        try:
            for (offset, data) in datav:
                self._write_share_data(f, offset, data)
            if new_length is not None:
                self._change_container_size(f, new_length)
                self._write_data_length(f, new_length)
        finally:
            f.close()
403.
404. def testv_compare(a, op, b):
405. assert op in ("lt", "le", "eq", "ne", "ge", "gt")
406. if op == "lt":
407. return a < b
408. if op == "le":
409. return a <= b
410. if op == "eq":
411. return a == b
412. if op == "ne":
413. return a != b
414. if op == "ge":
415. return a >= b
416. if op == "gt":
417. return a > b
418. # never reached
419.
class EmptyShare:
    """A share with no data: test vectors are evaluated against ""."""

    def check_testv(self, testv):
        """Return True iff every test vector passes against empty data.

        Each entry is (offset, length, operator, specimen); offset and
        length are ignored, and the empty string is compared with the
        specimen. Evaluation stops at the first failing entry.
        """
        for (_offset, _length, op, specimen) in testv:
            if not testv_compare("", op, specimen):
                return False
        return True
430.
def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
    """Create a new container at `filename` and return a fresh wrapper.

    The container is written by a throwaway MutableShareFile; the returned
    instance is constructed afterwards, so its magic check runs against the
    newly written file.
    """
    MutableShareFile(filename, parent).create(my_nodeid, write_enabler)
    return MutableShareFile(filename, parent)
436.