source: trunk/src/allmydata/test/test_introducer.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

  • Property mode set to 100644
File size: 45.2 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5from six import ensure_binary, ensure_text
6
7import os, re, itertools
8from base64 import b32decode
9import json
10from operator import (
11    setitem,
12)
13from functools import (
14    partial,
15)
16
17from testtools.matchers import (
18    Is,
19)
20
21from twisted.internet import defer, address
22from twisted.python import log
23from twisted.python.filepath import FilePath
24from twisted.web.template import flattenString
25
26from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
27from twisted.application import service
28from allmydata.crypto import ed25519
29from allmydata.crypto.util import remove_prefix
30from allmydata.crypto.error import BadSignature
31from allmydata.interfaces import InsufficientVersionError
32from allmydata.introducer.client import IntroducerClient
33from allmydata.introducer.server import IntroducerService, FurlFileConflictError
34from allmydata.introducer.common import get_tubid_string_from_ann, \
35     get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \
36     UnknownKeyError
37from allmydata.node import (
38    create_node_dir,
39    read_config,
40)
41# the "new way" to create introducer node instance
42from allmydata.introducer.server import create_introducer
43from allmydata.web import introweb
44from allmydata.client import (
45    create_client,
46    create_introducer_clients,
47)
48from allmydata.util import pollmixin, idlib, fileutil, yamlutil
49from allmydata.util.iputil import (
50    listenOnUnused,
51)
52from allmydata.scripts.common import (
53    write_introducer,
54)
55import allmydata.test.common_util as testutil
56from .common import (
57    SyncTestCase,
58    AsyncTestCase,
59    AsyncBrokenTestCase,
60)
61
class LoggingMultiService(service.MultiService):
    """A ``MultiService`` that additionally offers a ``log`` method."""

    def log(self, msg, **kwargs):
        """Pass ``msg`` and any keyword arguments on to ``log.msg``."""
        log.msg(msg, **kwargs)
65
class Node(testutil.SignalMixin, testutil.ReallyEqualMixin, AsyncTestCase):
    """Tests that build an introducer node inside a base directory."""

    def test_backwards_compat_import(self):
        """``IntroducerNode`` is still importable from ``allmydata.introducer``."""
        # old introducer .tac files import the class from this location
        from allmydata.introducer import IntroducerNode
        IntroducerNode  # pyflakes

    @defer.inlineCallbacks
    def test_create(self):
        """
        Creating an introducer for a basedir leaves that directory existing.
        """
        node_dir = "introducer.IntroducerNode.test_create"
        yield create_introducer(node_dir)
        self.assertTrue(os.path.exists(node_dir))

    def test_introducer_clients_unloadable(self):
        """
        ``create_introducer_clients`` raises ``EnvironmentError`` when
        ``introducers.yaml`` is present but its permissions forbid reading.
        """
        node_dir = u"introducer.IntroducerNode.test_introducer_clients_unloadable"
        private_dir = os.path.join(node_dir, u"private")
        os.mkdir(node_dir)
        os.mkdir(private_dir)

        yaml_path = os.path.join(private_dir, u"introducers.yaml")
        with open(yaml_path, 'w') as yaml_file:
            yaml_file.write(u'---\n')
        # clear every permission bit now; put owner access back at cleanup
        os.chmod(yaml_path, 0o000)
        self.addCleanup(lambda: os.chmod(yaml_path, 0o700))

        node_config = read_config(node_dir, "portnum")
        with self.assertRaises(EnvironmentError):
            create_introducer_clients(node_config, Tub())

    @defer.inlineCallbacks
    def test_furl(self):
        """
        Exercise the three furl-file situations: only the private file, both
        files at once (an error), and only the old public file (which is
        moved into ``private/`` and reused).
        """
        node_dir = "introducer.IntroducerNode.test_furl"
        create_node_dir(node_dir, "testing")
        public_path = os.path.join(node_dir, "introducer.furl")
        private_path = os.path.join(node_dir, "private", "introducer.furl")

        yield create_introducer(node_dir)
        # new nodes create unguessable furls in private/introducer.furl
        private_furl = fileutil.read(private_path, mode="r")
        self.failUnless(private_furl)
        private_furl = private_furl.strip()
        self.failIf(private_furl.endswith("/introducer"), private_furl)

        # old nodes created guessable furls in BASEDIR/introducer.furl
        last_slash = private_furl.rfind("/")
        guessable = private_furl[:last_slash] + "/introducer"
        fileutil.write(public_path, guessable+"\n", mode="w") # text

        # with both files present, creation must fail
        with self.assertRaises(FurlFileConflictError):
            yield create_introducer(node_dir)

        # with only the public file present, it is moved to private/ and
        # the furl it holds is used instead of generating a new one
        os.unlink(private_path)

        yield create_introducer(node_dir)
        self.failIf(os.path.exists(public_path))
        migrated_furl = fileutil.read(private_path, mode="r")
        self.failUnless(migrated_furl)
        self.failUnlessEqual(migrated_furl.strip(), guessable)

    @defer.inlineCallbacks
    def test_web_static(self):
        """
        A relative ``web.static`` setting ends up as a path underneath the
        absolute basedir on the "webish" service.
        """
        node_dir = u"introducer.Node.test_web_static"
        create_node_dir(node_dir, "testing")
        config_text = ("[node]\n" +
                       "web.port = tcp:0:interface=127.0.0.1\n" +
                       "web.static = relative\n")
        fileutil.write(os.path.join(node_dir, "tahoe.cfg"), config_text)

        node = yield create_introducer(node_dir)
        webish = node.getServiceNamed("webish")
        absolute_node_dir = fileutil.abspath_expanduser_unicode(node_dir)
        expected_static = fileutil.abspath_expanduser_unicode(
            u"relative", absolute_node_dir)
        self.failUnlessReallyEqual(webish.staticdir, expected_static)
