source: trunk/src/allmydata/test/cli_node_api.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

  • Property mode set to 100644
File size: 5.7 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5__all__ = [
6    "CLINodeAPI",
7    "Expect",
8    "on_stdout",
9    "on_stdout_and_stderr",
10    "on_different",
11]
12
13import os
14import sys
15from errno import ENOENT
16
17import attr
18
19from eliot import (
20    log_call,
21)
22
23from twisted.internet.error import (
24    ProcessTerminated,
25    ProcessExitedAlready,
26)
27from twisted.internet.interfaces import (
28    IProcessProtocol,
29)
30from twisted.python.log import (
31    msg,
32)
33from twisted.python.filepath import (
34    FilePath,
35)
36from twisted.internet.protocol import (
37    Protocol,
38    ProcessProtocol,
39)
40from twisted.internet.defer import (
41    Deferred,
42    succeed,
43)
44from twisted.internet.task import (
45    deferLater,
46)
47from ..client import (
48    _Client,
49)
50from ..util.eliotutil import (
51    inline_callbacks,
52    log_call_deferred,
53)
54
class Expect(Protocol, object):
    """
    Accumulate every byte received and fire Deferreds once expected byte
    strings show up in the accumulated output.
    """
    def __init__(self):
        # Pending ``(expectation, Deferred)`` pairs which have not yet been
        # found in the buffer.
        self._expectations = []
        # Create the buffer here so that ``expect`` and
        # ``get_buffered_output`` are usable before the protocol has been
        # connected instead of failing with AttributeError.
        self._buffer = b""

    def get_buffered_output(self):
        """
        :return bytes: Everything received since the connection was made.
        """
        return self._buffer

    def expect(self, expectation):
        """
        Wait for some bytes to appear in the received output.

        :param bytes expectation: The bytes to look for anywhere in the
            accumulated output.

        :return Deferred: Fires with ``None`` once ``expectation`` is present
            in the buffer (immediately, if it already is).  Fails with the
            connection-lost reason if the connection goes away first.
        """
        if expectation in self._buffer:
            return succeed(None)
        d = Deferred()
        self._expectations.append((expectation, d))
        return d

    def connectionMade(self):
        # A new connection starts with a fresh, empty buffer.
        self._buffer = b""

    def dataReceived(self, data):
        self._buffer += data
        # Walk backwards by index so entries can be deleted in place without
        # disturbing the positions that are still to be visited.
        for i in range(len(self._expectations) - 1, -1, -1):
            expectation, d = self._expectations[i]
            if expectation in self._buffer:
                del self._expectations[i]
                d.callback(None)

    def connectionLost(self, reason):
        # Remove each entry before failing it so no fired Deferred lingers in
        # the list; a repeated notification then finds nothing to do rather
        # than failing an already-fired Deferred.  Entries appended while this
        # loop runs are failed as well.
        while self._expectations:
            ignored, d = self._expectations.pop(0)
            d.errback(reason)
84
class _ProcessProtocolAdapter(ProcessProtocol, object):
    """
    Route the output of a child process to ordinary protocols, selected by
    the child file descriptor the output arrived on.
    """
    def __init__(self, fds):
        """
        :param fds: A mapping from child file descriptor number to the
            protocol which should receive data from that descriptor.
        """
        self._fds = fds

    def connectionMade(self):
        # Every mapped protocol is connected to the process transport.  A
        # protocol mapped to several descriptors is connected once for each.
        receivers = list(self._fds.values())
        for receiver in receivers:
            receiver.makeConnection(self.transport)

    def childDataReceived(self, childFD, data):
        if childFD not in self._fds:
            # Nobody asked for this descriptor; just record what arrived.
            msg(format="Received unhandled output on %(fd)s: %(output)s",
                fd=childFD,
                output=data,
            )
            return
        self._fds[childFD].dataReceived(data)

    def processEnded(self, reason):
        # Tell each distinct protocol exactly once, even when it is mapped to
        # more than one descriptor.
        already_told = set()
        for receiver in list(self._fds.values()):
            if receiver in already_told:
                continue
            receiver.connectionLost(reason)
            already_told.add(receiver)
110
111
def on_stdout(protocol):
    """
    Create a process protocol which hands the child's output on file
    descriptor 1 to ``protocol``.
    """
    stdout_only = {1: protocol}
    return _ProcessProtocolAdapter(stdout_only)
114
def on_stdout_and_stderr(protocol):
    """
    Create a process protocol which hands the child's output on file
    descriptors 1 and 2 to the same ``protocol``.
    """
    both_streams = dict.fromkeys((1, 2), protocol)
    return _ProcessProtocolAdapter(both_streams)
117
def on_different(fd_mapping):
    """
    Create a process protocol which hands the child's output to whichever
    protocol ``fd_mapping`` associates with the originating file descriptor.

    :param fd_mapping: A mapping from child file descriptor number to
        protocol.
    """
    return _ProcessProtocolAdapter(fds=fd_mapping)
