source: trunk/src/allmydata/dirnode.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

  • Property mode set to 100644
File size: 36.8 KB
Line 
1"""Directory Node implementation.
2
3Ported to Python 3.
4"""
5
6from past.builtins import unicode
7
8import time
9
10from zope.interface import implementer
11from twisted.internet import defer
12from foolscap.api import fireEventually
13
14from allmydata.crypto import aes
15from allmydata.deep_stats import DeepStats
16from allmydata.mutable.common import NotWriteableError
17from allmydata.mutable.filenode import MutableFileNode
18from allmydata.unknown import UnknownNode, strip_prefix_for_ro
19from allmydata.interfaces import IFilesystemNode, IDirectoryNode, IFileNode, \
20     ExistingChildError, NoSuchChildError, ICheckable, IDeepCheckable, \
21     MustBeDeepImmutableError, CapConstraintError, ChildOfWrongTypeError
22from allmydata.check_results import DeepCheckResults, \
23     DeepCheckAndRepairResults
24from allmydata.monitor import Monitor
25from allmydata.util import hashutil, base32, log, jsonbytes as json
26from allmydata.util.encodingutil import quote_output, normalize
27from allmydata.util.assertutil import precondition
28from allmydata.util.netstring import netstring, split_netstring
29from allmydata.util.consumer import download_to_data
30from allmydata.uri import wrap_dirnode_cap
31from allmydata.util.dictutil import AuxValueDict
32
33from eliot import (
34    ActionType,
35    Field,
36)
37from eliot.twisted import (
38    DeferredContext,
39)
40
# Eliot log fields/actions used by add_file().  This file is Python-3-only
# (see module docstring), so the old past.builtins `unicode` alias is just
# `str`; use `str` directly instead of keeping the py2 compatibility shim.
NAME = Field.for_types(
    u"name",
    [str],
    u"The name linking the parent to this node.",
)

METADATA = Field.for_types(
    u"metadata",
    [dict],
    u"Data about a node.",
)

OVERWRITE = Field.for_types(
    u"overwrite",
    [bool],
    u"True to replace an existing file of the same name, "
    u"false to fail with a collision error.",
)

ADD_FILE = ActionType(
    u"dirnode:add-file",
    [NAME, METADATA, OVERWRITE],
    [],
    u"Add a new file as a child of a directory.",
)
69
70class _OnlyFiles(object):
71    """Marker for replacement option of only replacing files."""
72
73ONLY_FILES = _OnlyFiles()
74
75
def update_metadata(metadata, new_metadata, now):
    """Merge 'new_metadata' into 'metadata' in place and refresh timestamps.

    'new_metadata', when given, replaces everything except the 'tahoe'
    sub-dict, which is system-owned and preserved from 'metadata'.
    Timestamps ('tahoe:linkcrtime' / 'tahoe:linkmotime') are set from 'now'.
    Returns the resulting metadata dict.
    """
    if metadata is None:
        metadata = {}

    # Remember the legacy creation time before it can be overwritten below.
    old_ctime = metadata.get('ctime')

    if new_metadata is not None:
        # Replace all user metadata, but keep the system-owned 'tahoe' entry.
        replacement = {k: v for (k, v) in new_metadata.items() if k != 'tahoe'}
        if 'tahoe' in metadata:
            replacement['tahoe'] = metadata['tahoe']
        metadata = replacement

    sysmd = metadata.get('tahoe', {})
    if 'linkcrtime' not in sysmd:
        # Tahoe < 1.4.0 used 'ctime' for what >= 1.4.0 calls 'linkcrtime';
        # honor the old field only when the new one is absent.
        sysmd['linkcrtime'] = now if old_ctime is None else old_ctime

    sysmd['linkmotime'] = now
    metadata['tahoe'] = sysmd

    return metadata
116
117
118# TODO: {Deleter,MetadataSetter,Adder}.modify all start by unpacking the
119# contents and end by repacking them. It might be better to apply them to
120# the unpacked contents.
121
class Deleter(object):
    """Modifier that removes a single child link from a directory node.

    An instance's 'modify' method is handed to MutableFileNode.modify();
    after the run, self.old_child holds the removed node (or None).
    """
    def __init__(self, node, namex, must_exist=True, must_be_directory=False, must_be_file=False):
        self.node = node
        self.name = normalize(namex)
        self.must_exist = must_exist
        self.must_be_directory = must_be_directory
        self.must_be_file = must_be_file

    def modify(self, old_contents, servermap, first_time):
        """Return repacked contents without self.name, or None if unchanged."""
        children = self.node._unpack_contents(old_contents)
        if self.name not in children:
            if first_time and self.must_exist:
                raise NoSuchChildError(self.name)
            self.old_child = None
            # Returning None tells modify() there is nothing to write.
            return None
        self.old_child, _metadata = children[self.name]

        # Unknown children pass both type checks and can always be removed.
        if self.must_be_directory and IFileNode.providedBy(self.old_child):
            raise ChildOfWrongTypeError("delete required a directory, not a file")
        if self.must_be_file and IDirectoryNode.providedBy(self.old_child):
            raise ChildOfWrongTypeError("delete required a file, not a directory")

        del children[self.name]
        return self.node._pack_contents(children)