147
148
class ServiceMixin(object):
    """
    Test mixin that provides a started ``LoggingMultiService`` as
    ``self.parent`` and stops it again during tear-down.
    """

    def setUp(self):
        """Create and start ``self.parent``, then run the next ``setUp``."""
        self.parent = parent_service = LoggingMultiService()
        parent_service.startService()
        return super(ServiceMixin, self).setUp()

    def tearDown(self):
        """
        Run the next ``tearDown``, then stop ``self.parent``, then call
        ``flushEventualQueue``.  Returns the Deferred for that sequence.
        """
        log.msg("TestIntroducer.tearDown")

        def _stop_parent(_result):
            return self.parent.stopService()

        done = defer.maybeDeferred(super(ServiceMixin, self).tearDown)
        done.addCallback(_stop_parent)
        done.addCallback(flushEventualQueue)
        return done
161
class Introducer(ServiceMixin, AsyncTestCase):
    """Smoke tests: the client and the service can be constructed."""

    def test_create(self):
        """An ``IntroducerClient`` can be built without a Tub."""
        cache_path = FilePath(self.mktemp())
        client = IntroducerClient(None, "introducer.furl", u"my_nickname",
                                  "my_version", "oldest_version", fakeseq,
                                  cache_path)
        self.failUnless(isinstance(client, IntroducerClient))

    def test_listen(self):
        """An ``IntroducerService`` can be attached to ``self.parent``."""
        introducer_service = IntroducerService()
        introducer_service.setServiceParent(self.parent)
172
173
def fakeseq():
    """Return the constant pair ``(1, "nonce")``."""
    seqnum = 1
    nonce = "nonce"
    return seqnum, nonce
176
# Module-wide counter: each realseq() call consumes the next integer,
# starting at 1.
seqnum_counter = itertools.count(1)
def realseq():
    """
    Return a ``(seqnum, nonce)`` pair.

    ``seqnum`` is the next value of the module-level ``seqnum_counter``;
    ``nonce`` is the decimal string of a random integer between 1 and
    100000 inclusive.
    """
    # The ``os`` module has no ``randint`` (calling it raised
    # AttributeError); ``random.randint`` is the function that exists.
    import random
    return next(seqnum_counter), str(random.randint(1, 100000))
180
def make_ann(furl):
    """
    Build an announcement dict for ``furl``.

    The dict has two entries: ``furl`` itself under
    "anonymous-storage-FURL" and ``get_tubid_string(furl)`` under
    "permutation-seed-base32".
    """
    return {
        "anonymous-storage-FURL": furl,
        "permutation-seed-base32": get_tubid_string(furl),
    }
185
def make_ann_t(ic, furl, privkey, seqnum):
    """
    Create a signed "storage" announcement for ``furl``.

    The announcement dict comes from ``ic.create_announcement_dict``; its
    "seqnum" is then set to ``seqnum`` and its "nonce" to the string
    "nonce" before it is signed with ``privkey`` via ``sign_to_foolscap``.
    ``privkey`` must be truthy.
    """
    assert privkey
    announcement = ic.create_announcement_dict("storage", make_ann(furl))
    announcement["seqnum"] = seqnum
    announcement["nonce"] = "nonce"
    return sign_to_foolscap(announcement, privkey)
193
class Client(AsyncTestCase):
    """Tests of how ``IntroducerClient`` handles incoming announcements."""

    @defer.inlineCallbacks
    def test_duplicate_receive_v2(self):
        """
        Feed a client a series of signed announcements and check which of
        them reach its "storage" subscribers: duplicates, older sequence
        numbers and announcements without a sequence number must not, while
        replacements must.
        """
        def build_client(my_version):
            return IntroducerClient(None,
                                    "introducer.furl", u"my_nickname",
                                    my_version, "oldest_version", fakeseq,
                                    FilePath(self.mktemp()))

        ic1 = build_client("ver23")
        # the second client exists only to produce a different-looking
        # announcement
        ic2 = build_client("ver24")

        delivered = []
        ic1.subscribe_to(
            "storage",
            lambda key_s, ann: delivered.append((key_s, ann)))

        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"
        furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:7777/gydnp"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/ttwwoo"

        private_key, public_key = ed25519.create_signing_keypair()
        public_key_str = ed25519.string_from_verifying_key(public_key)
        pubkey_s = remove_prefix(public_key_str, b"pub-")

        def assert_latest(received, count, furl, my_version):
            # ``received`` must hold ``count`` entries, and its newest one
            # must carry our key, ``furl`` and ``my_version``
            self.failUnlessEqual(len(received), count)
            key_s, ann = received[-1]
            self.failUnlessEqual(key_s, pubkey_s)
            self.failUnlessEqual(ann["anonymous-storage-FURL"], furl)
            self.failUnlessEqual(ann["my-version"], my_version)

        # ann1: ic1, furl1
        # ann1a: ic1, furl1a (same SturdyRef, different connection hints)
        # ann1b: ic2, furl1
        # ann2: ic2, furl2

        self.ann1 = make_ann_t(ic1, furl1, private_key, seqnum=10)
        self.ann1old = make_ann_t(ic1, furl1, private_key, seqnum=9)
        self.ann1noseqnum = make_ann_t(ic1, furl1, private_key, seqnum=None)
        self.ann1b = make_ann_t(ic2, furl1, private_key, seqnum=11)
        self.ann1a = make_ann_t(ic1, furl1a, private_key, seqnum=12)
        self.ann2 = make_ann_t(ic2, furl2, private_key, seqnum=13)

        ic1.remote_announce_v2([self.ann1]) # queues eventual-send
        yield fireEventually()
        assert_latest(delivered, 1, furl1, "ver23")

        # a duplicate announcement must not fire the subscriber
        yield ic1.remote_announce_v2([self.ann1])
        yield fireEventually()
        self.failUnlessEqual(len(delivered), 1)

        # neither must an older announcement
        yield ic1.remote_announce_v2([self.ann1old])
        yield fireEventually()
        self.failUnlessEqual(len(delivered), 1)

        # an announcement with no seqnum cannot replace one with a seqnum
        yield ic1.remote_announce_v2([self.ann1noseqnum])
        yield fireEventually()
        self.failUnlessEqual(len(delivered), 1)

        # a replacement announcement (same FURL, new other stuff) *should*
        # fire the subscriber
        yield ic1.remote_announce_v2([self.ann1b])
        yield fireEventually()
        assert_latest(delivered, 2, furl1, "ver24")

        # so should a replacement with a different FURL (it uses different
        # connection hints)
        yield ic1.remote_announce_v2([self.ann1a])
        yield fireEventually()
        assert_latest(delivered, 3, furl1a, "ver23")

        # a subscription added now is called with the backlog. The
        # introducer only records one announcement per index, so the
        # backlog will only have the latest message.
        backlog = []
        yield ic1.subscribe_to(
            "storage",
            lambda key_s, ann: backlog.append((key_s, ann)))
        yield fireEventually()
        assert_latest(backlog, 1, furl1a, "ver23")