120
@attr.s
class CLINodeAPI(object):
    """
    Drive a node through the command line interface: launch it as a child
    process, locate the files in its base directory, and shut it down.
    """
    # The reactor used to spawn the child process and to schedule delays.
    reactor = attr.ib()
    # The node's base directory.
    basedir = attr.ib(type=FilePath)
    # The process transport returned by ``spawnProcess`` once ``run`` has
    # been called; ``None`` until then.
    process = attr.ib(default=None)

    @property
    def twistd_pid_file(self):
        """
        The ``running.process`` file inside the base directory.
        """
        return self.basedir.child(u"running.process")

    @property
    def node_url_file(self):
        """
        The ``node.url`` file inside the base directory.
        """
        return self.basedir.child(u"node.url")

    @property
    def storage_furl_file(self):
        """
        The ``private/storage.furl`` file inside the base directory.
        """
        private_dir = self.basedir.child(u"private")
        return private_dir.child(u"storage.furl")

    @property
    def introducer_furl_file(self):
        """
        The ``private/introducer.furl`` file inside the base directory.
        """
        private_dir = self.basedir.child(u"private")
        return private_dir.child(u"introducer.furl")

    @property
    def config_file(self):
        """
        The ``tahoe.cfg`` file inside the base directory.
        """
        return self.basedir.child(u"tahoe.cfg")

    @property
    def exit_trigger_file(self):
        """
        The file inside the base directory named by
        ``_Client.EXIT_TRIGGER_FILE``.
        """
        return self.basedir.child(_Client.EXIT_TRIGGER_FILE)

    def _execute(self, process_protocol, argv):
        """
        Spawn the command line runner with the current Python interpreter.

        :param process_protocol: The process protocol to attach to the child.
        :param list argv: Arguments placed after the runner module name.

        :return: Whatever the reactor's ``spawnProcess`` returns.
        """
        interpreter = sys.executable
        runner_prefix = [
            interpreter,
            "-b",
            u"-m",
            u"allmydata.scripts.runner",
        ]
        command = runner_prefix + argv
        msg(format="Executing %(argv)s",
            argv=command,
        )
        return self.reactor.spawnProcess(
            processProtocol=process_protocol,
            executable=interpreter,
            args=command,
            env=os.environ,
        )

    @log_call(action_type="test:cli-api:run", include_args=["extra_tahoe_args"])
    def run(self, protocol, extra_tahoe_args=()):
        """
        Launch the node as a child process.

        :param IProcessProtocol protocol: The protocol attached to the node
            process; it can consume output or produce input.

        :param extra_tahoe_args: Arguments inserted before the ``run``
            subcommand.

        :raise TypeError: If ``protocol`` does not provide
            ``IProcessProtocol``.
        """
        if not IProcessProtocol.providedBy(protocol):
            raise TypeError("run requires process protocol, got {}".format(protocol))
        node_args = list(extra_tahoe_args)
        node_args.extend([u"run", self.basedir.asTextMode().path])
        self.process = self._execute(protocol, node_args)
        # Make sure the process cannot keep running indefinitely.  A missing
        # path (ENOENT) is tolerated; any other OS error propagates.
        try:
            self.active()
        except OSError as err:
            if err.errno != ENOENT:
                raise

    @log_call_deferred(action_type="test:cli-api:stop")
    def stop(self):
        """
        Stop the node; this simply delegates to ``stop_and_wait``.
        """
        return self.stop_and_wait()

    @log_call_deferred(action_type="test:cli-api:stop-and-wait")
    @inline_callbacks
    def stop_and_wait(self):
        """
        Send TERM to the node process repeatedly, pausing 0.1 seconds between
        attempts, until the transport reports that the process has already
        exited.  Does nothing if no process was ever started.
        """
        if self.process is None:
            return
        while True:
            try:
                self.process.signalProcess("TERM")
            except ProcessExitedAlready:
                return
            yield deferLater(self.reactor, 0.1, lambda: None)

    def active(self):
        """
        Touch the exit trigger file.
        """
        # Writing this file buys two minutes before the client exits on its
        # own.  So even when the 'stop' command does not work (and the test
        # fails), the client should still terminate.
        self.exit_trigger_file.touch()

    def _check_cleanup_reason(self, reason):
        """
        Swallow a ``ProcessTerminated`` failure; any other failure is
        re-raised by ``trap``.
        """
        # Failing because the process already exited is acceptable.
        reason.trap(ProcessTerminated)

    def cleanup(self):
        """
        Stop the node and wait for it, ignoring ``ProcessTerminated``.

        :return Deferred: Fires when stopping has finished.
        """
        return self.stop_and_wait().addErrback(self._check_cleanup_reason)
Note: See TracBrowser for help on using the repository browser.