source: trunk/src/allmydata/mutable/publish.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

1"""
2Ported to Python 3.
3"""
4
5import os, time
6from io import BytesIO
7from itertools import count
8from zope.interface import implementer
9from twisted.internet import defer
10from twisted.python import failure
11
12from allmydata.crypto import aes
13from allmydata.crypto import rsa
14from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION, \
15                                 IMutableUploadable
16from allmydata.util import base32, hashutil, mathutil, log
17from allmydata.util.dictutil import DictOfSets
18from allmydata.util.deferredutil import async_to_deferred
19from allmydata.util.cputhreadpool import defer_to_thread
20from allmydata import hashtree, codec
21from allmydata.storage.server import si_b2a
22from foolscap.api import eventually, fireEventually
23from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, MODE_REPAIR, \
24     UncoordinatedWriteError, NotEnoughServersError
25from allmydata.mutable.servermap import ServerMap
26from allmydata.mutable.layout import get_version_from_checkstring,\
27                                     unpack_mdmf_checkstring, \
28                                     unpack_sdmf_checkstring, \
29                                     MDMFSlotWriteProxy, \
30                                     SDMFSlotWriteProxy
31
32from eliot import (
33    Message,
34    start_action,
35)
36
37KiB = 1024
38DEFAULT_MUTABLE_MAX_SEGMENT_SIZE = 128 * KiB
39PUSHING_BLOCKS_STATE = 0
40PUSHING_EVERYTHING_ELSE_STATE = 1
41DONE_STATE = 2
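
# The publish process is a small state machine driven by _push():
#
#   PUSHING_BLOCKS_STATE          -- push_segment() encrypts, FEC-encodes,
#                                    and uploads one segment at a time
#   PUSHING_EVERYTHING_ELSE_STATE -- push_everything_else() writes the
#                                    encrypted private key, block/share hash
#                                    trees, root hash, and signature
#   DONE_STATE                    -- _done() fires the caller's Deferred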
42
43@implementer(IPublishStatus)
44class PublishStatus(object):
45    statusid_counter = count(0)
46    def __init__(self):
47        self.timings = {}
48        self.timings["send_per_server"] = {}
49        self.timings["encrypt"] = 0.0
50        self.timings["encode"] = 0.0
51        self.servermap = None
52        self._problems = {}
53        self.active = True
54        self.storage_index = None
55        self.helper = False
56        self.encoding = ("?", "?")
57        self.size = None
58        self.status = "Not started"
59        self.progress = 0.0
60        self.counter = next(self.statusid_counter)
61        self.started = time.time()
62
63    def add_per_server_time(self, server, elapsed):
64        if server not in self.timings["send_per_server"]:
65            self.timings["send_per_server"][server] = []
66        self.timings["send_per_server"][server].append(elapsed)
67    def accumulate_encode_time(self, elapsed):
68        self.timings["encode"] += elapsed
69    def accumulate_encrypt_time(self, elapsed):
70        self.timings["encrypt"] += elapsed
71
72    def get_started(self):
73        return self.started
74    def get_storage_index(self):
75        return self.storage_index
76    def get_encoding(self):
77        return self.encoding
78    def using_helper(self):
79        return self.helper
80    def get_servermap(self):
81        return self.servermap
82    def get_size(self):
83        return self.size
84    def get_status(self):
85        return self.status
86    def get_progress(self):
87        return self.progress
88    def get_active(self):
89        return self.active
90    def get_counter(self):
91        return self.counter
92    def get_problems(self):
93        return self._problems
94
95    def set_storage_index(self, si):
96        self.storage_index = si
97    def set_helper(self, helper):
98        self.helper = helper
99    def set_servermap(self, servermap):
100        self.servermap = servermap
101    def set_encoding(self, k, n):
102        self.encoding = (k, n)
103    def set_size(self, size):
104        self.size = size
105    def set_status(self, status):
106        self.status = status
107    def set_progress(self, value):
108        self.progress = value
109    def set_active(self, value):
110        self.active = value
111
112class LoopLimitExceededError(Exception):
113    pass
114
115class Publish(object):
116    """I represent a single act of publishing the mutable file to the grid. I
117    will only publish my data if the servermap I am using still represents
118    the current state of the world.
119
120    To make the initial publish, set servermap to None.
121    """
122
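    # Rough usage sketch (illustrative only; in practice the mutable
    # filenode code constructs and drives Publish):
    #
    #     p = Publish(filenode, storage_broker, servermap)
    #     d = p.publish(MutableData(b"new contents"))
    #     # d fires with None on success, or errbacks (for example with
    #     # UncoordinatedWriteError) if an uncoordinated write is detected.
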
123    def __init__(self, filenode, storage_broker, servermap):
124        self._node = filenode
125        self._storage_broker = storage_broker
126        self._servermap = servermap
127        self._storage_index = self._node.get_storage_index()
128        self._log_prefix = prefix = si_b2a(self._storage_index)[:5]
129        num = self.log("Publish(%r): starting" % prefix, parent=None)
130        self._log_number = num
131        self._running = True
132        self._first_write_error = None
133        self._last_failure = None
134
135        self._status = PublishStatus()
136        self._status.set_storage_index(self._storage_index)
137        self._status.set_helper(False)
138        self._status.set_progress(0.0)
139        self._status.set_active(True)
140        self._version = self._node.get_version()
141        assert self._version in (SDMF_VERSION, MDMF_VERSION)
142
143
144    def get_status(self):
145        return self._status
146
147    def log(self, *args, **kwargs):
148        if 'parent' not in kwargs:
149            kwargs['parent'] = self._log_number
150        if "facility" not in kwargs:
151            kwargs["facility"] = "tahoe.mutable.publish"
152        return log.msg(*args, **kwargs)
153
154
155    def update(self, data, offset, blockhashes, version):
156        """
157        I replace the contents of this file with the contents of data,
158        starting at offset. I return a Deferred that fires with None
159        when the replacement has been completed, or with an error if
160        something went wrong during the process.
161
162        Note that this process will not upload new shares. If the file
163        being updated is in need of repair, callers will have to repair
164        it on their own.
165        """
166        # How this works:
167        # 1: Make server assignments. We'll assign each share that we know
168        # about on the grid to the server that currently holds that
169        # share, and will not place any new shares.
170        # 2: Setup encoding parameters. Most of these will stay the same
171        # -- datalength will change, as will some of the offsets.
172        # 3. Upload the new segments.
173        # 4. Be done.
174        assert IMutableUploadable.providedBy(data)
175
176        self.data = data
177
178        # XXX: Use the MutableFileVersion instead.
179        self.datalength = self._node.get_size()
180        if data.get_size() > self.datalength:
181            self.datalength = data.get_size()
182
183        self.log("starting update")
184        self.log("adding new data of length %d at offset %d" % \
185                    (data.get_size(), offset))
186        self.log("new data length is %d" % self.datalength)
187        self._status.set_size(self.datalength)
188        self._status.set_status("Started")
189        self._started = time.time()
190
191        self.done_deferred = defer.Deferred()
192
193        self._writekey = self._node.get_writekey()
194        assert self._writekey, "need write capability to publish"
195
196        # first, which servers will we publish to? We require that the
197        # servermap was updated in MODE_WRITE, so we can depend upon the
198        # serverlist computed by that process instead of computing our own.
199        assert self._servermap
200        assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
201        # we will push a version that is one larger than anything present
202        # in the grid, according to the servermap.
203        self._new_seqnum = self._servermap.highest_seqnum() + 1
204        self._status.set_servermap(self._servermap)
205
206        self.log(format="new seqnum will be %(seqnum)d",
207                 seqnum=self._new_seqnum, level=log.NOISY)
208
209        # We're updating an existing file, so all of the following
210        # should be available.
211        self.readkey = self._node.get_readkey()
212        self.required_shares = self._node.get_required_shares()
213        assert self.required_shares is not None
214        self.total_shares = self._node.get_total_shares()
215        assert self.total_shares is not None
216        self._status.set_encoding(self.required_shares, self.total_shares)
217
218        self._pubkey = self._node.get_pubkey()
219        assert self._pubkey
220        self._privkey = self._node.get_privkey()
221        assert self._privkey
222        self._encprivkey = self._node.get_encprivkey()
223
224        sb = self._storage_broker
225        full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
226        self.full_serverlist = full_serverlist # for use later, immutable
227        self.bad_servers = set() # servers who have errbacked/refused requests
228
229        # This will set self.segment_size, self.num_segments, and
230        # self.fec. TODO: Does it know how to do the offset? Probably
231        # not. So do that part next.
232        self.setup_encoding_parameters(offset=offset)
233
234        # if we experience any surprises (writes which were rejected because
235        # our test vector did not match, or shares which we didn't expect to
236        # see), we set this flag and report an UncoordinatedWriteError at the
237        # end of the publish process.
238        self.surprised = False
239
240        # we keep track of three tables. The first is our goal: which share
241        # we want to see on which servers. This is initially populated by the
242        # existing servermap.
243        self.goal = set() # pairs of (server, shnum) tuples
244
245        # the number of outstanding queries: those that are in flight and
246        # may or may not be delivered, accepted, or acknowledged. This is
247        # incremented when a query is sent, and decremented when the response
248        # returns or errbacks.
249        self.num_outstanding = 0
250
251        # the third is a table of successes: shares which have actually been
252        # placed. These are populated when responses come back with success.
253        # When self.placed == self.goal, we're done.
254        self.placed = set() # (server, shnum) tuples
255
256        self.bad_share_checkstrings = {}
257
258        # This is set at the last step of the publishing process.
259        self.versioninfo = ""
260
261        # we use the servermap to populate the initial goal: this way we will
262        # try to update each existing share in place. Since we're
263        # updating, we ignore damaged and missing shares -- callers must
264        # do a repair to repair and recreate these.
265        self.goal = set(self._servermap.get_known_shares())
266
267        # shnum -> set of IMutableSlotWriter
268        self.writers = DictOfSets()
269
270        # SDMF files are updated differently.
271        self._version = MDMF_VERSION
272        writer_class = MDMFSlotWriteProxy
273
274        # For each (server, shnum) in self.goal, we make a
275        # write proxy for that server. We'll use this to write
276        # shares to the server.
277        for (server,shnum) in self.goal:
278            write_enabler = self._node.get_write_enabler(server)
279            renew_secret = self._node.get_renewal_secret(server)
280            cancel_secret = self._node.get_cancel_secret(server)
281            secrets = (write_enabler, renew_secret, cancel_secret)
282
283            writer = writer_class(shnum,
284                                  server.get_storage_server(),
285                                  self._storage_index,
286                                  secrets,
287                                  self._new_seqnum,
288                                  self.required_shares,
289                                  self.total_shares,
290                                  self.segment_size,
291                                  self.datalength)
292
293            self.writers.add(shnum, writer)
294            writer.server = server
295            known_shares = self._servermap.get_known_shares()
296            assert (server, shnum) in known_shares
297            old_versionid, old_timestamp = known_shares[(server,shnum)]
298            (old_seqnum, old_root_hash, old_salt, old_segsize,
299             old_datalength, old_k, old_N, old_prefix,
300             old_offsets_tuple) = old_versionid
301            writer.set_checkstring(old_seqnum,
302                                   old_root_hash,
303                                   old_salt)
304
305        # Our remote shares will not have a complete checkstring until
306        # after we are done writing share data and have started to write
307        # blocks. In the meantime, we need to know what to look for when
308        # writing, so that we can detect UncoordinatedWriteErrors.
309        self._checkstring = self._get_some_writer().get_checkstring()
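        # (The checkstring is a packed prefix of the share -- roughly the
        # version byte, sequence number, and root hash, plus the IV/salt for
        # SDMF -- which servers compare against in their test vectors.)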
310
311        # Now, we start pushing shares.
312        self._status.timings["setup"] = time.time() - self._started
313        # First, we encrypt, encode, and publish the shares that we need
314        # to encrypt, encode, and publish.
315
316        # Our update process fetched these for us. We need to update
317        # them in place as publishing happens.
318        self.blockhashes = {} # (shnum, [blockhashes])
319        for (i, bht) in list(blockhashes.items()):
320            # We need to extract the leaves from our old hash tree.
321            old_segcount = mathutil.div_ceil(version[4],
322                                             version[3])
323            h = hashtree.IncompleteHashTree(old_segcount)
324            bht = dict(enumerate(bht))
325            h.set_hashes(bht)
326            leaves = h[h.get_leaf_index(0):]
327            for j in range(self.num_segments - len(leaves)):
328                leaves.append(None)
329
330            assert len(leaves) >= self.num_segments
331            self.blockhashes[i] = leaves
332            # This list will now be the leaves that were set during the
333            # initial upload + enough empty hashes to make it a
334            # power-of-two. If we exceed a power of two boundary, we
335            # should be encoding the file over again, and should not be
336            # here. So, we have
337            #assert len(self.blockhashes[i]) == \
338            #    hashtree.roundup_pow2(self.num_segments), \
339            #        len(self.blockhashes[i])
340            # XXX: Except this doesn't work. Figure out why.
341
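        # (Illustrative sketch of the leaf extraction above, assuming the
        # usual complete-binary-tree layout: with 4 old segments the
        # IncompleteHashTree has 7 nodes, get_leaf_index(0) is 3, so h[3:]
        # yields the per-segment block hashes, which we then pad out to
        # self.num_segments entries.)
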
342        # These are filled in later, after we've modified the block hash
343        # tree suitably.
344        self.sharehash_leaves = None # eventually [sharehashes]
345        self.sharehashes = {} # shnum -> [sharehash leaves necessary to
346                              # validate the share]
347
348        self.log("Starting push")
349
350        self._state = PUSHING_BLOCKS_STATE
351        self._push()
352
353        return self.done_deferred
354
355
356    def publish(self, newdata):
357        """Publish the filenode's current contents.  Returns a Deferred that
358        fires (with None) when the publish has done as much work as it's ever
359        going to do, or errbacks with UncoordinatedWriteError if it
360        detects a simultaneous (uncoordinated) write.
361        """
362
363        # 0. Setup encoding parameters, encoder, and other such things.
364        # 1. Encrypt, encode, and publish segments.
365        assert IMutableUploadable.providedBy(newdata)
366
367        self.data = newdata
368        self.datalength = newdata.get_size()
369        #if self.datalength >= DEFAULT_MUTABLE_MAX_SEGMENT_SIZE:
370        #    self._version = MDMF_VERSION
371        #else:
372        #    self._version = SDMF_VERSION
373
374        self.log("starting publish, datalen is %s" % self.datalength)
375        self._status.set_size(self.datalength)
376        self._status.set_status("Started")
377        self._started = time.time()
378
379        self.done_deferred = defer.Deferred()
380
381        self._writekey = self._node.get_writekey()
382        assert self._writekey, "need write capability to publish"
383
384        # first, which servers will we publish to? We require that the
385        # servermap was updated in MODE_WRITE, so we can depend upon the
386        # serverlist computed by that process instead of computing our own.
387        if self._servermap:
388            assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
389            # we will push a version that is one larger than anything present
390            # in the grid, according to the servermap.
391            self._new_seqnum = self._servermap.highest_seqnum() + 1
392        else:
393            # If we don't have a servermap, that's because we're doing the
394            # initial publish
395            self._new_seqnum = 1
396            self._servermap = ServerMap()
397        self._status.set_servermap(self._servermap)
398
399        self.log(format="new seqnum will be %(seqnum)d",
400                 seqnum=self._new_seqnum, level=log.NOISY)
401
402        # having an up-to-date servermap (or using a filenode that was just
403        # created for the first time) also guarantees that the following
404        # fields are available
405        self.readkey = self._node.get_readkey()
406        self.required_shares = self._node.get_required_shares()
407        assert self.required_shares is not None
408        self.total_shares = self._node.get_total_shares()
409        assert self.total_shares is not None
410        self._status.set_encoding(self.required_shares, self.total_shares)
411
412        self._pubkey = self._node.get_pubkey()
413        assert self._pubkey
414        self._privkey = self._node.get_privkey()
415        assert self._privkey
416        self._encprivkey = self._node.get_encprivkey()
417
418        sb = self._storage_broker
419        full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
420        self.full_serverlist = full_serverlist # for use later, immutable
421        self.bad_servers = set() # servers who have errbacked/refused requests
422
423        # This will set self.segment_size, self.num_segments, and
424        # self.fec.
425        self.setup_encoding_parameters()
426
427        # if we experience any surprises (writes which were rejected because
428        # our test vector did not match, or shares which we didn't expect to
429        # see), we set this flag and report an UncoordinatedWriteError at the
430        # end of the publish process.
431        self.surprised = False
432
433        # we keep track of three tables. The first is our goal: which share
434        # we want to see on which servers. This is initially populated by the
435        # existing servermap.
436        self.goal = set() # pairs of (server, shnum) tuples
437
438        # the number of outstanding queries: those that are in flight and
439        # may or may not be delivered, accepted, or acknowledged. This is
440        # incremented when a query is sent, and decremented when the response
441        # returns or errbacks.
442        self.num_outstanding = 0
443
444        # the third is a table of successes: shares which have actually been
445        # placed. These are populated when responses come back with success.
446        # When self.placed == self.goal, we're done.
447        self.placed = set() # (server, shnum) tuples
448
449        self.bad_share_checkstrings = {}
450
451        # This is set at the last step of the publishing process.
452        self.versioninfo = ""
453
454        # we use the servermap to populate the initial goal: this way we will
455        # try to update each existing share in place.
456        self.goal = set(self._servermap.get_known_shares())
457
458        # then we add in all the shares that were bad (corrupted, bad
459        # signatures, etc). We want to replace these.
460        for key, old_checkstring in list(self._servermap.get_bad_shares().items()):
461            (server, shnum) = key
462            self.goal.add( (server,shnum) )
463            self.bad_share_checkstrings[(server,shnum)] = old_checkstring
464
465        # TODO: Make this part do server selection.
466        self.update_goal()
467
468        # shnum -> set of IMutableSlotWriter
469        self.writers = DictOfSets()
470
471        if self._version == MDMF_VERSION:
472            writer_class = MDMFSlotWriteProxy
473        else:
474            writer_class = SDMFSlotWriteProxy
475
476        # For each (server, shnum) in self.goal, we make a
477        # write proxy for that server. We'll use this to write
478        # shares to the server.
479        for (server,shnum) in self.goal:
480            write_enabler = self._node.get_write_enabler(server)
481            renew_secret = self._node.get_renewal_secret(server)
482            cancel_secret = self._node.get_cancel_secret(server)
483            secrets = (write_enabler, renew_secret, cancel_secret)
484
485            writer =  writer_class(shnum,
486                                   server.get_storage_server(),
487                                   self._storage_index,
488                                   secrets,
489                                   self._new_seqnum,
490                                   self.required_shares,
491                                   self.total_shares,
492                                   self.segment_size,
493                                   self.datalength)
494            self.writers.add(shnum, writer)
495            writer.server = server
496            known_shares = self._servermap.get_known_shares()
497            if (server, shnum) in known_shares:
498                old_versionid, old_timestamp = known_shares[(server,shnum)]
499                (old_seqnum, old_root_hash, old_salt, old_segsize,
500                 old_datalength, old_k, old_N, old_prefix,
501                 old_offsets_tuple) = old_versionid
502                writer.set_checkstring(old_seqnum,
503                                       old_root_hash,
504                                       old_salt)
505            elif (server, shnum) in self.bad_share_checkstrings:
506                old_checkstring = self.bad_share_checkstrings[(server, shnum)]
507                writer.set_checkstring(old_checkstring)
508
509        # Our remote shares will not have a complete checkstring until
510        # after we are done writing share data and have started to write
511        # blocks. In the meantime, we need to know what to look for when
512        # writing, so that we can detect UncoordinatedWriteErrors.
513        self._checkstring = self._get_some_writer().get_checkstring()
514
515        # Now, we start pushing shares.
516        self._status.timings["setup"] = time.time() - self._started
517        # First, we encrypt, encode, and publish the shares that we need
518        # to encrypt, encode, and publish.
519
520        # This will eventually hold the block hash chain for each share
521        # that we publish. We define it this way so that empty publishes
522        # will still have something to write to the remote slot.
523        self.blockhashes = dict([(i, []) for i in range(self.total_shares)])
524        for i in range(self.total_shares):
525            blocks = self.blockhashes[i]
526            for j in range(self.num_segments):
527                blocks.append(None)
528        self.sharehash_leaves = None # eventually [sharehashes]
529        self.sharehashes = {} # shnum -> [sharehash leaves necessary to
530                              # validate the share]
531
532        self.log("Starting push")
533
534        self._state = PUSHING_BLOCKS_STATE
535        self._push()
536
537        return self.done_deferred
538
539    def _get_some_writer(self):
540        return list(list(self.writers.values())[0])[0]
541
542    def _update_status(self):
543        self._status.set_status("Sending Shares: %d placed out of %d, "
544                                "%d messages outstanding" %
545                                (len(self.placed),
546                                 len(self.goal),
547                                 self.num_outstanding))
548        self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
549
550
551    def setup_encoding_parameters(self, offset=0):
552        if self._version == MDMF_VERSION:
553            segment_size = DEFAULT_MUTABLE_MAX_SEGMENT_SIZE # 128 KiB by default
554        else:
555            segment_size = self.datalength # SDMF is only one segment
556        # this must be a multiple of self.required_shares
557        segment_size = mathutil.next_multiple(segment_size,
558                                              self.required_shares)
559        self.segment_size = segment_size
560
561        # Calculate the starting segment for the upload.
562        if segment_size:
563            # We use div_ceil instead of plain integer division here
564            # because it is semantically correct for our purposes.
565            # If datalength isn't an even multiple of segment_size,
566            # datalength // segment_size rounds down, which would
567            # ignore the data that falls into the final, partial
568            # segment. div_ceil rounds up instead, so it gives us
569            # the right number of segments for all of the data that
570            # we're given, including any short tail segment at the
571            # end.
572            self.num_segments = mathutil.div_ceil(self.datalength,
573                                                  segment_size)
574
575            self.starting_segment = offset // segment_size
576
577        else:
578            self.num_segments = 0
579            self.starting_segment = 0
580
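        # For illustration with toy numbers: datalength = 300 and
        # segment_size = 128 gives 300 // 128 == 2, which would drop the
        # last 44 bytes, while div_ceil(300, 128) == 3 segments covers
        # everything; an update at offset = 200 starts at segment
        # 200 // 128 == 1.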
581
582        self.log("building encoding parameters for file")
583        self.log("got segsize %d" % self.segment_size)
584        self.log("got %d segments" % self.num_segments)
585
586        if self._version == SDMF_VERSION:
587            assert self.num_segments in (0, 1) # SDMF
588        # calculate the tail segment size.
589
590        if segment_size and self.datalength:
591            self.tail_segment_size = self.datalength % segment_size
592            self.log("got tail segment size %d" % self.tail_segment_size)
593        else:
594            self.tail_segment_size = 0
595
596        if self.tail_segment_size == 0 and segment_size:
597            # The tail segment is the same size as the other segments.
598            self.tail_segment_size = segment_size
599
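        # (Continuing the toy numbers above: tail_segment_size would be
        # 300 % 128 == 44. When datalength is an exact multiple of
        # segment_size, the tail segment is just a full-sized segment.)
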
600        # Make FEC encoders
601        fec = codec.CRSEncoder()
602        fec.set_params(self.segment_size,
603                       self.required_shares, self.total_shares)
604        self.piece_size = fec.get_block_size()
605        self.fec = fec
606
607        if self.tail_segment_size == self.segment_size:
608            self.tail_fec = self.fec
609        else:
610            tail_fec = codec.CRSEncoder()
611            tail_fec.set_params(self.tail_segment_size,
612                                self.required_shares,
613                                self.total_shares)
614            self.tail_fec = tail_fec
615
616        self._current_segment = self.starting_segment
617        self.end_segment = self.num_segments - 1
618        # Now figure out where the last segment should be.
619        if self.data.get_size() != self.datalength:
620            # We're updating a few segments in the middle of a mutable
621            # file, so we don't want to republish the whole thing.
622            # (we don't have enough data to do that even if we wanted
623            # to)
624            end = self.data.get_size()
625            self.end_segment = end // segment_size
626            if end % segment_size == 0:
627                self.end_segment -= 1
628
629        self.log("got start segment %d" % self.starting_segment)
630        self.log("got end segment %d" % self.end_segment)
631
632
633    def _push(self, ignored=None):
634        """
635        I manage state transitions. In particular, I see that we still
636        have a good enough number of writers to complete the upload
637        successfully.
638        """
639        # Can we still successfully publish this file?
640        # TODO: Keep track of outstanding queries before aborting the
641        #       process.
642        num_shnums = len(self.writers)
643        if num_shnums < self.required_shares or self.surprised:
644            return self._failure()
645
646        # Figure out what we need to do next. Each of these needs to
647        # return a deferred so that we don't block execution when this
648        # is first called in the upload method.
649        if self._state == PUSHING_BLOCKS_STATE:
650            return self.push_segment(self._current_segment)
651
652        elif self._state == PUSHING_EVERYTHING_ELSE_STATE:
653            return self.push_everything_else()
654
655        # If we make it to this point, we were successful in placing the
656        # file.
657        return self._done()
658
659
660    def push_segment(self, segnum):
661        if self.num_segments == 0 and self._version == SDMF_VERSION:
662            self._add_dummy_salts()
663
664        if segnum > self.end_segment:
665            # We don't have any more segments to push.
666            self._state = PUSHING_EVERYTHING_ELSE_STATE
667            return self._push()
668
669        d = self._encode_segment(segnum)
670        d.addCallback(self._push_segment, segnum)
671        def _increment_segnum(ign):
672            self._current_segment += 1
673        # XXX: I don't think we need to do addBoth here -- any errBacks
674        # should be handled within push_segment.
675        d.addCallback(_increment_segnum)
676        d.addCallback(self._turn_barrier)
677        d.addCallback(self._push)
678        d.addErrback(self._failure)
679
680
681    def _turn_barrier(self, result):
682        """
683        I help the publish process avoid the recursion limit issues
684        described in #237.
685        """
686        return fireEventually(result)
687
688
689    def _add_dummy_salts(self):
690        """
691        SDMF files need a salt even if they're empty, or the signature
692        won't make sense. This method adds a dummy salt to each of our
693        SDMF writers so that they can write the signature later.
694        """
695        salt = os.urandom(16)
696        assert self._version == SDMF_VERSION
697
698        for shnum, writers in self.writers.items():
699            for writer in writers:
700                writer.put_salt(salt)
701
702
703    @async_to_deferred
704    async def _encode_segment(self, segnum):
705        """
706        I encrypt and encode the segment segnum.
707        """
708        started = time.time()
709
710        if segnum + 1 == self.num_segments:
711            segsize = self.tail_segment_size
712        else:
713            segsize = self.segment_size
714
715
716        self.log("Pushing segment %d of %d" % (segnum + 1, self.num_segments))
717        data = self.data.read(segsize)
718        if not isinstance(data, bytes):
719            # XXX: Why does this return a list?
720            data = b"".join(data)
721
722        assert len(data) == segsize, len(data)
723
724        self._status.set_status("Encrypting")
725
726        def encrypt(readkey):
727            salt = os.urandom(16)
728            key = hashutil.ssk_readkey_data_hash(salt, readkey)
729            encryptor = aes.create_encryptor(key)
730            crypttext = aes.encrypt_data(encryptor, data)
731            assert len(crypttext) == len(data)
732            return salt, crypttext
733
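        # Each segment gets a fresh random salt, so ssk_readkey_data_hash
        # derives a distinct AES key per segment from the readkey; running
        # the CPU-heavy encryption in a thread keeps the reactor responsive.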
734        salt, crypttext = await defer_to_thread(encrypt, self.readkey)
735
736        now = time.time()
737        self._status.accumulate_encrypt_time(now - started)
738        started = now
739
740        # now apply FEC
741        if segnum + 1 == self.num_segments:
742            fec = self.tail_fec
743        else:
744            fec = self.fec
745
746        self._status.set_status("Encoding")
747        crypttext_pieces = [None] * self.required_shares
748        piece_size = fec.get_block_size()
749        for i in range(len(crypttext_pieces)):
750            offset = i * piece_size
751            piece = crypttext[offset:offset+piece_size]
752            piece = piece + b"\x00"*(piece_size - len(piece)) # padding
753            crypttext_pieces[i] = piece
754            assert len(piece) == piece_size
755
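        # We hand exactly required_shares zero-padded pieces of piece_size
        # bytes to the encoder; the (shares, shareids) result is expected to
        # contain total_shares blocks, which _push_segment() distributes to
        # the per-share writers.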
756        res = await fec.encode(crypttext_pieces)
757        elapsed = time.time() - started
758        self._status.accumulate_encode_time(elapsed)
759        return (res, salt)
760
761    @async_to_deferred
762    async def _push_segment(self, encoded_and_salt, segnum):
763        """
764        I push (data, salt) as segment number segnum.
765        """
766        results, salt = encoded_and_salt
767        shares, shareids = results
768        self._status.set_status("Pushing segment")
769        for i in range(len(shares)):
770            sharedata = shares[i]
771            shareid = shareids[i]
772            if self._version == MDMF_VERSION:
773                hashed = salt + sharedata
774            else:
775                hashed = sharedata
776            block_hash = await defer_to_thread(hashutil.block_hash, hashed)
777            self.blockhashes[shareid][segnum] = block_hash
778            # find the writer for this share
779            writers = self.writers[shareid]
780            for writer in writers:
781                writer.put_block(sharedata, segnum, salt)
782
783
784    def push_everything_else(self):
785        """
786        I put everything else associated with a share.
787        """
788        self._pack_started = time.time()
789        self.push_encprivkey()
790        self.push_blockhashes()
791        self.push_sharehashes()
792        self.push_toplevel_hashes_and_signature()
793        d = self.finish_publishing()
794        def _change_state(ignored):
795            self._state = DONE_STATE
796        d.addCallback(_change_state)
797        d.addCallback(self._push)
798        return d
799
800
801    def push_encprivkey(self):
802        encprivkey = self._encprivkey
803        self._status.set_status("Pushing encrypted private key")
804        for shnum, writers in self.writers.items():
805            for writer in writers:
806                writer.put_encprivkey(encprivkey)
807
808
809    def push_blockhashes(self):
810        self.sharehash_leaves = [None] * len(self.blockhashes)
811        self._status.set_status("Building and pushing block hash tree")
812        for shnum, blockhashes in list(self.blockhashes.items()):
813            t = hashtree.HashTree(blockhashes)
814            self.blockhashes[shnum] = list(t)
815            # set the leaf for future use.
816            self.sharehash_leaves[shnum] = t[0]
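            # (t is a Merkle tree over this share's per-segment block
            # hashes: list(t) serializes every node of that tree, and
            # t[0] -- the Merkle root -- becomes this share's leaf in the
            # share hash tree built by push_sharehashes() below.)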
817
818            writers = self.writers[shnum]
819            for writer in writers:
820                writer.put_blockhashes(self.blockhashes[shnum])
821
822
823    def push_sharehashes(self):
824        self._status.set_status("Building and pushing share hash chain")
825        share_hash_tree = hashtree.HashTree(self.sharehash_leaves)
826        for shnum in range(len(self.sharehash_leaves)):
827            needed_indices = share_hash_tree.needed_hashes(shnum)
828            self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
829                                             for i in needed_indices] )
830            writers = self.writers[shnum]
831            for writer in writers:
832                writer.put_sharehashes(self.sharehashes[shnum])
833        self.root_hash = share_hash_tree[0]
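        # (Illustrative, assuming the usual complete-binary-tree layout:
        # with 4 shares the share hash tree has 7 nodes; validating share
        # 0's leaf (node 3) needs its sibling (node 4) and uncle (node 2),
        # which is roughly what needed_hashes(0) returns.)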
834
835
836    def push_toplevel_hashes_and_signature(self):
837        # We need to do three things here:
838        #   - Push the root hash and salt hash
839        #   - Get the checkstring of the resulting layout; sign that.
840        #   - Push the signature
841        self._status.set_status("Pushing root hashes and signature")
842        for shnum in range(self.total_shares):
843            writers = self.writers[shnum]
844            for writer in writers:
845                writer.put_root_hash(self.root_hash)
846        self._update_checkstring()
847        self._make_and_place_signature()
848
849
850    def _update_checkstring(self):
851        """
852        After putting the root hash, MDMF files will have the
853        checkstring written to the storage server. This means that we
854        can update our copy of the checkstring so we can detect
855        uncoordinated writes. SDMF files will have the same checkstring,
856        so we need not do anything.
857        """
858        self._checkstring = self._get_some_writer().get_checkstring()
859
860
861    def _make_and_place_signature(self):
862        """
863        I create and place the signature.
864        """
865        started = time.time()
866        self._status.set_status("Signing prefix")
867        signable = self._get_some_writer().get_signable()
868        self.signature = rsa.sign_data(self._privkey, signable)
869
870        for (shnum, writers) in self.writers.items():
871            for writer in writers:
872                writer.put_signature(self.signature)
873        self._status.timings['sign'] = time.time() - started
874
875
876    def finish_publishing(self):
877        # We're almost done -- we just need to put the verification key
878        # and the offsets
879        started = time.time()
880        self._status.set_status("Pushing shares")
881        self._started_pushing = started
882        ds = []
883        verification_key = rsa.der_string_from_verifying_key(self._pubkey)
884
885        for (shnum, writers) in list(self.writers.copy().items()):
886            for writer in writers:
887                writer.put_verification_key(verification_key)
888                self.num_outstanding += 1
889                def _no_longer_outstanding(res):
890                    self.num_outstanding -= 1
891                    return res
892
893                d = writer.finish_publishing()
894                d.addBoth(_no_longer_outstanding)
895                d.addErrback(self._connection_problem, writer)
896                d.addCallback(self._got_write_answer, writer, started)
897                ds.append(d)
898        self._record_verinfo()
899        self._status.timings['pack'] = time.time() - started
900        return defer.DeferredList(ds)
901
902
903    def _record_verinfo(self):
904        self.versioninfo = self._get_some_writer().get_verinfo()
905
906
907    def _connection_problem(self, f, writer):
908        """
909        We ran into a connection problem while working with writer, and
910        need to deal with that.
911        """
912        self.log("found problem: %s" % str(f))
913        self._last_failure = f
914        self.writers.discard(writer.shnum, writer)
915
916
917    def log_goal(self, goal, message=""):
918        logmsg = [message]
919        for (shnum, server) in sorted([(s,p) for (p,s) in goal], key=lambda t: (id(t[0]), id(t[1]))):
920            logmsg.append("sh%d to [%r]" % (shnum, server.get_name()))
921        self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
922        self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
923                 level=log.NOISY)
924
925    def update_goal(self):
926        self.log_goal(self.goal, "before update: ")
927
928        # first, remove any bad servers from our goal
929        self.goal = set([ (server, shnum)
930                          for (server, shnum) in self.goal
931                          if server not in self.bad_servers ])
932
933        # find the homeless shares:
934        homefull_shares = set([shnum for (server, shnum) in self.goal])
935        homeless_shares = set(range(self.total_shares)) - homefull_shares
936        homeless_shares = sorted(list(homeless_shares))
937        # place them somewhere. We prefer unused servers at the beginning of
938        # the available server list.
939
940        if not homeless_shares:
941            return
942
943        # if an old share X is on a node, put the new share X there too.
944        # TODO: 1: redistribute shares to achieve one-per-server, by copying
945        #       shares from existing servers to new (less-crowded) ones. The
946        #       old shares must still be updated.
947        # TODO: 2: move those shares instead of copying them, to reduce future
948        #       update work
949
950        # this is a bit CPU intensive but easy to analyze. We create a sort
951        # order for each server. If the server is marked as bad, we don't
952        # even put them in the list. Then we care about the number of shares
953        # which have already been assigned to them. After that we care about
954        # their permutation order.
955        old_assignments = DictOfSets()
956        for (server, shnum) in self.goal:
957            old_assignments.add(server, shnum)
958
959        serverlist = []
960
961        action = start_action(
962            action_type=u"mutable:upload:update_goal",
963            homeless_shares=len(homeless_shares),
964        )
965        with action:
966            for i, server in enumerate(self.full_serverlist):
967                serverid = server.get_serverid()
968                if server in self.bad_servers:
969                    Message.log(
970                        message_type=u"mutable:upload:bad-server",
971                        server_id=serverid,
972                    )
973                    continue
974                # if we have >= 1 grid-managers, this checks that we have
975                # a valid certificate for this server
976                if not server.upload_permitted():
977                    Message.log(
978                        message_type=u"mutable:upload:no-gm-certs",
979                        server_id=serverid,
980                    )
981                    continue
982
983                entry = (len(old_assignments.get(server, [])), i, serverid, server)
984                serverlist.append(entry)
985        serverlist.sort()
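        # Sorting these tuples orders candidates by (number of shares
        # already assigned, permuted-list position, serverid), so the
        # least-loaded, earliest-permuted servers receive new shares first.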
986
987        if not serverlist:
988            raise NotEnoughServersError("Ran out of non-bad servers, "
989                                        "first_error=%s" %
990                                        str(self._first_write_error),
991                                        self._first_write_error)
992
993        # we then index this serverlist with an integer, because we may have
994        # to wrap. We update the goal as we go.
995        i = 0
996        for shnum in homeless_shares:
997            (ignored1, ignored2, ignored3, server) = serverlist[i]
998            # if we are forced to send a share to a server that already has
999            # one, we may have two write requests in flight, and the
1000            # servermap (which was computed before either request was sent)
1001            # won't reflect the new shares, so the second response will be
1002            # surprising. There is code in _got_write_answer() to tolerate
1003            # this, otherwise it would cause the publish to fail with an
1004            # UncoordinatedWriteError. See #546 for details of the trouble
1005            # this used to cause.
1006            self.goal.add( (server, shnum) )
1007            i += 1
1008            if i >= len(serverlist):
1009                i = 0
1010        self.log_goal(self.goal, "after update: ")
1011
1012
1013    def _got_write_answer(self, answer, writer, started):
1014        if not answer:
1015            # SDMF writers only pretend to write when callers set their
1016            # blocks, salts, and so on -- they actually just write once,
1017            # at the end of the upload process. In fake writes, they
1018            # return defer.succeed(None). If we see that, we shouldn't
1019            # bother checking it.
1020            return
1021
1022        server = writer.server
1023        lp = self.log("_got_write_answer from %r, share %d" %
1024                      (server.get_name(), writer.shnum))
1025
1026        now = time.time()
1027        elapsed = now - started
1028
1029        self._status.add_per_server_time(server, elapsed)
1030
1031        wrote, read_data = answer
1032
1033        surprise_shares = set(read_data.keys()) - set([writer.shnum])
1034
1035        # We need to remove from surprise_shares any shares that we are
1036        # knowingly also writing to that server from other writers.
1037
1038        # TODO: Precompute this.
1039        shares = []
1040        for shnum, writers in self.writers.items():
1041            shares.extend([x.shnum for x in writers if x.server == server])
1042        known_shnums = set(shares)
1043        surprise_shares -= known_shnums
1044        self.log("found the following surprise shares: %s" %
1045                 str(surprise_shares))
1046
1047        # Now surprise shares contains all of the shares that we did not
1048        # expect to be there.
1049
1050        surprised = False
1051        for shnum in surprise_shares:
1052            # read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
1053            checkstring = read_data[shnum][0]
1054            # What we want to do here is to see if their (seqnum,
1055            # roothash, salt) is the same as our (seqnum, roothash,
1056            # salt), or the equivalent for MDMF. The best way to do this
1057            # is to store a packed representation of our checkstring
1058            # somewhere, then not bother unpacking the other
1059            # checkstring.
1060            if checkstring == self._checkstring:
1061                # they have the right share, somehow
1062
1063                if (server,shnum) in self.goal:
1064                    # and we want them to have it, so we probably sent them a
1065                    # copy in an earlier write. This is ok, and avoids the
1066                    # #546 problem.
1067                    continue
1068
1069                # They aren't in our goal, but they are still for the right
1070                # version. Somebody else wrote them, and it's a convergent
1071                # uncoordinated write. Pretend this is ok (don't be
1072                # surprised), since I suspect there's a decent chance that
1073                # we'll hit this in normal operation.
1074                continue
1075
1076            else:
1077                # the new shares are of a different version
1078                if server in self._servermap.get_reachable_servers():
1079                    # we asked them about their shares, so we had knowledge
1080                    # of what they used to have. Any surprising shares must
1081                    # have come from someone else, so UCW.
1082                    surprised = True
1083                else:
1084                    # we didn't ask them, and now we've discovered that they
1085                    # have a share we didn't know about. This indicates that
1086                    # mapupdate should have worked harder and asked more
1087                    # servers before concluding that it knew about them all.
1088
1089                    # signal UCW, but make sure to ask this server next time,
1090                    # so we'll remember to update it if/when we retry.
1091                    surprised = True
1092                    # TODO: ask this server next time. I don't yet have a good
1093                    # way to do this. Two insufficient possibilities are:
1094                    #
1095                    # self._servermap.add_new_share(server, shnum, verinfo, now)
1096                    #  but that requires fetching/validating/parsing the whole
1097                    #  version string, and all we have is the checkstring
1098                    # self._servermap.mark_bad_share(server, shnum, checkstring)
1099                    #  that will make publish overwrite the share next time,
1100                    #  but it won't re-query the server, and it won't make
1101                    #  mapupdate search further
1102
1103                    # TODO later: when publish starts, do
1104                    # servermap.get_best_version(), extract the seqnum,
1105                    # subtract one, and store as highest-replaceable-seqnum.
1106                    # Then, if this surprise-because-we-didn't-ask share is
1107                    # of highest-replaceable-seqnum or lower, we're allowed
1108                    # to replace it: send out a new writev (or rather add it
1109                    # to self.goal and loop).
1110
1111                surprised = True
1112
1113        if surprised:
1114            self.log("they had shares %s that we didn't know about" %
1115                     (list(surprise_shares),),
1116                     parent=lp, level=log.WEIRD, umid="un9CSQ")
1117            self.surprised = True
1118
1119        if not wrote:
1120            # TODO: there are two possibilities. The first is that the server
1121            # is full (or just doesn't want to give us any room), which means
1122            # we shouldn't ask them again, but is *not* an indication of an
1123            # uncoordinated write. The second is that our testv failed, which
1124            # *does* indicate an uncoordinated write. We currently don't have
1125            # a way to tell these two apart (in fact, the storage server code
1126            # doesn't have the option of refusing our share).
1127            #
1128            # If the server is full, mark the server as bad (so we don't ask
1129            # them again), but don't set self.surprised. The loop() will find
1130            # a new server.
1131            #
1132            # If the testv failed, log it, set self.surprised, but don't
1133            # bother adding to self.bad_servers .
1134
1135            self.log("our testv failed, so the write did not happen",
1136                     parent=lp, level=log.WEIRD, umid="8sc26g")
1137            self.surprised = True
1138            self.bad_servers.add(server) # don't ask them again
1139            # use the checkstring to add information to the log message
1140            unknown_format = False
1141            for (shnum,readv) in list(read_data.items()):
1142                checkstring = readv[0]
1143                version = get_version_from_checkstring(checkstring)
1144                if version == MDMF_VERSION:
1145                    (other_seqnum,
1146                     other_roothash) = unpack_mdmf_checkstring(checkstring)
1147                elif version == SDMF_VERSION:
1148                    (other_seqnum,
1149                     other_roothash,
1150                     other_IV) = unpack_sdmf_checkstring(checkstring)
1151                else:
1152                    unknown_format = True
1153                expected_version = self._servermap.version_on_server(server,
1154                                                                     shnum)
1155                if expected_version:
1156                    (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
1157                     offsets_tuple) = expected_version
1158                    msg = ("somebody modified the share on us:"
1159                           " shnum=%d: I thought they had #%d:R=%r," %
1160                           (shnum,
1161                            seqnum, base32.b2a(root_hash)[:4]))
1162                    if unknown_format:
1163                        msg += (" but I don't know how to read share"
1164                                " format %d" % version)
1165                    else:
1166                        msg += " but testv reported #%d:R=%r" % \
1167                               (other_seqnum, base32.b2a(other_roothash)[:4])
1168                    self.log(msg, parent=lp, level=log.NOISY)
1169                # if expected_version==None, then we didn't expect to see a
1170                # share on that server, and the 'surprise_shares' clause
1171                # above will have logged it.
1172            return
1173
1174        # and update the servermap
1175        # self.versioninfo is set during the last phase of publishing.
1176        # If we get there, we know that responses correspond to placed
1177        # shares, and can safely execute these statements.
1178        if self.versioninfo:
1179            self.log("wrote successfully: adding new share to servermap")
1180            self._servermap.add_new_share(server, writer.shnum,
1181                                          self.versioninfo, started)
1182            self.placed.add( (server, writer.shnum) )
1183        self._update_status()
1184        # the next method in the deferred chain will check to see if
1185        # we're done and successful.
1186        return
1187
1188
1189    def _done(self):
1190        if not self._running:
1191            return
1192        self._running = False
1193        now = time.time()
1194        self._status.timings["total"] = now - self._started
1195
1196        elapsed = now - self._started_pushing
1197        self._status.timings['push'] = elapsed
1198
1199        self._status.set_active(False)
1200        self.log("Publish done, success")
1201        self._status.set_status("Finished")
1202        self._status.set_progress(1.0)
1203        # Get k and segsize, then give them to the caller.
1204        hints = {}
1205        hints['segsize'] = self.segment_size
1206        hints['k'] = self.required_shares
1207        self._node.set_downloader_hints(hints)
1208        eventually(self.done_deferred.callback, None)
1209
1210    def _failure(self, f=None):
1211        if f:
1212            self._last_failure = f
1213
1214        if not self.surprised:
1215            # We ran out of servers
1216            msg = "Publish ran out of good servers"
1217            if self._last_failure:
1218                msg += ", last failure was: %s" % str(self._last_failure)
1219            self.log(msg)
1220            e = NotEnoughServersError(msg)
1221
1222        else:
1223            # We ran into shares that we didn't recognize, which means
1224            # that we need to return an UncoordinatedWriteError.
1225            self.log("Publish failed with UncoordinatedWriteError")
1226            e = UncoordinatedWriteError()
1227        f = failure.Failure(e)
1228        eventually(self.done_deferred.callback, f)
1229
1230
1231@implementer(IMutableUploadable)
1232class MutableFileHandle(object):
1233    """
1234    I am a mutable uploadable built around a filehandle-like object,
1235    usually either a BytesIO instance or a handle to an actual file.
1236    """
1237
1238    def __init__(self, filehandle):
1239        # The filehandle is defined as a generally file-like object that
1240        # has these two methods. We don't care beyond that.
1241        assert hasattr(filehandle, "read")
1242        assert hasattr(filehandle, "close")
1243
1244        self._filehandle = filehandle
1245        # We must start reading at the beginning of the file, or we risk
1246        # encountering errors when the data read does not match the size
1247        # reported to the uploader.
1248        self._filehandle.seek(0)
1249
1250        # We have not yet read anything, so our position is 0.
1251        self._marker = 0
1252
1253
1254    def get_size(self):
1255        """
1256        I return the amount of data in my filehandle.
1257        """
1258        if not hasattr(self, "_size"):
1259            old_position = self._filehandle.tell()
1260            # Seek to the end of the file by seeking 0 bytes from the
1261            # file's end
1262            self._filehandle.seek(0, os.SEEK_END)
1263            self._size = self._filehandle.tell()
1264            # Restore the previous position, in case this was called
1265            # after a read.
1266            self._filehandle.seek(old_position)
1267            assert self._filehandle.tell() == old_position
1268
1269        assert hasattr(self, "_size")
1270        return self._size
1271
1272
1273    def pos(self):
1274        """
1275        I return the position of my read marker -- i.e., how much data I
1276        have already read and returned to callers.
1277        """
1278        return self._marker
1279
1280
1281    def read(self, length):
1282        """
1283        I return some data (up to length bytes) from my filehandle.
1284
1285        In most cases, I return length bytes, but sometimes I won't --
1286        for example, if I am asked to read beyond the end of a file, or
1287        an error occurs.
1288        """
1289        results = self._filehandle.read(length)
1290        self._marker += len(results)
1291        return [results]
1292
1293
1294    def close(self):
1295        """
1296        I close the underlying filehandle. Any further operations on the
1297        filehandle fail at this point.
1298        """
1299        self._filehandle.close()
1300
1301
1302class MutableData(MutableFileHandle):
1303    """
1304    I am a mutable uploadable built around a string, which I then cast
1305    into a BytesIO and treat as a filehandle.
1306    """
1307
1308    def __init__(self, s):
1309        # Take a string and return a file-like uploadable.
1310        assert isinstance(s, bytes)
1311
1312        MutableFileHandle.__init__(self, BytesIO(s))
1313
1314
1315@implementer(IMutableUploadable)
1316class TransformingUploadable(object):
1317    """
1318    I am an IMutableUploadable that wraps another IMutableUploadable,
1319    and some segments that are already on the grid. When I am called to
1320    read, I handle merging of boundary segments.
1321    """
1322
1323
1324    def __init__(self, data, offset, segment_size, start, end):
1325        assert IMutableUploadable.providedBy(data)
1326
1327        self._newdata = data
1328        self._offset = offset
1329        self._segment_size = segment_size
1330        self._start = start
1331        self._end = end
1332
1333        self._read_marker = 0
1334
1335        self._first_segment_offset = offset % segment_size
1336
1337        num = self.log("TransformingUploadable: starting", parent=None)
1338        self._log_number = num
1339        self.log("got fso: %d" % self._first_segment_offset)
1340        self.log("got offset: %d" % self._offset)
1341
1342
1343    def log(self, *args, **kwargs):
1344        if 'parent' not in kwargs:
1345            kwargs['parent'] = self._log_number
1346        if "facility" not in kwargs:
1347            kwargs["facility"] = "tahoe.mutable.transforminguploadable"
1348        return log.msg(*args, **kwargs)
1349
1350
1351    def get_size(self):
1352        return self._offset + self._newdata.get_size()
1353
1354
1355    def read(self, length):
1356        # We can get data from 3 sources here.
1357        #   1. The first of the segments provided to us.
1358        #   2. The data that we're replacing things with.
1359        #   3. The last of the segments provided to us.
1360
1361        # are we in state 0?
1362        self.log("reading %d bytes" % length)
1363
1364        old_start_data = b""
1365        old_data_length = self._first_segment_offset - self._read_marker
1366        if old_data_length > 0:
1367            if old_data_length > length:
1368                old_data_length = length
1369            self.log("returning %d bytes of old start data" % old_data_length)
1370
1371            old_data_end = old_data_length + self._read_marker
1372            old_start_data = self._start[self._read_marker:old_data_end]
1373            length -= old_data_length
1374        else:
1375            # otherwise calculations later get screwed up.
1376            old_data_length = 0
1377
1378        # Is there enough new data to satisfy this read? If not, we need
1379        # to pad the end of the data with data from our last segment.
1380        old_end_length = length - \
1381            (self._newdata.get_size() - self._newdata.pos())
1382        old_end_data = b""
1383        if old_end_length > 0:
1384            self.log("reading %d bytes of old end data" % old_end_length)
1385
1386            # TODO: We're not explicitly checking for tail segment size
1387            # here. Is that a problem?
1388            old_data_offset = (length - old_end_length + \
1389                               old_data_length) % self._segment_size
1390            self.log("reading at offset %d" % old_data_offset)
1391            old_end = old_data_offset + old_end_length
1392            old_end_data = self._end[old_data_offset:old_end]
1393            length -= old_end_length
1394            assert length == self._newdata.get_size() - self._newdata.pos()
1395
1396        self.log("reading %d bytes of new data" % length)
1397        new_data = self._newdata.read(length)
1398        new_data = b"".join(new_data)
1399
1400        self._read_marker += len(old_start_data + new_data + old_end_data)
1401
1402        return old_start_data + new_data + old_end_data
1403
1404    def close(self):
1405        pass