302
class Server(AsyncTestCase):
    """Tests of how ``IntroducerService`` handles published announcements."""

    def test_duplicate(self):
        """
        Publish the same server's announcement six times with varying
        sequence numbers and check, after each publish, the single stored
        announcement and the service's inbound debug counters.
        """
        introducer = IntroducerService()
        ic1 = IntroducerClient(None,
                               "introducer.furl", u"my_nickname",
                               "ver23", "oldest_version", realseq,
                               FilePath(self.mktemp()))
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"

        private_key, _ = ed25519.create_signing_keypair()

        def signed(seqnum):
            return make_ann_t(ic1, furl1, private_key, seqnum=seqnum)

        ann1 = signed(10)
        ann1_old = signed(9)
        ann1_new = signed(11)
        ann1_noseqnum = signed(None)
        ann1_badseqnum = signed("not an int")

        # counters are listed in the order in which they are checked
        counter_names = (
            "inbound_message",
            "inbound_duplicate",
            "inbound_no_seqnum",
            "inbound_old_replay",
            "inbound_update",
        )

        def publish_and_check(ann_t, stored_seqnum, counts):
            # publish ``ann_t``; exactly one announcement must then be
            # stored, with ``stored_seqnum``, and the counters must equal
            # ``counts`` (same order as ``counter_names``)
            introducer.remote_publish_v2(ann_t, None)
            stored = introducer.get_announcements()
            self.failUnlessEqual(len(stored), 1)
            self.failUnlessEqual(stored[0].announcement["seqnum"],
                                 stored_seqnum)
            for name, expected in zip(counter_names, counts):
                self.failUnlessEqual(introducer._debug_counts[name], expected)

        # first publish: just one inbound message
        publish_and_check(ann1, 10, (1, 0, 0, 0, 0))
        # identical announcement again: counted as a duplicate
        publish_and_check(ann1, 10, (2, 1, 0, 0, 0))
        # lower seqnum: counted as an old replay, stored one unchanged
        publish_and_check(ann1_old, 10, (3, 1, 0, 1, 0))
        # higher seqnum: counted as an update, stored one replaced
        publish_and_check(ann1_new, 11, (4, 1, 0, 1, 1))
        # seqnum of None: counted as no-seqnum, stored one unchanged
        publish_and_check(ann1_noseqnum, 11, (5, 1, 1, 1, 1))
        # non-integer seqnum: also counted as no-seqnum
        publish_and_check(ann1_badseqnum, 11, (6, 1, 2, 1, 1))
379
380
# Nickname template; "%s" is filled in per client. The \u00ED in it is
# LATIN SMALL LETTER I WITH ACUTE.
NICKNAME = u"n\u00EDickname-%s"

class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
    """Mixin that adds ``create_tub`` for tests needing a central Tub."""

    def create_tub(self, portnum=None):
        """
        Create a Tub whose certFile is ``tub.pem`` under ``self.basedir``,
        attach it to ``self.parent`` and start it listening.

        The Tub is stored as ``self.central_tub`` and the value returned by
        ``listenOnUnused(tub, portnum)`` as ``self.central_portnum``.
        """
        cert_path = os.path.join(self.basedir, "tub.pem")
        central = Tub(certFile=cert_path)
        self.central_tub = central
        #central.setOption("logLocalFailures", True)
        #central.setOption("logRemoteFailures", True)
        central.setOption("expose-remote-exception-types", False)
        central.setServiceParent(self.parent)
        self.central_portnum = listenOnUnused(central, portnum)
393
class Queue(SystemTestMixin, AsyncTestCase):
    """Test publishing while the introducer service is detached."""

    @defer.inlineCallbacks
    def test_queue_until_connected(self):
        """
        Publish an announcement while the introducer service is detached
        from its parent, re-attach the service, and check that the
        announcement arrives.
        """
        self.basedir = "introducer/QueueUntilConnected/queued"
        os.makedirs(self.basedir)
        self.create_tub()
        server = IntroducerService()
        server.setServiceParent(self.parent)
        furl_file = os.path.join(self.basedir, "introducer.furl")
        ifurl = self.central_tub.registerReference(server, furlFile=furl_file)
        client_tub = Tub()
        client_tub.setServiceParent(self.parent)
        client = IntroducerClient(client_tub, ifurl,
                                  u"nickname", "version", "oldest", fakeseq,
                                  FilePath(self.mktemp()))
        furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        private_key, _ = ed25519.create_signing_keypair()

        yield server.disownServiceParent()

        # now that the introducer server is offline, start the client and
        # publish a message
        client.setServiceParent(self.parent) # this starts the reconnector
        client.publish("storage", make_ann(furl1), private_key)

        server.setServiceParent(self.parent) # restart the server
        # now wait for the message to be delivered
        yield self.poll(lambda: bool(server.get_announcements()))

        first = server.get_announcements()[0]
        self.failUnlessEqual(first.announcement["anonymous-storage-FURL"],
                             furl1)

        # now let the ack get back
        def _idle():
            busy = client._debug_outstanding or server._debug_outstanding
            return not busy
        yield self.poll(_idle)