148
149
class MetadataSetter(object):
    """Modifier that replaces the metadata of one existing child."""
    def __init__(self, node, namex, metadata, create_readonly_node=None):
        self.node = node
        self.name = normalize(namex)
        self.metadata = metadata
        self.create_readonly_node = create_readonly_node

    def modify(self, old_contents, servermap, first_time):
        """Return repacked contents with self.name's metadata replaced."""
        children = self.node._unpack_contents(old_contents)
        name = self.name
        if name not in children:
            raise NoSuchChildError(name)

        child, old_metadata = children[name]
        new_md = update_metadata(old_metadata.copy(), self.metadata, time.time())
        # Honor a 'no-write' flag by downgrading the link to read-only.
        if self.create_readonly_node and new_md.get('no-write', False):
            child = self.create_readonly_node(child, name)

        children[name] = (child, new_md)
        return self.node._pack_contents(children)
173
174
class Adder(object):
    """Modifier that adds (and possibly overwrites) child entries."""

    def __init__(self, node, entries=None, overwrite=True, create_readonly_node=None):
        """
        :param overwrite: Either True (allow overwriting anything existing),
            False (don't allow overwriting), or ONLY_FILES (only files can be
            overwritten).
        """
        self.node = node
        if entries is None:
            entries = {}
        precondition(isinstance(entries, dict), entries)
        precondition(overwrite in (True, False, ONLY_FILES), overwrite)
        # keys of 'entries' may not be normalized.
        self.entries = entries
        self.overwrite = overwrite
        self.create_readonly_node = create_readonly_node

    def set_node(self, namex, node, metadata):
        """Queue one (node, metadata) entry under the (unnormalized) name."""
        precondition(IFilesystemNode.providedBy(node), node)
        self.entries[namex] = (node, metadata)

    def modify(self, old_contents, servermap, first_time):
        """Apply all queued entries to the packed directory contents."""
        children = self.node._unpack_contents(old_contents)
        now = time.time()
        for namex, (child, new_metadata) in list(self.entries.items()):
            name = normalize(namex)
            precondition(IFilesystemNode.providedBy(child), child)

            # Redundant with _pack_normalized_children, but fails earlier
            # with a better context.
            child.raise_error()

            existing = children.get(name)
            metadata = None
            if existing is not None:
                if not self.overwrite:
                    raise ExistingChildError("child %s already exists" % quote_output(name, encoding='utf-8'))

                if self.overwrite == ONLY_FILES and IDirectoryNode.providedBy(existing[0]):
                    raise ExistingChildError("child %s already exists as a directory" % quote_output(name, encoding='utf-8'))
                metadata = existing[1].copy()

            metadata = update_metadata(metadata, new_metadata, now)
            # Honor a 'no-write' flag by downgrading the link to read-only.
            if self.create_readonly_node and metadata.get('no-write', False):
                child = self.create_readonly_node(child, name)

            children[name] = (child, metadata)
        return self.node._pack_contents(children)
223
def _encrypt_rw_uri(writekey, rw_uri):
    """Superencrypt a child's write-cap with a key derived from 'writekey'.

    Returns salt + AES(key, rw_uri) + HMAC, where the key is derived from
    the salt and the directory's writekey.
    """
    precondition(isinstance(rw_uri, bytes), rw_uri)
    precondition(isinstance(writekey, bytes), writekey)

    salt = hashutil.mutable_rwcap_salt_hash(rw_uri)
    key = hashutil.mutable_rwcap_key_hash(salt, writekey)
    crypttext = aes.encrypt_data(aes.create_encryptor(key), rw_uri)
    # The MAC is not checked by readers in Tahoe >= 1.3.0, but we still
    # produce it for the sake of older readers.
    mac = hashutil.hmac(key, salt + crypttext)
    assert len(mac) == 32
    return salt + crypttext + mac
237
def pack_children(childrenx, writekey, deep_immutable=False):
    """NFC-normalize child names, then pack them into directory contents.

    Every entry must carry a metadata dict ({} rather than None).
    """
    normalized = {}
    for namex, (node, metadata) in list(childrenx.items()):
        precondition(isinstance(metadata, dict),
                     "directory creation requires metadata to be a dict, not None", metadata)
        normalized[normalize(namex)] = (node, metadata)

    return _pack_normalized_children(normalized, writekey=writekey, deep_immutable=deep_immutable)
