source file: /home/buildslave/tahoe/edgy/build/src/allmydata/introducer/server.py
file stats: 81 lines, 79 executed: 97.5% covered
1.
2. import time, os.path
3. from zope.interface import implements
4. from twisted.application import service
5. from foolscap import Referenceable
6. from allmydata import node
7. from allmydata.util import log
8. from allmydata.introducer.interfaces import \
9. RIIntroducerPublisherAndSubscriberService
10. from allmydata.introducer.common import make_index
11.
12. class IntroducerNode(node.Node):
13. PORTNUMFILE = "introducer.port"
14. NODETYPE = "introducer"
15.
16. def __init__(self, basedir="."):
17. node.Node.__init__(self, basedir)
18. self.read_config()
19. self.init_introducer()
20. webport = self.get_config("node", "web.port", None)
21. if webport:
22. self.init_web(webport) # strports string
23.
24. def init_introducer(self):
25. introducerservice = IntroducerService(self.basedir)
26. self.add_service(introducerservice)
27.
28. d = self.when_tub_ready()
29. def _publish(res):
30. self.introducer_url = self.tub.registerReference(introducerservice,
31. "introducer")
32. self.log(" introducer is at %s" % self.introducer_url)
33. self.write_config("introducer.furl", self.introducer_url + "\n")
34. d.addCallback(_publish)
35. d.addErrback(log.err, facility="tahoe.init",
36. level=log.BAD, umid="UaNs9A")
37.
38. def init_web(self, webport):
39. self.log("init_web(webport=%s)", args=(webport,))
40.
41. from allmydata.webish import IntroducerWebishServer
42. nodeurl_path = os.path.join(self.basedir, "node.url")
43. ws = IntroducerWebishServer(webport, nodeurl_path)
44. self.add_service(ws)
45.
46. class IntroducerService(service.MultiService, Referenceable):
47. implements(RIIntroducerPublisherAndSubscriberService)
48. name = "introducer"
49.
50. def __init__(self, basedir="."):
51. service.MultiService.__init__(self)
52. self.introducer_url = None
53. # 'index' is (tubid, service_name)
54. self._announcements = {} # dict of index -> (announcement, timestamp)
55. self._subscribers = {} # dict of (rref->timestamp) dicts
56.
57. def log(self, *args, **kwargs):
58. if "facility" not in kwargs:
59. kwargs["facility"] = "tahoe.introducer"
60. return log.msg(*args, **kwargs)
61.
62. def get_announcements(self):
63. return self._announcements
64. def get_subscribers(self):
65. return self._subscribers
66.
67. def remote_publish(self, announcement):
68. self.log("introducer: announcement published: %s" % (announcement,) )
69. index = make_index(announcement)
70. if index in self._announcements:
71. (old_announcement, timestamp) = self._announcements[index]
72. if old_announcement == announcement:
73. self.log("but we already knew it, ignoring", level=log.NOISY)
74. return
75. else:
76. self.log("old announcement being updated", level=log.NOISY)
77. self._announcements[index] = (announcement, time.time())
78. (furl, service_name, ri_name, nickname, ver, oldest) = announcement
79. for s in self._subscribers.get(service_name, []):
80. s.callRemote("announce", set([announcement]))
81.
82. def remote_subscribe(self, subscriber, service_name):
83. self.log("introducer: subscription[%s] request at %s" % (service_name,
84. subscriber))
85. if service_name not in self._subscribers:
86. self._subscribers[service_name] = {}
87. subscribers = self._subscribers[service_name]
88. if subscriber in subscribers:
89. self.log("but they're already subscribed, ignoring",
90. level=log.UNUSUAL)
91. return
92. subscribers[subscriber] = time.time()
93. def _remove():
94. self.log("introducer: unsubscribing[%s] %s" % (service_name,
95. subscriber))
96. subscribers.pop(subscriber, None)
97. subscriber.notifyOnDisconnect(_remove)
98.
99. announcements = set( [ ann
100. for idx,(ann,when) in self._announcements.items()
101. if idx[1] == service_name] )
102. d = subscriber.callRemote("announce", announcements)
103. d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)
104.
105.
106.