445
446
class SystemTest(SystemTestMixin, AsyncTestCase):
    """
    End-to-end test of one introducer serving several introducer clients,
    including reconnection after the introducer's Tub, and then the
    introducer itself, has been replaced.
    """

    def do_system_test(self):
        """
        Run the whole scenario; return a Deferred for its completion.

        ``self.basedir`` must already exist. There are three phases, each
        followed by checks of the debug counters:

        1. six clients connect, five of them publish "storage";
        2. the introducer's Tub is shut down and recreated around the same
           ``IntroducerService``;
        3. the Tub is shut down again and recreated around a new
           ``IntroducerService``.
        """
        self.create_tub()
        introducer = IntroducerService()
        introducer.setServiceParent(self.parent)
        iff = os.path.join(self.basedir, "introducer.furl")
        ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
        self.introducer_furl = ifurl

        # we have 5 clients who publish themselves as storage servers, and a
        # sixth which does not. All 6 clients subscribe to hear about
        # storage. When the connections are fully established, all six nodes
        # should have 5 connections each.
        NUM_STORAGE = 5
        NUM_CLIENTS = 6

        clients = []
        tubs = {}
        received_announcements = {}
        subscribing_clients = []
        publishing_clients = []
        printable_serverids = {}
        self.the_introducer = introducer
        privkeys = {}
        pubkeys = {}
        expected_announcements = [0] * NUM_CLIENTS

        for i in range(NUM_CLIENTS):
            tub = Tub()
            #tub.setOption("logLocalFailures", True)
            #tub.setOption("logRemoteFailures", True)
            tub.setOption("expose-remote-exception-types", False)
            tub.setServiceParent(self.parent)
            listenOnUnused(tub)
            log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
            c = IntroducerClient(tub, self.introducer_furl,
                                 NICKNAME % str(i),
                                 "version", "oldest",
                                 fakeseq,
                                 FilePath(self.mktemp()))
            received_announcements[c] = {}
            def got(key_s_or_tubid, ann, announcements):
                # file each announcement under its key, falling back to
                # the tubid taken from the announcement itself
                index = key_s_or_tubid or get_tubid_string_from_ann(ann)
                announcements[index] = ann
            c.subscribe_to("storage", got, received_announcements[c])
            subscribing_clients.append(c)
            expected_announcements[i] += 1 # all expect a 'storage' announcement

            node_furl = tub.registerReference(Referenceable())
            private_key, public_key = ed25519.create_signing_keypair()
            public_key_str = ed25519.string_from_verifying_key(public_key)
            privkeys[i] = private_key
            pubkeys[i] = public_key_str

            if i < NUM_STORAGE:
                # sign all announcements
                c.publish("storage", make_ann(node_furl), private_key)
                printable_serverids[i] = remove_prefix(public_key_str, b"pub-")
                publishing_clients.append(c)
            else:
                # the last one does not publish anything
                pass

            if i == 2:
                # also publish something that nobody cares about
                boring_furl = tub.registerReference(Referenceable())
                c.publish("boring", make_ann(boring_furl), private_key)

            c.setServiceParent(self.parent)
            clients.append(c)
            tubs[c] = tub

        def _wait_for_connected(ign):
            def _connected():
                for c in clients:
                    if not c.connected_to_introducer():
                        return False
                return True
            return self.poll(_connected)

        # we watch the clients to determine when the system has settled down.
        # Then we can look inside the server to assert things about its
        # state.

        def _wait_for_expected_announcements(ign):
            def _got_expected_announcements():
                for i,c in enumerate(subscribing_clients):
                    if len(received_announcements[c]) < expected_announcements[i]:
                        return False
                return True
            return self.poll(_got_expected_announcements)

        # before shutting down any Tub, we'd like to know that there are no
        # messages outstanding

        def _wait_until_idle(ign):
            def _idle():
                for c in subscribing_clients + publishing_clients:
                    if c._debug_outstanding:
                        return False
                if self.the_introducer._debug_outstanding:
                    return False
                return True
            return self.poll(_idle)

        d = defer.succeed(None)
        d.addCallback(_wait_for_connected)
        d.addCallback(_wait_for_expected_announcements)
        d.addCallback(_wait_until_idle)

        def _check1(res):
            log.msg("doing _check1")
            dc = self.the_introducer._debug_counts
            # each storage server publishes a record. There is also one
            # "boring"
            self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+1)
            self.failUnlessEqual(dc["inbound_duplicate"], 0)
            self.failUnlessEqual(dc["inbound_update"], 0)
            self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
            # the number of outbound messages is tricky.. I think it depends
            # upon a race between the publish and the subscribe messages.
            self.failUnless(dc["outbound_message"] > 0)
            # each client subscribes to "storage", and each server publishes
            self.failUnlessEqual(dc["outbound_announcements"],
                                 NUM_STORAGE*NUM_CLIENTS)

            for c in subscribing_clients:
                cdc = c._debug_counts
                self.failUnless(cdc["inbound_message"])
                self.failUnlessEqual(cdc["inbound_announcement"],
                                     NUM_STORAGE)
                self.failUnlessEqual(cdc["wrong_service"], 0)
                self.failUnlessEqual(cdc["duplicate_announcement"], 0)
                self.failUnlessEqual(cdc["update"], 0)
                self.failUnlessEqual(cdc["new_announcement"],
                                     NUM_STORAGE)
                anns = received_announcements[c]
                self.failUnlessEqual(len(anns), NUM_STORAGE)

                serverid0 = printable_serverids[0]
                ann = anns[serverid0]
                nick = ann["nickname"]
                self.assertIsInstance(nick, str)
                self.failUnlessEqual(nick, NICKNAME % "0")
            for c in publishing_clients:
                cdc = c._debug_counts
                expected = 1
                if c in [clients[2]]: # this one also published "boring"
                    expected = 2
                self.failUnlessEqual(cdc["outbound_message"], expected)
            # now check the web status, make sure it renders without error
            ir = introweb.IntroducerRoot(self.parent)
            self.parent.nodeid = b"NODEID"
            log.msg("_check1 done")
            return flattenString(None, ir._create_element())
        d.addCallback(_check1)

        # Receives the bytes produced by the flattenString() call that
        # _check1 returns. It has its own name so that the counter check
        # called _check2 further down does not shadow it; the log text is
        # left as it was.
        def _check1_rendered(flattened_bytes):
            text = flattened_bytes.decode("utf-8")
            self.assertIn(NICKNAME % "0", text) # a v2 client
            self.assertIn(NICKNAME % "1", text) # another v2 client
            for i in range(NUM_STORAGE):
                self.assertIn(ensure_text(printable_serverids[i]), text,
                                  (i,printable_serverids[i],text))
                # make sure there isn't a double-base32ed string too
                self.assertNotIn(idlib.nodeid_b2a(printable_serverids[i]), text,
                              (i,printable_serverids[i],text))
            log.msg("_check2 done")
        d.addCallback(_check1_rendered)

        # force an introducer reconnect, by shutting down the Tub it's using
        # and starting a new Tub (with the old introducer). Everybody should
        # reconnect and republish, but the introducer should ignore the
        # republishes as duplicates. However, because the server doesn't know
        # what each client does and does not know, it will send them a copy
        # of the current announcement table anyway.

        d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
        d.addCallback(lambda _ign: self.central_tub.disownServiceParent())

        def _wait_for_introducer_loss(ign):
            def _introducer_lost():
                for c in clients:
                    if c.connected_to_introducer():
                        return False
                return True
            return self.poll(_introducer_lost)
        d.addCallback(_wait_for_introducer_loss)

        def _restart_introducer_tub(_ign):
            log.msg("restarting introducer's Tub")
            # reset counters
            for i, c in enumerate(subscribing_clients):
                for k in c._debug_counts:
                    c._debug_counts[k] = 0
                # This sits inside the loop so that every client's
                # expectation is raised; it used to run once after the
                # loop and so only touched the last client.
                expected_announcements[i] += 1 # new 'storage' for everyone
            for k in self.the_introducer._debug_counts:
                self.the_introducer._debug_counts[k] = 0
            self.create_tub(self.central_portnum)
            newfurl = self.central_tub.registerReference(self.the_introducer,
                                                         furlFile=iff)
            assert newfurl == self.introducer_furl
        d.addCallback(_restart_introducer_tub)

        d.addCallback(_wait_for_connected)
        d.addCallback(_wait_for_expected_announcements)
        d.addCallback(_wait_until_idle)
        d.addCallback(lambda _ign: log.msg(" reconnected"))

        # TODO: publish something while the introducer is offline, then
        # confirm it gets delivered when the connection is reestablished
        def _check2(res):
            log.msg("doing _check2")
            # assert that the introducer sent out new messages, one per
            # subscriber
            dc = self.the_introducer._debug_counts
            self.failUnlessEqual(dc["outbound_announcements"],
                                 NUM_STORAGE*NUM_CLIENTS)
            self.failUnless(dc["outbound_message"] > 0)
            self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
            for c in subscribing_clients:
                cdc = c._debug_counts
                self.failUnlessEqual(cdc["inbound_message"], 1)
                self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
                self.failUnlessEqual(cdc["new_announcement"], 0)
                self.failUnlessEqual(cdc["wrong_service"], 0)
                self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
        d.addCallback(_check2)

        # Then force an introducer restart, by shutting down the Tub,
        # destroying the old introducer, and starting a new Tub+Introducer.
        # Everybody should reconnect and republish, and the (new) introducer
        # will distribute the new announcements, but the clients should
        # ignore the republishes as duplicates.

        d.addCallback(lambda _ign: log.msg("shutting down introducer"))
        d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
        d.addCallback(_wait_for_introducer_loss)
        d.addCallback(lambda _ign: log.msg("introducer lost"))

        def _restart_introducer(_ign):
            log.msg("restarting introducer")
            self.create_tub(self.central_portnum)
            # reset counters
            for i, c in enumerate(subscribing_clients):
                for k in c._debug_counts:
                    c._debug_counts[k] = 0
                # inside the loop, for the same reason as in
                # _restart_introducer_tub
                expected_announcements[i] += 1 # new 'storage' for everyone
            introducer = IntroducerService()
            self.the_introducer = introducer
            newfurl = self.central_tub.registerReference(self.the_introducer,
                                                         furlFile=iff)
            assert newfurl == self.introducer_furl
        d.addCallback(_restart_introducer)

        d.addCallback(_wait_for_connected)
        d.addCallback(_wait_for_expected_announcements)
        d.addCallback(_wait_until_idle)

        def _check3(res):
            log.msg("doing _check3")
            dc = self.the_introducer._debug_counts
            self.failUnlessEqual(dc["outbound_announcements"],
                                 NUM_STORAGE*NUM_CLIENTS)
            self.failUnless(dc["outbound_message"] > 0)
            self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
            for c in subscribing_clients:
                cdc = c._debug_counts
                self.failUnless(cdc["inbound_message"] > 0)
                self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
                self.failUnlessEqual(cdc["new_announcement"], 0)
                self.failUnlessEqual(cdc["wrong_service"], 0)
                self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)

        d.addCallback(_check3)
        return d


    def test_system_v2_server(self):
        """Create the basedir, then run ``do_system_test`` inside it."""
        self.basedir = "introducer/SystemTest/system_v2_server"
        os.makedirs(self.basedir)
        return self.do_system_test()