247
248
# Reused encoding of an empty writecap slot (read-only / immutable children).
ZERO_LEN_NETSTR=netstring(b'')
def _pack_normalized_children(children, writekey, deep_immutable=False):
    """Take a dict that maps:
         children[unicode_nfc_name] = (IFileSystemNode, metadata_dict)
    and pack it into a single string, for use as the contents of the backing
    file. This is the same format as is returned by _unpack_contents. I also
    accept an AuxValueDict, in which case I'll use the auxiliary cached data
    as the pre-packed entry, which is faster than re-packing everything each
    time.

    If writekey is provided then I will superencrypt the child's writecap with
    writekey.

    If deep_immutable is True, I will require that all my children are deeply
    immutable, and will raise a MustBeDeepImmutableError if not.
    """
    precondition((writekey is None) or isinstance(writekey, bytes), writekey)

    has_aux = isinstance(children, AuxValueDict)
    entries = []
    # Sort by name so packing is deterministic for a given child table.
    for name in sorted(children.keys()):
        assert isinstance(name, str)
        entry = None
        (child, metadata) = children[name]
        child.raise_error()
        if deep_immutable and not child.is_allowed_in_immutable_directory():
            raise MustBeDeepImmutableError(
                "child %r is not allowed in an immutable directory" % (name,),
                name)
        if has_aux:
            # Cached pre-packed entry from a previous _unpack_contents.
            entry = children.get_aux(name)
        if not entry:
            assert IFilesystemNode.providedBy(child), (name,child)
            assert isinstance(metadata, dict)
            rw_uri = child.get_write_uri()
            if rw_uri is None:
                rw_uri = b""
            assert isinstance(rw_uri, bytes), rw_uri

            # should be prevented by MustBeDeepImmutableError check above
            assert not (rw_uri and deep_immutable)

            ro_uri = child.get_readonly_uri()
            if ro_uri is None:
                ro_uri = b""
            assert isinstance(ro_uri, bytes), ro_uri
            if writekey is not None:
                writecap = netstring(_encrypt_rw_uri(writekey, rw_uri))
            else:
                writecap = ZERO_LEN_NETSTR
            # Entry wire format: four netstrings
            # (name, ro_uri, encrypted-writecap, JSON metadata).
            entry = b"".join([netstring(name.encode("utf-8")),
                             netstring(strip_prefix_for_ro(ro_uri, deep_immutable)),
                             writecap,
                             netstring(json.dumps(metadata).encode("utf-8"))])
        entries.append(netstring(entry))
    return b"".join(entries)
305
306@implementer(IDirectoryNode, ICheckable, IDeepCheckable)
307class DirectoryNode(object):
308    filenode_class = MutableFileNode
309
    def __init__(self, filenode, nodemaker, uploader):
        """Wrap 'filenode' (the backing file holding the serialized child
        table) as a directory. 'nodemaker' creates child nodes from caps;
        'uploader' is used by add_file()."""
        assert IFileNode.providedBy(filenode), filenode
        assert not IDirectoryNode.providedBy(filenode), filenode
        self._node = filenode
        filenode_cap = filenode.get_cap()
        # Present the file cap as a directory cap (URI:DIR2:...).
        self._uri = wrap_dirnode_cap(filenode_cap)
        self._nodemaker = nodemaker
        self._uploader = uploader
318
319    def __repr__(self):
320        return "<%s %s-%s %s>" % (self.__class__.__name__,
321                                  self.is_readonly() and "RO" or "RW",
322                                  self.is_mutable() and "MUT" or "IMM",
323                                  hasattr(self, '_uri') and str(self._uri.abbrev(), "utf-8"))
324
    def get_size(self):
        """Return the size of our backing mutable file, in bytes, if we've
        fetched it. Otherwise return None. This returns synchronously."""
        # Pure delegation to the backing filenode.
        return self._node.get_size()
329
    def get_current_size(self):
        """Calculate the size of our backing mutable file, in bytes. Returns
        a Deferred that fires with the result."""
        # Pure delegation to the backing filenode.
        return self._node.get_current_size()
334
335    def _read(self):
336        if self._node.is_mutable():
337            # use the IMutableFileNode API.
338            d = self._node.download_best_version()
339        else:
340            d = download_to_data(self._node)
341        d.addCallback(self._unpack_contents)
342        return d
343
    def _decrypt_rwcapdata(self, encwrcap):
        """Decrypt a child's superencrypted write-cap.

        'encwrcap' is laid out as 16-byte salt + ciphertext + 32-byte MAC
        (the MAC is not verified here; see _encrypt_rw_uri). Returns the
        plaintext rw_uri bytes.
        """
        salt = encwrcap[:16]
        crypttext = encwrcap[16:-32]
        # Key is derived from the salt plus this directory's writekey.
        key = hashutil.mutable_rwcap_key_hash(salt, self._node.get_writekey())
        encryptor = aes.create_decryptor(key)
        plaintext = aes.decrypt_data(encryptor, crypttext)
        return plaintext
351
    def _create_and_validate_node(self, rw_uri, ro_uri, name):
        """Build a child node from caps and check its constraints.

        'name' is just for error reporting. Children of an immutable
        directory must themselves be deep-immutable.
        """
        node = self._nodemaker.create_from_cap(rw_uri, ro_uri,
                                               deep_immutable=not self.is_mutable(),
                                               name=name)
        node.raise_error()
        return node
