source file: /home/buildslave/tahoe/edgy/build/src/allmydata/immutable/layout.py
file stats: 320 lines, 294 executed: 91.9% covered
coverage versus previous test: 0 lines added, 0 lines removed
1. import struct
2. from zope.interface import implements
3. from twisted.internet import defer
4. from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
5. FileTooLargeError, HASH_SIZE
6. from allmydata.util import mathutil, idlib, observer, pipeline
7. from allmydata.util.assertutil import precondition
8. from allmydata.storage.server import si_b2a
9.
10. class LayoutInvalid(Exception):
11. """ There is something wrong with these bytes so they can't be
12. interpreted as the kind of immutable file that I know how to download."""
13. pass
14.
15. class RidiculouslyLargeURIExtensionBlock(LayoutInvalid):
16. """ When downloading a file, the length of the URI Extension Block was
17. given as >= 2**32. This means the share data must have been corrupted, or
18. else the original uploader of the file wrote a ridiculous value into the
19. URI Extension Block length."""
20. pass
21.
22. class ShareVersionIncompatible(LayoutInvalid):
23. """ When downloading a share, its format was not one of the formats we
24. know how to parse."""
25. pass
26.
27. """
28. Share data is written in a file. At the start of the file, there is a series
29. of four-byte big-endian offset values, which indicate where each section
30. starts. Each offset is measured from the beginning of the share data.
31.
32. 0x00: version number (=00 00 00 01)
33. 0x04: block size # See Footnote 1 below.
34. 0x08: share data size # See Footnote 1 below.
35. 0x0c: offset of data (=00 00 00 24)
36. 0x10: offset of plaintext_hash_tree UNUSED
37. 0x14: offset of crypttext_hash_tree
38. 0x18: offset of block_hashes
39. 0x1c: offset of share_hashes
40. 0x20: offset of uri_extension_length + uri_extension
41. 0x24: start of data
42. ? : start of plaintext_hash_tree UNUSED
43. ? : start of crypttext_hash_tree
44. ? : start of block_hashes
45. ? : start of share_hashes
46. each share_hash is written as a two-byte (big-endian) hashnum
47. followed by the 32-byte SHA-256 hash. We store only the hashes
48. necessary to validate the share hash root
49. ? : start of uri_extension_length (four-byte big-endian value)
50. ? : start of uri_extension
51. """
52.
53. """
54. v2 shares: these use 8-byte offsets to remove two of the three ~12GiB size
55. limitations described in #346.
56.
57. 0x00: version number (=00 00 00 02)
58. 0x04: block size # See Footnote 1 below.
59. 0x0c: share data size # See Footnote 1 below.
60. 0x14: offset of data (=00 00 00 00 00 00 00 44)
61. 0x1c: offset of plaintext_hash_tree UNUSED
62. 0x24: offset of crypttext_hash_tree
63. 0x2c: offset of block_hashes
64. 0x34: offset of share_hashes
65. 0x3c: offset of uri_extension_length + uri_extension
66. 0x44: start of data
67. : rest of share is the same as v1, above
68. ... ...
69. ? : start of uri_extension_length (eight-byte big-endian value)
70. ? : start of uri_extension
71. """
72.
73. # Footnote 1: as of Tahoe v1.3.0 these fields are not used when reading, but
74. # they are still provided when writing so that older versions of Tahoe can
75. # read them.
76.
77. def make_write_bucket_proxy(rref, data_size, block_size, num_segments,
78. num_share_hashes, uri_extension_size_max, nodeid):
79. # Use layout v1 for small files, so they'll be readable by older versions
80. # (<tahoe-1.3.0). Use layout v2 for large files; they'll only be readable
81. # by tahoe-1.3.0 or later.
82. try:
83. wbp = WriteBucketProxy(rref, data_size, block_size, num_segments,
84. num_share_hashes, uri_extension_size_max, nodeid)
85. except FileTooLargeError:
86. wbp = WriteBucketProxy_v2(rref, data_size, block_size, num_segments,
87. num_share_hashes, uri_extension_size_max, nodeid)
88. return wbp
89.
90. class WriteBucketProxy:
91. implements(IStorageBucketWriter)
92. fieldsize = 4
93. fieldstruct = ">L"
94.
95. def __init__(self, rref, data_size, block_size, num_segments,
96. num_share_hashes, uri_extension_size_max, nodeid,
97. pipeline_size=50000):
98. self._rref = rref
99. self._data_size = data_size
100. self._block_size = block_size
101. self._num_segments = num_segments
102. self._nodeid = nodeid
103.
104. effective_segments = mathutil.next_power_of_k(num_segments,2)
105. self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
106. # how many share hashes are included in each share? This will be
107. # about ln2(num_shares).
108. self._share_hashtree_size = num_share_hashes * (2+HASH_SIZE)
109. # we commit to not sending a uri extension larger than this
110. self._uri_extension_size_max = uri_extension_size_max
111.
112. self._create_offsets(block_size, data_size)
113.
114. # k=3, max_segment_size=128KiB gives us a typical segment of 43691
115. # bytes. Setting the default pipeline_size to 50KB lets us get two
116. # segments onto the wire but not a third, which would keep the pipe
117. # filled.
118. self._pipeline = pipeline.Pipeline(pipeline_size)
119.
120. def get_allocated_size(self):
121. return (self._offsets['uri_extension'] + self.fieldsize +
122. self._uri_extension_size_max)
123.
124. def _create_offsets(self, block_size, data_size):
125. if block_size >= 2**32 or data_size >= 2**32:
126. raise FileTooLargeError("This file is too large to be uploaded (data_size).")
127.
128. offsets = self._offsets = {}
129. x = 0x24
130. offsets['data'] = x
131. x += data_size
132. offsets['plaintext_hash_tree'] = x # UNUSED
133. x += self._segment_hash_size
134. offsets['crypttext_hash_tree'] = x
135. x += self._segment_hash_size
136. offsets['block_hashes'] = x
137. x += self._segment_hash_size
138. offsets['share_hashes'] = x
139. x += self._share_hashtree_size
140. offsets['uri_extension'] = x
141.
142. if x >= 2**32:
143. raise FileTooLargeError("This file is too large to be uploaded (offsets).")
144.
145. offset_data = struct.pack(">LLLLLLLLL",
146. 1, # version number
147. block_size,
148. data_size,
149. offsets['data'],
150. offsets['plaintext_hash_tree'], # UNUSED
151. offsets['crypttext_hash_tree'],
152. offsets['block_hashes'],
153. offsets['share_hashes'],
154. offsets['uri_extension'],
155. )
156. assert len(offset_data) == 0x24
157. self._offset_data = offset_data
158.
159. def __repr__(self):
160. if self._nodeid:
161. nodeid_s = idlib.nodeid_b2a(self._nodeid)
162. else:
163. nodeid_s = "[None]"
164. return "<WriteBucketProxy for node %s>" % nodeid_s
165.
166. def put_header(self):
167. return self._write(0, self._offset_data)
168.
169. def put_block(self, segmentnum, data):
170. offset = self._offsets['data'] + segmentnum * self._block_size
171. assert offset + len(data) <= self._offsets['uri_extension']
172. assert isinstance(data, str)
173. if segmentnum < self._num_segments-1:
174. precondition(len(data) == self._block_size,
175. len(data), self._block_size)
176. else:
177. precondition(len(data) == (self._data_size -
178. (self._block_size *
179. (self._num_segments - 1))),
180. len(data), self._block_size)
181. return self._write(offset, data)
182.
183. def put_crypttext_hashes(self, hashes):
184. offset = self._offsets['crypttext_hash_tree']
185. assert isinstance(hashes, list)
186. data = "".join(hashes)
187. precondition(len(data) == self._segment_hash_size,
188. len(data), self._segment_hash_size)
189. precondition(offset + len(data) <= self._offsets['block_hashes'],
190. offset, len(data), offset+len(data),
191. self._offsets['block_hashes'])
192. return self._write(offset, data)
193.
194. def put_block_hashes(self, blockhashes):
195. offset = self._offsets['block_hashes']
196. assert isinstance(blockhashes, list)
197. data = "".join(blockhashes)
198. precondition(len(data) == self._segment_hash_size,
199. len(data), self._segment_hash_size)
200. precondition(offset + len(data) <= self._offsets['share_hashes'],
201. offset, len(data), offset+len(data),
202. self._offsets['share_hashes'])
203. return self._write(offset, data)
204.
205. def put_share_hashes(self, sharehashes):
206. # sharehashes is a list of (index, hash) tuples, so they get stored
207. # as 2+32=34 bytes each
208. offset = self._offsets['share_hashes']
209. assert isinstance(sharehashes, list)
210. data = "".join([struct.pack(">H", hashnum) + hashvalue
211. for hashnum,hashvalue in sharehashes])
212. precondition(len(data) == self._share_hashtree_size,
213. len(data), self._share_hashtree_size)
214. precondition(offset + len(data) <= self._offsets['uri_extension'],
215. offset, len(data), offset+len(data),
216. self._offsets['uri_extension'])
217. return self._write(offset, data)
218.
219. def put_uri_extension(self, data):
220. offset = self._offsets['uri_extension']
221. assert isinstance(data, str)
222. precondition(len(data) <= self._uri_extension_size_max,
223. len(data), self._uri_extension_size_max)
224. length = struct.pack(self.fieldstruct, len(data))
225. return self._write(offset, length+data)
226.
227. def _write(self, offset, data):
228. # use a Pipeline to pipeline several writes together. TODO: another
229. # speedup would be to coalesce small writes into a single call: this
230. # would reduce the foolscap CPU overhead per share, but wouldn't
231. # reduce the number of round trips, so it might not be worth the
232. # effort.
233.
234. return self._pipeline.add(len(data),
235. self._rref.callRemote, "write", offset, data)
236.
237. def close(self):
238. d = self._pipeline.add(0, self._rref.callRemote, "close")
239. d.addCallback(lambda ign: self._pipeline.flush())
240. return d
241.
242. def abort(self):
243. return self._rref.callRemoteOnly("abort")
244.
245. class WriteBucketProxy_v2(WriteBucketProxy):
246. fieldsize = 8
247. fieldstruct = ">Q"
248.
249. def _create_offsets(self, block_size, data_size):
250. if block_size >= 2**64 or data_size >= 2**64:
251. raise FileTooLargeError("This file is too large to be uploaded (data_size).")
252.
253. offsets = self._offsets = {}
254. x = 0x44
255. offsets['data'] = x
256. x += data_size
257. offsets['plaintext_hash_tree'] = x # UNUSED
258. x += self._segment_hash_size
259. offsets['crypttext_hash_tree'] = x
260. x += self._segment_hash_size
261. offsets['block_hashes'] = x
262. x += self._segment_hash_size
263. offsets['share_hashes'] = x
264. x += self._share_hashtree_size
265. offsets['uri_extension'] = x
266.
267. if x >= 2**64:
268. raise FileTooLargeError("This file is too large to be uploaded (offsets).")
269.
270. offset_data = struct.pack(">LQQQQQQQQ",
271. 2, # version number
272. block_size,
273. data_size,
274. offsets['data'],
275. offsets['plaintext_hash_tree'], # UNUSED
276. offsets['crypttext_hash_tree'],
277. offsets['block_hashes'],
278. offsets['share_hashes'],
279. offsets['uri_extension'],
280. )
281. assert len(offset_data) == 0x44, len(offset_data)
282. self._offset_data = offset_data
283.
284. class ReadBucketProxy:
285. implements(IStorageBucketReader)
286.
287. MAX_UEB_SIZE = 2000 # actual size is closer to 419, but varies by a few bytes
288.
289. def __init__(self, rref, peerid, storage_index):
290. self._rref = rref
291. self._peerid = peerid
292. peer_id_s = idlib.shortnodeid_b2a(peerid)
293. storage_index_s = si_b2a(storage_index)
294. self._reprstr = "<ReadBucketProxy %s to peer [%s] SI %s>" % (id(self), peer_id_s, storage_index_s)
295. self._started = False # sent request to server
296. self._ready = observer.OneShotObserverList() # got response from server
297.
298. def get_peerid(self):
299. return self._peerid
300.
301. def __repr__(self):
302. return self._reprstr
303.
304. def _start_if_needed(self):
305. """ Returns a deferred that will be fired when I'm ready to return
306. data, or errbacks if the starting (header reading and parsing)
307. process fails."""
308. if not self._started:
309. self._start()
310. return self._ready.when_fired()
311.
312. def _start(self):
313. self._started = True
314. # TODO: for small shares, read the whole bucket in _start()
315. d = self._fetch_header()
316. d.addCallback(self._parse_offsets)
317. # XXX The following two callbacks implement a slightly faster/nicer
318. # way to get the ueb and sharehashtree, but it requires that the
319. # storage server be >= v1.3.0.
320. # d.addCallback(self._fetch_sharehashtree_and_ueb)
321. # d.addCallback(self._parse_sharehashtree_and_ueb)
322. def _fail_waiters(f):
323. self._ready.fire(f)
324. def _notify_waiters(result):
325. self._ready.fire(result)
326. d.addCallbacks(_notify_waiters, _fail_waiters)
327. return d
328.
329. def _fetch_header(self):
330. return self._read(0, 0x44)
331.
332. def _parse_offsets(self, data):
333. precondition(len(data) >= 0x4)
334. self._offsets = {}
335. (version,) = struct.unpack(">L", data[0:4])
336. if version != 1 and version != 2:
337. raise ShareVersionIncompatible(version)
338.
339. if version == 1:
340. precondition(len(data) >= 0x24)
341. x = 0x0c
342. fieldsize = 0x4
343. fieldstruct = ">L"
344. else:
345. precondition(len(data) >= 0x44)
346. x = 0x14
347. fieldsize = 0x8
348. fieldstruct = ">Q"
349.
350. self._version = version
351. self._fieldsize = fieldsize
352. self._fieldstruct = fieldstruct
353.
354. for field in ( 'data',
355. 'plaintext_hash_tree', # UNUSED
356. 'crypttext_hash_tree',
357. 'block_hashes',
358. 'share_hashes',
359. 'uri_extension',
360. ):
361. offset = struct.unpack(fieldstruct, data[x:x+fieldsize])[0]
362. x += fieldsize
363. self._offsets[field] = offset
364. return self._offsets
365.
366. def _fetch_sharehashtree_and_ueb(self, offsets):
367. sharehashtree_size = offsets['uri_extension'] - offsets['share_hashes']
368. return self._read(offsets['share_hashes'],
369. self.MAX_UEB_SIZE+sharehashtree_size)
370.
371. def _parse_sharehashtree_and_ueb(self, data):
372. sharehashtree_size = self._offsets['uri_extension'] - self._offsets['share_hashes']
373. if len(data) < sharehashtree_size:
374. raise LayoutInvalid("share hash tree truncated -- should have at least %d bytes -- not %d" % (sharehashtree_size, len(data)))
375. if sharehashtree_size % (2+HASH_SIZE) != 0:
376. raise LayoutInvalid("share hash tree malformed -- should have an even multiple of %d bytes -- not %d" % (2+HASH_SIZE, sharehashtree_size))
377. self._share_hashes = []
378. for i in range(0, sharehashtree_size, 2+HASH_SIZE):
379. hashnum = struct.unpack(">H", data[i:i+2])[0]
380. hashvalue = data[i+2:i+2+HASH_SIZE]
381. self._share_hashes.append( (hashnum, hashvalue) )
382.
383. i = self._offsets['uri_extension']-self._offsets['share_hashes']
384. if len(data) < i+self._fieldsize:
385. raise LayoutInvalid("not enough bytes to encode URI length -- should be at least %d bytes long, not %d " % (i+self._fieldsize, len(data),))
386. length = struct.unpack(self._fieldstruct, data[i:i+self._fieldsize])[0]
387. self._ueb_data = data[i+self._fieldsize:i+self._fieldsize+length]
388.
389. def _get_block_data(self, unused, blocknum, blocksize, thisblocksize):
390. offset = self._offsets['data'] + blocknum * blocksize
391. return self._read(offset, thisblocksize)
392.
393. def get_block_data(self, blocknum, blocksize, thisblocksize):
394. d = self._start_if_needed()
395. d.addCallback(self._get_block_data, blocknum, blocksize, thisblocksize)
396. return d
397.
398. def _str2l(self, s):
399. """ split string (pulled from storage) into a list of blockids """
400. return [ s[i:i+HASH_SIZE]
401. for i in range(0, len(s), HASH_SIZE) ]
402.
403. def _get_crypttext_hashes(self, unused=None):
404. offset = self._offsets['crypttext_hash_tree']
405. size = self._offsets['block_hashes'] - offset
406. d = self._read(offset, size)
407. d.addCallback(self._str2l)
408. return d
409.
410. def get_crypttext_hashes(self):
411. d = self._start_if_needed()
412. d.addCallback(self._get_crypttext_hashes)
413. return d
414.
415. def _get_block_hashes(self, unused=None, at_least_these=()):
416. # TODO: fetch only at_least_these instead of all of them.
417. offset = self._offsets['block_hashes']
418. size = self._offsets['share_hashes'] - offset
419. d = self._read(offset, size)
420. d.addCallback(self._str2l)
421. return d
422.
423. def get_block_hashes(self, at_least_these=()):
424. if at_least_these:
425. d = self._start_if_needed()
426. d.addCallback(self._get_block_hashes, at_least_these)
427. return d
428. else:
429. return defer.succeed([])
430.
431. def _get_share_hashes(self, unused=None):
432. if hasattr(self, '_share_hashes'):
433. return self._share_hashes
434. else:
435. return self._get_share_hashes_the_old_way()
436. return self._share_hashes
437.
438. def get_share_hashes(self):
439. d = self._start_if_needed()
440. d.addCallback(self._get_share_hashes)
441. return d
442.
443. def _get_share_hashes_the_old_way(self):
444. """ Tahoe storage servers < v1.3.0 would return an error if you tried
445. to read past the end of the share, so we need to use the offset and
446. read just that much."""
447. offset = self._offsets['share_hashes']
448. size = self._offsets['uri_extension'] - offset
449. if size % (2+HASH_SIZE) != 0:
450. raise LayoutInvalid("share hash tree corrupted -- should occupy a multiple of %d bytes, not %d bytes" % ((2+HASH_SIZE), size))
451. d = self._read(offset, size)
452. def _unpack_share_hashes(data):
453. if len(data) != size:
454. raise LayoutInvalid("share hash tree corrupted -- got a short read of the share data -- should have gotten %d, not %d bytes" % (size, len(data)))
455. hashes = []
456. for i in range(0, size, 2+HASH_SIZE):
457. hashnum = struct.unpack(">H", data[i:i+2])[0]
458. hashvalue = data[i+2:i+2+HASH_SIZE]
459. hashes.append( (hashnum, hashvalue) )
460. return hashes
461. d.addCallback(_unpack_share_hashes)
462. return d
463.
464. def _get_uri_extension_the_old_way(self, unused=None):
465. """ Tahoe storage servers < v1.3.0 would return an error if you tried
466. to read past the end of the share, so we need to fetch the UEB size
467. and then read just that much."""
468. offset = self._offsets['uri_extension']
469. d = self._read(offset, self._fieldsize)
470. def _got_length(data):
471. if len(data) != self._fieldsize:
472. raise LayoutInvalid("not enough bytes to encode URI length -- should be %d bytes long, not %d " % (self._fieldsize, len(data),))
473. length = struct.unpack(self._fieldstruct, data)[0]
474. if length >= 2**31:
475. # URI extension blocks are around 419 bytes long, so this
476. # must be corrupted. Anyway, the foolscap interface schema
477. # for "read" will not allow >= 2**31 bytes length.
478. raise RidiculouslyLargeURIExtensionBlock(length)
479.
480. return self._read(offset+self._fieldsize, length)
481. d.addCallback(_got_length)
482. return d
483.
484. def _get_uri_extension(self, unused=None):
485. if hasattr(self, '_ueb_data'):
486. return self._ueb_data
487. else:
488. return self._get_uri_extension_the_old_way()
489.
490. def get_uri_extension(self):
491. d = self._start_if_needed()
492. d.addCallback(self._get_uri_extension)
493. return d
494.
495. def _read(self, offset, length):
496. return self._rref.callRemote("read", offset, length)