734
class FakeRemoteReference(object):
    """
    Stub subscriber object: accepts disconnect-notification registrations
    without acting on them and reports a fixed tub ID and peer address.
    """

    def notifyOnDisconnect(self, *args, **kwargs):
        # Accept any arguments and do nothing with them.
        return None

    def getRemoteTubID(self):
        return "62ubehyunnyhzs7r6vdonnm2hpi52w6y"

    def getPeer(self):
        return address.IPv4Address("TCP", "remote.example.com", 3456)
740
class ClientInfo(AsyncTestCase):
    """
    Check what the introducer records about a subscribing v2 client.
    """

    def test_client_v2(self):
        """
        After one ``remote_subscribe_v2`` call, ``get_subscribers`` returns a
        single entry carrying the service name, nickname and version that
        were supplied.
        """
        nickname = NICKNAME % u"v2"
        introducer = IntroducerService()
        # No Tub or introducer FURL is supplied; only the client's
        # subscriber info is used below.
        client_v2 = IntroducerClient(
            None, None, nickname,
            "my_version", "oldest",
            fakeseq, FilePath(self.mktemp()),
        )
        #furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        #ann_s = make_ann_t(client_v2, furl1, None, 10)
        #introducer.remote_publish_v2(ann_s, Referenceable())
        introducer.remote_subscribe_v2(
            FakeRemoteReference(),
            "storage",
            client_v2._my_subscriber_info,
        )

        subscribers = introducer.get_subscribers()
        self.failUnlessEqual(len(subscribers), 1)
        only_subscriber = subscribers[0]
        self.failUnlessEqual(only_subscriber.service_name, "storage")
        self.failUnlessEqual(only_subscriber.nickname, nickname)
        self.failUnlessEqual(only_subscriber.version, "my_version")