359
360    def _create_readonly_node(self, node, name):
361        # name is just for error reporting
362        if not node.is_unknown() and node.is_readonly():
363            return node
364        return self._create_and_validate_node(None, node.get_readonly_uri(), name=name)
365
    def _unpack_contents(self, data):
        """Parse packed directory bytes into an AuxValueDict of children.

        Maps NFC-normalized child name -> (node, metadata); each entry also
        caches its raw packed bytes as aux data for fast re-packing.
        """
        # the directory is serialized as a list of netstrings, one per child.
        # Each child is serialized as a list of four netstrings: (name, ro_uri,
        # rwcapdata, metadata), in which the name, ro_uri, metadata are in
        # cleartext. The 'name' is UTF-8 encoded, and should be normalized to NFC.
        # The rwcapdata is formatted as:
        # pack("16ss32s", iv, AES(H(writekey+iv), plaintext_rw_uri), mac)
        assert isinstance(data, bytes), (repr(data), type(data))
        # an empty directory is serialized as an empty string
        if data == b"":
            return AuxValueDict()
        writeable = not self.is_readonly()
        mutable = self.is_mutable()
        children = AuxValueDict()
        position = 0
        while position < len(data):
            # Outer netstring frames one child entry.
            entries, position = split_netstring(data, 1, position)
            entry = entries[0]
            (namex_utf8, ro_uri, rwcapdata, metadata_s), subpos = split_netstring(entry, 4)
            if not mutable and len(rwcapdata) > 0:
                raise ValueError("the rwcapdata field of a dirnode in an immutable directory was not empty")

            # A name containing characters that are unassigned in one version of Unicode might
            # not be normalized wrt a later version. See the note in section 'Normalization Stability'
            # at <http://unicode.org/policies/stability_policy.html>.
            # Therefore we normalize names going both in and out of directories.
            name = normalize(namex_utf8.decode("utf-8"))

            rw_uri = b""
            if writeable:
                # Only holders of the writekey can recover the write-cap.
                rw_uri = self._decrypt_rwcapdata(rwcapdata)

            # Since the encryption uses CTR mode, it currently leaks the length of the
            # plaintext rw_uri -- and therefore whether it is present, i.e. whether the
            # dirnode is writeable (ticket #925). By stripping trailing spaces in
            # Tahoe >= 1.6.0, we may make it easier for future versions to plug this leak.
            # ro_uri is treated in the same way for consistency.
            # rw_uri and ro_uri will be either None or a non-empty string.

            rw_uri = rw_uri.rstrip(b' ') or None
            ro_uri = ro_uri.rstrip(b' ') or None

            try:
                child = self._create_and_validate_node(rw_uri, ro_uri, name)
                if mutable or child.is_allowed_in_immutable_directory():
                    metadata = json.loads(metadata_s)
                    assert isinstance(metadata, dict)
                    # NOTE(review): this plain assignment looks redundant with
                    # the set_with_aux() call just below -- confirm against
                    # AuxValueDict before removing either.
                    children[name] = (child, metadata)
                    children.set_with_aux(name, (child, metadata), auxilliary=entry)
                else:
                    log.msg(format="mutable cap for child %(name)s unpacked from an immutable directory",
                            name=quote_output(name, encoding='utf-8'),
                            facility="tahoe.webish", level=log.UNUSUAL)
            except CapConstraintError as e:
                # Skip the offending child but keep the rest of the directory.
                log.msg(format="unmet constraint on cap for child %(name)s unpacked from a directory:\n"
                               "%(message)s", message=e.args[0], name=quote_output(name, encoding='utf-8'),
                               facility="tahoe.webish", level=log.UNUSUAL)

        return children
425
    def _pack_contents(self, children):
        """Serialize 'children' for the backing file (inverse of
        _unpack_contents)."""
        # expects children in the same format as _unpack_contents returns
        return _pack_normalized_children(children, self._node.get_writekey())
429
    def is_readonly(self):
        """True if we lack the write-cap to this directory."""
        return self._node.is_readonly()
432
    def is_mutable(self):
        """True if this directory's contents can be modified."""
        return self._node.is_mutable()
435
    def is_unknown(self):
        """A DirectoryNode is always a known node type."""
        return False
438
    def is_allowed_in_immutable_directory(self):
        """Only immutable directories may be linked from immutable ones."""
        return not self._node.is_mutable()
441
    def raise_error(self):
        """No-op: a constructed DirectoryNode carries no deferred error."""
        pass
444
    def get_uri(self):
        """Return our best cap (write-cap if we have it) as a string."""
        return self._uri.to_string()
447
448    def get_write_uri(self):
449        if self.is_readonly():
450            return None
451        return self._uri.to_string()
452
    def get_readonly_uri(self):
        """Return the read-cap URI string."""
        return self._uri.get_readonly().to_string()
455
    def get_cap(self):
        """Return our best cap as a URI object."""
        return self._uri
458
    def get_readcap(self):
        """Return the read-cap as a URI object."""
        return self._uri.get_readonly()
