1 | """ |
---|
2 | Ported to Python 3. |
---|
3 | """ |
---|
4 | |
---|
5 | __all__ = [ |
---|
6 | "CLINodeAPI", |
---|
7 | "Expect", |
---|
8 | "on_stdout", |
---|
9 | "on_stdout_and_stderr", |
---|
10 | "on_different", |
---|
11 | ] |
---|
12 | |
---|
13 | import os |
---|
14 | import sys |
---|
15 | from errno import ENOENT |
---|
16 | |
---|
17 | import attr |
---|
18 | |
---|
19 | from eliot import ( |
---|
20 | log_call, |
---|
21 | ) |
---|
22 | |
---|
23 | from twisted.internet.error import ( |
---|
24 | ProcessTerminated, |
---|
25 | ProcessExitedAlready, |
---|
26 | ) |
---|
27 | from twisted.internet.interfaces import ( |
---|
28 | IProcessProtocol, |
---|
29 | ) |
---|
30 | from twisted.python.log import ( |
---|
31 | msg, |
---|
32 | ) |
---|
33 | from twisted.python.filepath import ( |
---|
34 | FilePath, |
---|
35 | ) |
---|
36 | from twisted.internet.protocol import ( |
---|
37 | Protocol, |
---|
38 | ProcessProtocol, |
---|
39 | ) |
---|
40 | from twisted.internet.defer import ( |
---|
41 | Deferred, |
---|
42 | succeed, |
---|
43 | ) |
---|
44 | from twisted.internet.task import ( |
---|
45 | deferLater, |
---|
46 | ) |
---|
47 | from ..client import ( |
---|
48 | _Client, |
---|
49 | ) |
---|
50 | from ..util.eliotutil import ( |
---|
51 | inline_callbacks, |
---|
52 | log_call_deferred, |
---|
53 | ) |
---|
54 | |
---|
55 | class Expect(Protocol, object): |
---|
56 | def __init__(self): |
---|
57 | self._expectations = [] |
---|
58 | |
---|
59 | def get_buffered_output(self): |
---|
60 | return self._buffer |
---|
61 | |
---|
62 | def expect(self, expectation): |
---|
63 | if expectation in self._buffer: |
---|
64 | return succeed(None) |
---|
65 | d = Deferred() |
---|
66 | self._expectations.append((expectation, d)) |
---|
67 | return d |
---|
68 | |
---|
69 | def connectionMade(self): |
---|
70 | self._buffer = b"" |
---|
71 | |
---|
72 | def dataReceived(self, data): |
---|
73 | self._buffer += data |
---|
74 | for i in range(len(self._expectations) - 1, -1, -1): |
---|
75 | expectation, d = self._expectations[i] |
---|
76 | if expectation in self._buffer: |
---|
77 | del self._expectations[i] |
---|
78 | d.callback(None) |
---|
79 | |
---|
80 | def connectionLost(self, reason): |
---|
81 | for ignored, d in self._expectations: |
---|
82 | d.errback(reason) |
---|
83 | |
---|
84 | |
---|
85 | class _ProcessProtocolAdapter(ProcessProtocol, object): |
---|
86 | def __init__(self, fds): |
---|
87 | self._fds = fds |
---|
88 | |
---|
89 | def connectionMade(self): |
---|
90 | for proto in list(self._fds.values()): |
---|
91 | proto.makeConnection(self.transport) |
---|
92 | |
---|
93 | def childDataReceived(self, childFD, data): |
---|
94 | try: |
---|
95 | proto = self._fds[childFD] |
---|
96 | except KeyError: |
---|
97 | msg(format="Received unhandled output on %(fd)s: %(output)s", |
---|
98 | fd=childFD, |
---|
99 | output=data, |
---|
100 | ) |
---|
101 | else: |
---|
102 | proto.dataReceived(data) |
---|
103 | |
---|
104 | def processEnded(self, reason): |
---|
105 | notified = set() |
---|
106 | for proto in list(self._fds.values()): |
---|
107 | if proto not in notified: |
---|
108 | proto.connectionLost(reason) |
---|
109 | notified.add(proto) |
---|
110 | |
---|
111 | |
---|
112 | def on_stdout(protocol): |
---|
113 | return _ProcessProtocolAdapter({1: protocol}) |
---|
114 | |
---|
115 | def on_stdout_and_stderr(protocol): |
---|
116 | return _ProcessProtocolAdapter({1: protocol, 2: protocol}) |
---|
117 | |
---|
118 | def on_different(fd_mapping): |
---|
119 | return _ProcessProtocolAdapter(fd_mapping) |
---|
120 | |
---|
121 | @attr.s |
---|
122 | class CLINodeAPI(object): |
---|
123 | reactor = attr.ib() |
---|
124 | basedir = attr.ib(type=FilePath) |
---|
125 | process = attr.ib(default=None) |
---|
126 | |
---|
127 | @property |
---|
128 | def twistd_pid_file(self): |
---|
129 | return self.basedir.child(u"running.process") |
---|
130 | |
---|
131 | @property |
---|
132 | def node_url_file(self): |
---|
133 | return self.basedir.child(u"node.url") |
---|
134 | |
---|
135 | @property |
---|
136 | def storage_furl_file(self): |
---|
137 | return self.basedir.child(u"private").child(u"storage.furl") |
---|
138 | |
---|
139 | @property |
---|
140 | def introducer_furl_file(self): |
---|
141 | return self.basedir.child(u"private").child(u"introducer.furl") |
---|
142 | |
---|
143 | @property |
---|
144 | def config_file(self): |
---|
145 | return self.basedir.child(u"tahoe.cfg") |
---|
146 | |
---|
147 | @property |
---|
148 | def exit_trigger_file(self): |
---|
149 | return self.basedir.child(_Client.EXIT_TRIGGER_FILE) |
---|
150 | |
---|
151 | def _execute(self, process_protocol, argv): |
---|
152 | exe = sys.executable |
---|
153 | argv = [ |
---|
154 | exe, |
---|
155 | "-b", |
---|
156 | u"-m", |
---|
157 | u"allmydata.scripts.runner", |
---|
158 | ] + argv |
---|
159 | msg(format="Executing %(argv)s", |
---|
160 | argv=argv, |
---|
161 | ) |
---|
162 | return self.reactor.spawnProcess( |
---|
163 | processProtocol=process_protocol, |
---|
164 | executable=exe, |
---|
165 | args=argv, |
---|
166 | env=os.environ, |
---|
167 | ) |
---|
168 | |
---|
169 | @log_call(action_type="test:cli-api:run", include_args=["extra_tahoe_args"]) |
---|
170 | def run(self, protocol, extra_tahoe_args=()): |
---|
171 | """ |
---|
172 | Start the node running. |
---|
173 | |
---|
174 | :param IProcessProtocol protocol: This protocol will be hooked up to |
---|
175 | the node process and can handle output or generate input. |
---|
176 | """ |
---|
177 | if not IProcessProtocol.providedBy(protocol): |
---|
178 | raise TypeError("run requires process protocol, got {}".format(protocol)) |
---|
179 | self.process = self._execute( |
---|
180 | protocol, |
---|
181 | list(extra_tahoe_args) + [u"run", self.basedir.asTextMode().path], |
---|
182 | ) |
---|
183 | # Don't let the process run away forever. |
---|
184 | try: |
---|
185 | self.active() |
---|
186 | except OSError as e: |
---|
187 | if ENOENT != e.errno: |
---|
188 | raise |
---|
189 | |
---|
190 | @log_call_deferred(action_type="test:cli-api:stop") |
---|
191 | def stop(self): |
---|
192 | return self.stop_and_wait() |
---|
193 | |
---|
194 | @log_call_deferred(action_type="test:cli-api:stop-and-wait") |
---|
195 | @inline_callbacks |
---|
196 | def stop_and_wait(self): |
---|
197 | if self.process is not None: |
---|
198 | while True: |
---|
199 | try: |
---|
200 | self.process.signalProcess("TERM") |
---|
201 | except ProcessExitedAlready: |
---|
202 | break |
---|
203 | else: |
---|
204 | yield deferLater(self.reactor, 0.1, lambda: None) |
---|
205 | |
---|
206 | def active(self): |
---|
207 | # By writing this file, we get two minutes before the client will |
---|
208 | # exit. This ensures that even if the 'stop' command doesn't work (and |
---|
209 | # the test fails), the client should still terminate. |
---|
210 | self.exit_trigger_file.touch() |
---|
211 | |
---|
212 | def _check_cleanup_reason(self, reason): |
---|
213 | # Let it fail because the process has already exited. |
---|
214 | reason.trap(ProcessTerminated) |
---|
215 | return None |
---|
216 | |
---|
217 | def cleanup(self): |
---|
218 | stopping = self.stop_and_wait() |
---|
219 | stopping.addErrback(self._check_cleanup_reason) |
---|
220 | return stopping |
---|