760
761
class Announcements(AsyncTestCase):
    """
    Tests for announcements as recorded by the introducer and as cached on
    disk by an introducer client.
    """

    def test_client_v2_signed(self):
        """
        A signed announcement published with ``remote_publish_v2`` is
        returned by ``get_announcements`` with the expected canary, index,
        nickname, service name, version and FURL.
        """
        storage_furl = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        introducer = IntroducerService()
        client_v2 = IntroducerClient(
            None, None, u"nick-v2",
            "my_version", "oldest",
            fakeseq, FilePath(self.mktemp()),
        )

        private_key, public_key = ed25519.create_signing_keypair()
        verifying_key_str = ed25519.string_from_verifying_key(public_key)
        public_key_str = remove_prefix(verifying_key_str, b"pub-")

        signed_ann = make_ann_t(client_v2, storage_furl, private_key, 10)
        canary = Referenceable()
        introducer.remote_publish_v2(signed_ann, canary)

        recorded = introducer.get_announcements()
        self.failUnlessEqual(len(recorded), 1)
        first = recorded[0]
        self.assertThat(first.canary, Is(canary))
        self.failUnlessEqual(first.index, ("storage", public_key_str))
        self.failUnlessEqual(first.nickname, u"nick-v2")
        self.failUnlessEqual(first.service_name, "storage")
        self.failUnlessEqual(first.version, "my_version")
        self.failUnlessEqual(first.announcement["anonymous-storage-FURL"],
                             storage_furl)

    def _load_cache(self, cache_filepath):
        """
        Return the parsed YAML content of the file at ``cache_filepath``.
        """
        with cache_filepath.open() as cache_file:
            loaded = yamlutil.safe_load(cache_file)
        return loaded

    @defer.inlineCallbacks
    def test_client_cache(self):
        """
        Announcements received by an introducer client are written to that
        introducer client's cache file.
        """
        basedir = FilePath("introducer/ClientSeqnums/test_client_cache_1")
        private_dir = basedir.child("private")
        private_dir.makedirs()
        write_introducer(basedir, "default", "nope")
        cache_filepath = private_dir.child("introducer_default_cache.yaml")

        # if storage is enabled, the Client will publish its storage server
        # during startup (although the announcement will wait in a queue
        # until the introducer connection is established). To avoid getting
        # confused by this, disable storage.
        with basedir.child("tahoe.cfg").open("w") as config_file:
            config_file.write(b"[storage]\n")
            config_file.write(b"enabled = false\n")

        c = yield create_client(basedir.path)
        ic = c.introducer_clients[0]
        private_key, public_key = ed25519.create_signing_keypair()
        public_key_str = remove_prefix(
            ed25519.string_from_verifying_key(public_key), b"pub-")

        def deliver(ann_t):
            # Hand one announcement to the client, then let queued eventual
            # calls run.
            ic.got_announcements([ann_t])
            return flushEventualQueue()

        def check_only_cached_entry(expected_furl, expected_seqnum):
            # The cache must hold exactly one entry, for the first key.
            cached = self._load_cache(cache_filepath)
            self.failUnlessEqual(len(cached), 1)
            entry = cached[0]
            self.failUnlessEqual(ensure_binary(entry['key_s']), public_key_str)
            cached_ann = entry["ann"]
            self.failUnlessEqual(cached_ann["anonymous-storage-FURL"],
                                 expected_furl)
            self.failUnlessEqual(cached_ann["seqnum"], expected_seqnum)

        furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        yield deliver(make_ann_t(ic, furl1, private_key, 1))

        # check the cache for the announcement
        check_only_cached_entry(furl1, 1)

        # a new announcement that replaces the first should replace the
        # cached entry, not duplicate it
        furl2 = furl1 + "er"
        yield deliver(make_ann_t(ic, furl2, private_key, 2))
        check_only_cached_entry(furl2, 2)

        # but a third announcement with a different key should add to the
        # cache
        private_key2, public_key2 = ed25519.create_signing_keypair()
        public_key_str2 = remove_prefix(
            ed25519.string_from_verifying_key(public_key2), b"pub-")
        furl3 = "pb://onug64tu@127.0.0.1:456/short"
        yield deliver(make_ann_t(ic, furl3, private_key2, 1))

        cached = self._load_cache(cache_filepath)
        self.failUnlessEqual(len(cached), 2)
        self.failUnlessEqual({public_key_str, public_key_str2},
                             {ensure_binary(entry["key_s"])
                              for entry in cached})
        self.failUnlessEqual({furl2, furl3},
                             {entry["ann"]["anonymous-storage-FURL"]
                              for entry in cached})

        # test loading
        yield flushEventualQueue()
        ic2 = IntroducerClient(None, "introducer.furl", u"my_nickname",
                               "my_version", "oldest_version", fakeseq,
                               ic._cache_filepath)
        loaded = {}
        def got(key_s, ann):
            loaded[key_s] = ann
        ic2.subscribe_to("storage", got)
        ic2._load_announcements() # normally happens when connection fails
        yield flushEventualQueue()

        self.failUnless(public_key_str in loaded)
        self.failUnlessEqual(loaded[public_key_str]["anonymous-storage-FURL"],
                             furl2)
        self.failUnlessEqual(loaded[public_key_str2]["anonymous-storage-FURL"],
                             furl3)

        # a second full client built on the same basedir sees both servers
        # once its introducer client loads the cache
        c2 = yield create_client(basedir.path)
        c2.introducer_clients[0]._load_announcements()
        yield flushEventualQueue()
        self.assertEqual(c2.storage_broker.get_all_serverids(),
                         frozenset([public_key_str, public_key_str2]))