461
    def get_verify_cap(self):
        """Return the verify-cap (integrity checks without read access)."""
        return self._uri.get_verify_cap()
464
    def get_repair_cap(self):
        """Return the cap needed to repair this directory, or None."""
        if self._node.is_readonly():
            return None # readonly (mutable) dirnodes are not yet repairable
        return self._uri
469
    def get_storage_index(self):
        """Return the storage index of the backing file."""
        return self._uri.get_storage_index()
472
    def check(self, monitor, verify=False, add_lease=False):
        """Perform a file check. See IChecker.check for details."""
        # Checking a directory is just checking its backing file.
        return self._node.check(monitor, verify, add_lease)
    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Check the backing file and repair it if damaged."""
        return self._node.check_and_repair(monitor, verify, add_lease)
478
    def list(self):
        """I return a Deferred that fires with a dictionary mapping child
        name to a tuple of (IFilesystemNode, metadata)."""
        return self._read()
483
484    def has_child(self, namex):
485        """I return a Deferred that fires with a boolean, True if there
486        exists a child of the given name, False if not."""
487        name = normalize(namex)
488        d = self._read()
489        d.addCallback(lambda children: name in children)
490        return d
491
492    def _get(self, children, name):
493        child = children.get(name)
494        if child is None:
495            raise NoSuchChildError(name)
496        return child[0]
497
498    def _get_with_metadata(self, children, name):
499        child = children.get(name)
500        if child is None:
501            raise NoSuchChildError(name)
502        return child
503
504    def get(self, namex):
505        """I return a Deferred that fires with the named child node,
506        which is an IFilesystemNode."""
507        name = normalize(namex)
508        d = self._read()
509        d.addCallback(self._get, name)
510        return d
511
512    def get_child_and_metadata(self, namex):
513        """I return a Deferred that fires with the (node, metadata) pair for
514        the named child. The node is an IFilesystemNode, and the metadata
515        is a dictionary."""
516        name = normalize(namex)
517        d = self._read()
518        d.addCallback(self._get_with_metadata, name)
519        return d
520
521    def get_metadata_for(self, namex):
522        name = normalize(namex)
523        d = self._read()
524        d.addCallback(lambda children: children[name][1])
525        return d
526
527    def set_metadata_for(self, namex, metadata):
528        name = normalize(namex)
529        if self.is_readonly():
530            return defer.fail(NotWriteableError())
531        assert isinstance(metadata, dict)
532        s = MetadataSetter(self, name, metadata,
533                           create_readonly_node=self._create_readonly_node)
534        d = self._node.modify(s.modify)
535        d.addCallback(lambda res: self)
536        return d
537
    def get_child_at_path(self, pathx):
        """Transform a child path into an IFilesystemNode.

        I perform a recursive series of 'get' operations to find the named
        descendant node. I return a Deferred that fires with the node, or
        errbacks with IndexError if the node could not be found.

        The path can be either a single string (slash-separated) or a list of
        path-name elements.
        """
        # Delegate to the metadata-returning variant and drop the metadata.
        d = self.get_child_and_metadata_at_path(pathx)
        d.addCallback(lambda node_and_metadata: node_and_metadata[0])
        return d
551
552    def get_child_and_metadata_at_path(self, pathx):
553        """Transform a child path into an IFilesystemNode and
554        a metadata dictionary from the last edge that was traversed.
555        """
556
557        if not pathx:
558            return defer.succeed((self, {}))
559        if isinstance(pathx, (list, tuple)):
560            pass
561        else:
562            pathx = pathx.split("/")
563        for p in pathx:
564            assert isinstance(p, str), p
565        childnamex = pathx[0]
566        remaining_pathx = pathx[1:]
567        if remaining_pathx:
568            d = self.get(childnamex)
569            d.addCallback(lambda node:
570                          node.get_child_and_metadata_at_path(remaining_pathx))
571            return d
572        d = self.get_child_and_metadata(childnamex)
573        return d
574
    def set_uri(self, namex, writecap, readcap=None, metadata=None, overwrite=True):
        """Attach a child by raw cap strings; Deferred fires with the new
        child node. Caps must be bytes (or None)."""
        precondition(isinstance(writecap, (bytes, type(None))), writecap)
        precondition(isinstance(readcap, (bytes, type(None))), readcap)

        # We now allow packing unknown nodes, provided they are valid
        # for this type of directory.
        child_node = self._create_and_validate_node(writecap, readcap, namex)
        d = self.set_node(namex, child_node, metadata, overwrite)
        d.addCallback(lambda res: child_node)
        return d
585
586    def set_children(self, entries, overwrite=True):
587        # this takes URIs
588        a = Adder(self, overwrite=overwrite,
589                  create_readonly_node=self._create_readonly_node)
590        for (namex, e) in entries.items():
591            assert isinstance(namex, str), namex
592            if len(e) == 2:
593                writecap, readcap = e
594                metadata = None
595            else:
596                assert len(e) == 3
597                writecap, readcap, metadata = e
598            precondition(isinstance(writecap, (bytes,type(None))), writecap)
599            precondition(isinstance(readcap, (bytes,type(None))), readcap)
600
601            # We now allow packing unknown nodes, provided they are valid
602            # for this type of directory.
603            child_node = self._create_and_validate_node(writecap, readcap, namex)
604            a.set_node(namex, child_node, metadata)
605        d = self._node.modify(a.modify)
606        d.addCallback(lambda ign: self)
607        return d
608
609    def set_node(self, namex, child, metadata=None, overwrite=True):
610        """I add a child at the specific name. I return a Deferred that fires
611        when the operation finishes. This Deferred will fire with the child
612        node that was just added. I will replace any existing child of the
613        same name.
614
615        If this directory node is read-only, the Deferred will errback with a
616        NotWriteableError."""
617
618        precondition(IFilesystemNode.providedBy(child), child)
619
620        if self.is_readonly():
621            return defer.fail(NotWriteableError())
622        assert IFilesystemNode.providedBy(child), child
623        a = Adder(self, overwrite=overwrite,
624                  create_readonly_node=self._create_readonly_node)
625        a.set_node(namex, child, metadata)
626        d = self._node.modify(a.modify)
627        d.addCallback(lambda res: child)
628        return d
629
    def set_nodes(self, entries, overwrite=True):
        """Attach several already-constructed child nodes in one modify pass.

        'entries' maps name -> (node, metadata); Deferred fires with self.
        Errbacks with NotWriteableError on a read-only directory.
        """
        precondition(isinstance(entries, dict), entries)
        if self.is_readonly():
            return defer.fail(NotWriteableError())
        a = Adder(self, entries, overwrite=overwrite,
                  create_readonly_node=self._create_readonly_node)
        d = self._node.modify(a.modify)
        d.addCallback(lambda res: self)
        return d
639
640
    def add_file(self, namex, uploadable, metadata=None, overwrite=True):
        """I upload a file (using the given IUploadable), then attach the
        resulting FileNode to the directory at the given name. I return a
        Deferred that fires (with the IFileNode of the uploaded file) when
        the operation completes."""
        # The whole upload+link sequence is wrapped in an Eliot action so
        # the log records when it started and finished.
        with ADD_FILE(name=namex, metadata=metadata, overwrite=overwrite).context():
            name = normalize(namex)
            if self.is_readonly():
                d = DeferredContext(defer.fail(NotWriteableError()))
            else:
                # XXX should pass reactor arg
                d = DeferredContext(self._uploader.upload(uploadable))
                d.addCallback(lambda results:
                              self._create_and_validate_node(results.get_uri(), None,
                                                             name))
                d.addCallback(lambda node:
                              self.set_node(name, node, metadata, overwrite))

        return d.addActionFinish()
660
    def delete(self, namex, must_exist=True, must_be_directory=False, must_be_file=False):
        """I remove the child at the specific name. I return a Deferred that
        fires (with the node just removed) when the operation finishes."""
        # With must_exist=False a missing child fires with None instead of
        # errbacking with NoSuchChildError.
        if self.is_readonly():
            return defer.fail(NotWriteableError())
        deleter = Deleter(self, namex, must_exist=must_exist,
                          must_be_directory=must_be_directory, must_be_file=must_be_file)
        d = self._node.modify(deleter.modify)
        d.addCallback(lambda res: deleter.old_child)
        return d
671
672    # XXX: Too many arguments? Worthwhile to break into mutable/immutable?
    # XXX: Too many arguments? Worthwhile to break into mutable/immutable?
    def create_subdirectory(self, namex, initial_children=None, overwrite=True,
                            mutable=True, mutable_version=None, metadata=None):
        """Create a new (empty or pre-populated) directory and link it here
        under 'namex'. Deferred fires with the new child dirnode."""
        if initial_children is None:
            initial_children = {}
        name = normalize(namex)
        if self.is_readonly():
            return defer.fail(NotWriteableError())
        if mutable:
            if mutable_version:
                d = self._nodemaker.create_new_mutable_directory(initial_children,
                                                                 version=mutable_version)
            else:
                d = self._nodemaker.create_new_mutable_directory(initial_children)
        else:
            # mutable version doesn't make sense for immutable directories.
            assert mutable_version is None
            d = self._nodemaker.create_immutable_directory(initial_children)
        def _created(child):
            # Link the freshly-created directory into ourselves.
            entries = {name: (child, metadata)}
            a = Adder(self, entries, overwrite=overwrite,
                      create_readonly_node=self._create_readonly_node)
            d = self._node.modify(a.modify)
            d.addCallback(lambda res: child)
            return d
        d.addCallback(_created)
        return d
699
    def move_child_to(self, current_child_namex, new_parent,
                      new_child_namex=None, overwrite=True):
        """
        I take one of my child links and move it to a new parent. The child
        link is referenced by name. In the new parent, the child link will live
        at 'new_child_namex', which defaults to 'current_child_namex'. I return
        a Deferred that fires when the operation finishes.
        'new_child_namex' and 'current_child_namex' need not be normalized.

        The overwrite parameter may be True (overwrite any existing child),
        False (error if the new child link already exists), or ONLY_FILES
        (error if the new child link exists and points to a directory).
        """
        if self.is_readonly() or new_parent.is_readonly():
            return defer.fail(NotWriteableError())

        current_child_name = normalize(current_child_namex)
        if new_child_namex is None:
            new_child_name = current_child_name
        else:
            new_child_name = normalize(new_child_namex)

        from_uri = self.get_write_uri()
        if new_parent.get_write_uri() == from_uri and new_child_name == current_child_name:
            # needed for correctness, otherwise we would delete the child
            return defer.succeed("redundant rename/relink")

        # NOTE: add-then-delete is not atomic; a failure between the two
        # steps leaves the child linked in both parents.
        d = self.get_child_and_metadata(current_child_name)
        def _got_child(child_and_metadata):
            (child, metadata) = child_and_metadata
            return new_parent.set_node(new_child_name, child, metadata,
                                       overwrite=overwrite)
        d.addCallback(_got_child)
        d.addCallback(lambda child: self.delete(current_child_name))
        return d
735
736
737    def deep_traverse(self, walker):
738        """Perform a recursive walk, using this dirnode as a root, notifying
739        the 'walker' instance of everything I encounter.
740
741        I call walker.enter_directory(parent, children) once for each dirnode
742        I visit, immediately after retrieving the list of children. I pass in
743        the parent dirnode and the dict of childname->(childnode,metadata).
744        This function should *not* traverse the children: I will do that.
745        enter_directory() is most useful for the deep-stats number that
746        counts how large a directory is.
747
748        I call walker.add_node(node, path) for each node (both files and
749        directories) I can reach. Most work should be done here.
750
751        I avoid loops by keeping track of verifier-caps and refusing to call
752        walker.add_node() or traverse a node that I've seen before. This
753        means that any file or directory will only be given to the walker
754        once. If files or directories are referenced multiple times by a
755        directory structure, this may appear to under-count or miss some of
756        them.
757
758        I return a Monitor which can be used to wait for the operation to
759        finish, learn about its progress, or cancel the operation.
760        """
761
762        # this is just a tree-walker, except that following each edge
763        # requires a Deferred. We used to use a ConcurrencyLimiter to limit
764        # fanout to 10 simultaneous operations, but the memory load of the
765        # queued operations was excessive (in one case, with 330k dirnodes,
766        # it caused the process to run into the 3.0GB-ish per-process 32bit
767        # linux memory limit, and crashed). So we use a single big Deferred
768        # chain, and do a strict depth-first traversal, one node at a time.
769        # This can be slower, because we aren't pipelining directory reads,
770        # but it brought the memory footprint down by roughly 50%.
771
772        monitor = Monitor()
773        walker.set_monitor(monitor)
774
775        found = set([self.get_verify_cap()])
776        d = self._deep_traverse_dirnode(self, [], walker, monitor, found)
777        d.addCallback(lambda ignored: walker.finish())
778        d.addBoth(monitor.finish)
779        d.addErrback(lambda f: None)
780
781        return monitor
782
783    def _deep_traverse_dirnode(self, node, path, walker, monitor, found):
784        # process this directory, then walk its children
785        monitor.raise_if_cancelled()
786        d = defer.maybeDeferred(walker.add_node, node, path)
787        d.addCallback(lambda ignored: node.list())
788        d.addCallback(self._deep_traverse_dirnode_children, node, path,
789                      walker, monitor, found)
790        return d
791
    def _deep_traverse_dirnode_children(self, children, parent, path,
                                        walker, monitor, found):
        """Chain walker callbacks for one directory's children onto a
        single Deferred: notify the walker of the directory itself, then
        add_node() each file child, then recurse into each subdirectory
        child. Returns that Deferred. 'found' is the shared set of
        already-seen verifier caps used for loop/duplicate avoidance."""
        monitor.raise_if_cancelled()
        d = defer.maybeDeferred(walker.enter_directory, parent, children)
        # we process file-like children first, so we can drop their FileNode
        # objects as quickly as possible. Tests suggest that a FileNode (held
        # in the client's nodecache) consumes about 2440 bytes. dirnodes (not
        # in the nodecache) seem to consume about 2000 bytes.
        dirkids = []
        filekids = []
        # sorted() gives a deterministic traversal order by child name
        for name, (child, metadata) in sorted(children.items()):
            childpath = path + [name]
            if isinstance(child, UnknownNode):
                # NOTE(review): unknown nodes are reported synchronously,
                # outside the Deferred chain, and any Deferred returned by
                # add_node() here is dropped
                walker.add_node(child, childpath)
                continue
            verifier = child.get_verify_cap()
            # allow LIT files (for which verifier==None) to be processed
            if (verifier is not None) and (verifier in found):
                continue
            # for LIT files this adds None to the set, which is harmless:
            # the "is not None" guard above never skips on it
            found.add(verifier)
            if IDirectoryNode.providedBy(child):
                dirkids.append( (child, childpath) )
            else:
                filekids.append( (child, childpath) )
        # the child=child/childpath=childpath defaults bind the loop
        # variables eagerly, avoiding the late-binding closure pitfall
        for i, (child, childpath) in enumerate(filekids):
            d.addCallback(lambda ignored, child=child, childpath=childpath:
                          walker.add_node(child, childpath))
            # to work around the Deferred tail-recursion problem
            # (specifically the defer.succeed flavor) requires us to avoid
            # doing more than 158 LIT files in a row. We insert a turn break
            # once every 100 files (LIT or CHK) to preserve some stack space
            # for other code. This is a different expression of the same
            # Twisted problem as in #237.
            if i % 100 == 99:
                d.addCallback(lambda ignored: fireEventually())
        # subdirectories last (see memory note above); each recursion
        # extends the same depth-first Deferred chain
        for (child, childpath) in dirkids:
            d.addCallback(lambda ignored, child=child, childpath=childpath:
                          self._deep_traverse_dirnode(child, childpath,
                                                      walker, monitor,
                                                      found))
        return d
833
834
835    def build_manifest(self):
836        """Return a Monitor, with a ['status'] that will be a list of (path,
837        cap) tuples, for all nodes (directories and files) reachable from
838        this one."""
839        walker = ManifestWalker(self)
840        return self.deep_traverse(walker)
841
842    def start_deep_stats(self):
843        # Since deep_traverse tracks verifier caps, we avoid double-counting
844        # children for which we've got both a write-cap and a read-cap
845        return self.deep_traverse(DeepStats(self))
846
847    def start_deep_check(self, verify=False, add_lease=False):
848        return self.deep_traverse(DeepChecker(self, verify, repair=False, add_lease=add_lease))
849
850    def start_deep_check_and_repair(self, verify=False, add_lease=False):
851        return self.deep_traverse(DeepChecker(self, verify, repair=True, add_lease=add_lease))
852
853
class ManifestWalker(DeepStats):
    """Deep-traversal walker that, in addition to the usual deep-stats
    accounting, accumulates a (path, cap) manifest along with the sets of
    storage-index strings and verify-cap strings seen along the way."""

    def __init__(self, origin):
        DeepStats.__init__(self, origin)
        self.manifest = []
        self.storage_index_strings = set()
        self.verifycaps = set()

    def add_node(self, node, path):
        # record this node's cap under its path
        self.manifest.append((tuple(path), node.get_uri()))
        storage_index = node.get_storage_index()
        if storage_index:
            self.storage_index_strings.add(base32.b2a(storage_index))
        verify_cap = node.get_verify_cap()
        if verify_cap:
            self.verifycaps.add(verify_cap.to_string())
        # delegate to DeepStats so the statistics stay current
        return DeepStats.add_node(self, node, path)

    def get_results(self):
        # bundle the manifest data together with the base-class stats
        return {
            "manifest": self.manifest,
            "verifycaps": self.verifycaps,
            "storage-index": self.storage_index_strings,
            "stats": DeepStats.get_results(self),
        }
878
879
class DeepChecker(object):
    """Deep-traversal walker that runs check() (or, when 'repair' is set,
    check_and_repair()) on every reachable node, aggregating the outcomes
    plus deep-stats into a results object exposed via the Monitor."""

    def __init__(self, root, verify, repair, add_lease):
        root_si = root.get_storage_index()
        # LIT roots have no storage index; log an empty si in that case
        root_si_base32 = base32.b2a(root_si) if root_si else ""
        self._lp = log.msg(format="deep-check starting (%(si)s),"
                           " verify=%(verify)s, repair=%(repair)s",
                           si=root_si_base32, verify=verify, repair=repair)
        self._verify = verify
        self._repair = repair
        self._add_lease = add_lease
        # repair mode needs the richer results container
        results_class = DeepCheckAndRepairResults if repair else DeepCheckResults
        self._results = results_class(root_si)
        self._stats = DeepStats(root)

    def set_monitor(self, monitor):
        self.monitor = monitor
        monitor.set_status(self._results)

    def add_node(self, node, childpath):
        # check (and maybe repair) this node, then record the outcome
        if self._repair:
            d = node.check_and_repair(self.monitor, self._verify, self._add_lease)
            d.addCallback(self._results.add_check_and_repair, childpath)
        else:
            d = node.check(self.monitor, self._verify, self._add_lease)
            d.addCallback(self._results.add_check, childpath)
        # keep the deep-stats numbers in step with the check results
        d.addCallback(lambda ign: self._stats.add_node(node, childpath))
        return d

    def enter_directory(self, parent, children):
        return self._stats.enter_directory(parent, children)

    def finish(self):
        log.msg("deep-check done", parent=self._lp)
        self._results.update_stats(self._stats.get_results())
        return self._results
920
921
922# use client.create_dirnode() to make one of these
Note: See TracBrowser for help on using the repository browser.