884
class ClientSeqnums(AsyncBrokenTestCase):
    """
    Check the sequence numbers and nonces an introducer client attaches to
    the announcements it publishes.
    """

    @defer.inlineCallbacks
    def test_client(self):
        """
        Each ``publish`` call advances the stored sequence number, and every
        outbound announcement is re-issued with that number and a new nonce.
        """
        basedir = FilePath("introducer/ClientSeqnums/test_client")
        basedir.child("private").makedirs()
        write_introducer(basedir, "default", "nope")
        # if storage is enabled, the Client will publish its storage server
        # during startup (although the announcement will wait in a queue
        # until the introducer connection is established). To avoid getting
        # confused by this, disable storage.
        with basedir.child("tahoe.cfg").open("w") as config_file:
            config_file.write(b"[storage]\n")
            config_file.write(b"enabled = false\n")

        c = yield create_client(basedir.path)
        ic = c.introducer_clients[0]
        outbound = ic._outbound_announcements
        published = ic._published_announcements

        def read_seqnum():
            # Sequence number currently stored in the node directory.
            return int(basedir.child("announcement-seqnum").getContent())

        def assert_published_matches_outbound(service):
            # [0] is the JSON message; [1] is the signature, [2] is the pubkey
            self.failUnlessEqual(json.loads(published[service][0]),
                                 outbound[service])

        ic.publish("sA", {"key": "value1"}, c._node_private_key)
        self.failUnlessEqual(read_seqnum(), 1)
        self.failUnless("sA" in outbound)
        self.failUnlessEqual(outbound["sA"]["seqnum"], 1)
        nonce1 = outbound["sA"]["nonce"]
        self.failUnless(isinstance(nonce1, bytes))
        # Make nonce unicode, to match JSON:
        outbound["sA"]["nonce"] = str(nonce1, "utf-8")
        assert_published_matches_outbound("sA")

        # publishing a second service causes both services to be
        # re-published, with the next higher sequence number
        ic.publish("sB", {"key": "value2"}, c._node_private_key)
        self.failUnlessEqual(read_seqnum(), 2)
        for service in ("sB", "sA"):
            self.failUnless(service in outbound)
            self.failUnlessEqual(outbound[service]["seqnum"], 2)
        nonce2 = outbound["sA"]["nonce"]
        self.failUnless(isinstance(nonce2, bytes))
        self.failIfEqual(nonce1, nonce2)
        # Make nonce unicode, to match JSON:
        outbound["sA"]["nonce"] = str(nonce2, "utf-8")
        outbound["sB"]["nonce"] = str(outbound["sB"]["nonce"], "utf-8")
        for service in ("sA", "sB"):
            assert_published_matches_outbound(service)
939
940
941
class TooNewServer(IntroducerService):
    """
    Introducer service whose ``VERSION`` lists only a "v999" introducer
    protocol.
    """
    VERSION = {
        "http://allmydata.org/tahoe/protocols/introducer/v999": {},
        "application-version": "greetings from the crazy future",
    }
947
class NonV1Server(SystemTestMixin, AsyncTestCase):
    # if the client connects to a server that doesn't provide the 'v2'
    # protocol, it is supposed to provide a useful error instead of a weird
    # exception.

    def test_failure(self):
        """
        A client pointed at a ``TooNewServer`` ends up with an
        ``InsufficientVersionError`` stored in ``_introducer_error``.
        """
        self.basedir = "introducer/NonV1Server/failure"
        os.makedirs(self.basedir)
        self.create_tub()
        too_new = TooNewServer()
        too_new.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(too_new)

        client_tub = Tub()
        client_tub.setOption("expose-remote-exception-types", False)
        client_tub.setServiceParent(self.parent)
        listenOnUnused(client_tub)
        client = IntroducerClient(
            client_tub, self.introducer_furl,
            u"nickname-client", "version", "oldest",
            fakeseq, FilePath(self.mktemp()),
        )
        announcements = {}
        def got(key_s, ann):
            announcements[key_s] = ann
        client.subscribe_to("storage", got)

        client.setServiceParent(self.parent)

        # now we wait for it to connect and notice the bad version

        def _error_or_publisher():
            # Stop polling once the client has either recorded an error or
            # obtained a publisher.
            return bool(client._introducer_error or client._publisher)

        def _check_error(res):
            error = client._introducer_error
            self.failUnless(error)
            self.failUnless(error.check(InsufficientVersionError), error)

        d = self.poll(_error_or_publisher)
        d.addCallback(_check_error)
        return d
986
class DecodeFurl(SyncTestCase):
    """
    Sanity check for base32-decoding the tub ID embedded in a FURL.
    """

    def test_decode(self):
        # make sure we have a working base64.b32decode. The one in
        # python2.4.[01] was broken.
        expected_nodeid = b"\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d"
        furl = 'pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i'
        match = re.match(r'pb://(\w+)@', furl)
        assert match
        # b32decode wants upper-case ASCII bytes
        tubid_b32 = match.group(1).upper().encode("ascii")
        self.failUnlessEqual(b32decode(tubid_b32), expected_nodeid)
996
class Signatures(SyncTestCase):
    """
    Tests for signing announcements with ``sign_to_foolscap``, verifying them
    with ``unsign_from_foolscap``, and the introducer client's handling of
    announcements whose signature does not verify.
    """

    def test_sign(self):
        """
        ``sign_to_foolscap`` produces a (message, signature, key) triple that
        ``unsign_from_foolscap`` accepts; triples with a missing, mismatched
        or unrecognized signature or key are rejected.
        """
        ann = {"key1": "value1"}
        private_key, public_key = ed25519.create_signing_keypair()
        public_key_str = ed25519.string_from_verifying_key(public_key)
        ann_t = sign_to_foolscap(ann, private_key)
        (msg, sig, key) = ann_t
        # Same style of bytes check as the nonce checks in ClientSeqnums.
        self.failUnless(isinstance(msg, bytes))
        self.failUnlessEqual(json.loads(msg.decode("utf-8")), ann)
        self.failUnless(sig.startswith(b"v0-"))
        self.failUnless(key.startswith(b"v0-"))
        (ann2,key2) = unsign_from_foolscap(ann_t)
        self.failUnlessEqual(ann2, ann)
        self.failUnlessEqual(b"pub-" + key2, public_key_str)

        # not signed
        self.failUnlessRaises(UnknownKeyError,
                              unsign_from_foolscap, (msg, None, key))
        self.failUnlessRaises(UnknownKeyError,
                              unsign_from_foolscap, (msg, sig, None))
        # bad signature
        bad_ann = {"key1": "value2"}
        bad_msg = json.dumps(bad_ann).encode("utf-8")
        self.failUnlessRaises(BadSignature,
                              unsign_from_foolscap, (bad_msg, sig, key))

        # unrecognized signatures
        self.failUnlessRaises(UnknownKeyError,
                              unsign_from_foolscap, (bad_msg, b"v999-sig", key))
        self.failUnlessRaises(UnknownKeyError,
                              unsign_from_foolscap, (bad_msg, sig, b"v999-key"))

    def test_unsigned_announcement(self):
        """
        An incorrectly signed announcement is not delivered to subscribers.
        """
        private_key, public_key = ed25519.create_signing_keypair()
        public_key_str = ed25519.string_from_verifying_key(public_key)

        ic = IntroducerClient(
            Tub(),
            "pb://",
            u"fake_nick",
            "0.0.0",
            "1.2.3",
            (0, u"i am a nonce"),
            FilePath(self.mktemp()),
        )
        received = {}
        ic.subscribe_to("good-stuff", partial(setitem, received))

        # Deliver a good message to prove our test code is valid.
        ann = {"service-name": "good-stuff", "payload": "hello"}
        ann_t = sign_to_foolscap(ann, private_key)
        ic.got_announcements([ann_t])

        # Subscribers are keyed by the public key without its "pub-" prefix.
        # remove_prefix (as used in the Announcements tests) strips it
        # explicitly rather than slicing off a fixed number of bytes.
        self.assertEqual(
            {remove_prefix(public_key_str, b"pub-"): ann},
            received,
        )
        received.clear()

        # Now deliver one without a valid signature and observe that it isn't
        # delivered to the subscriber.
        ann = {"service-name": "good-stuff", "payload": "bad stuff"}
        (msg, sig, key) = sign_to_foolscap(ann, private_key)
        # Drop two bytes from the middle of the signature so that it no
        # longer verifies against the message; the key is left untouched.
        sig_a = bytearray(sig)
        sig_a[20:22] = []
        sig = bytes(sig_a)
        ann_t = (msg, sig, key)
        ic.got_announcements([ann_t])

        # The received announcements dict should remain empty because we
        # should not receive the announcement with the invalid signature.
        self.assertEqual(
            {},
            received,
        )
1078
1079
1080# add tests of StorageFarmBroker: if it receives duplicate announcements, it
1081# should leave the Reconnector in place, also if it receives
1082# same-FURL-different-misc, but if it receives same-nodeid-different-FURL, it
1083# should tear down the Reconnector and make a new one. This behavior used to
1084# live in the IntroducerClient, and thus used to be tested by test_introducer
1085
1086# copying more tests from old branch:
1087
1088#  then also add Upgrade test
Note: See TracBrowser for help on using the repository browser.