source file: /home/buildslave/tahoe/edgy/build/src/allmydata/introducer/server.py
file stats: 81 lines, 79 executed: 97.5% covered
   1. 
   2. import time, os.path
   3. from zope.interface import implements
   4. from twisted.application import service
   5. from foolscap import Referenceable
   6. from allmydata import node
   7. from allmydata.util import log
   8. from allmydata.introducer.interfaces import \
   9.      RIIntroducerPublisherAndSubscriberService
  10. from allmydata.introducer.common import make_index
  11. 
  12. class IntroducerNode(node.Node):
  13.     PORTNUMFILE = "introducer.port"
  14.     NODETYPE = "introducer"
  15. 
  16.     def __init__(self, basedir="."):
  17.         node.Node.__init__(self, basedir)
  18.         self.read_config()
  19.         self.init_introducer()
  20.         webport = self.get_config("node", "web.port", None)
  21.         if webport:
  22.             self.init_web(webport) # strports string
  23. 
  24.     def init_introducer(self):
  25.         introducerservice = IntroducerService(self.basedir)
  26.         self.add_service(introducerservice)
  27. 
  28.         d = self.when_tub_ready()
  29.         def _publish(res):
  30.             self.introducer_url = self.tub.registerReference(introducerservice,
  31.                                                              "introducer")
  32.             self.log(" introducer is at %s" % self.introducer_url)
  33.             self.write_config("introducer.furl", self.introducer_url + "\n")
  34.         d.addCallback(_publish)
  35.         d.addErrback(log.err, facility="tahoe.init",
  36.                      level=log.BAD, umid="UaNs9A")
  37. 
  38.     def init_web(self, webport):
  39.         self.log("init_web(webport=%s)", args=(webport,))
  40. 
  41.         from allmydata.webish import IntroducerWebishServer
  42.         nodeurl_path = os.path.join(self.basedir, "node.url")
  43.         ws = IntroducerWebishServer(webport, nodeurl_path)
  44.         self.add_service(ws)
  45. 
  46. class IntroducerService(service.MultiService, Referenceable):
  47.     implements(RIIntroducerPublisherAndSubscriberService)
  48.     name = "introducer"
  49. 
  50.     def __init__(self, basedir="."):
  51.         service.MultiService.__init__(self)
  52.         self.introducer_url = None
  53.         # 'index' is (tubid, service_name)
  54.         self._announcements = {} # dict of index -> (announcement, timestamp)
  55.         self._subscribers = {} # dict of (rref->timestamp) dicts
  56. 
  57.     def log(self, *args, **kwargs):
  58.         if "facility" not in kwargs:
  59.             kwargs["facility"] = "tahoe.introducer"
  60.         return log.msg(*args, **kwargs)
  61. 
  62.     def get_announcements(self):
  63.         return self._announcements
  64.     def get_subscribers(self):
  65.         return self._subscribers
  66. 
  67.     def remote_publish(self, announcement):
  68.         self.log("introducer: announcement published: %s" % (announcement,) )
  69.         index = make_index(announcement)
  70.         if index in self._announcements:
  71.             (old_announcement, timestamp) = self._announcements[index]
  72.             if old_announcement == announcement:
  73.                 self.log("but we already knew it, ignoring", level=log.NOISY)
  74.                 return
  75.             else:
  76.                 self.log("old announcement being updated", level=log.NOISY)
  77.         self._announcements[index] = (announcement, time.time())
  78.         (furl, service_name, ri_name, nickname, ver, oldest) = announcement
  79.         for s in self._subscribers.get(service_name, []):
  80.             s.callRemote("announce", set([announcement]))
  81. 
  82.     def remote_subscribe(self, subscriber, service_name):
  83.         self.log("introducer: subscription[%s] request at %s" % (service_name,
  84.                                                                  subscriber))
  85.         if service_name not in self._subscribers:
  86.             self._subscribers[service_name] = {}
  87.         subscribers = self._subscribers[service_name]
  88.         if subscriber in subscribers:
  89.             self.log("but they're already subscribed, ignoring",
  90.                      level=log.UNUSUAL)
  91.             return
  92.         subscribers[subscriber] = time.time()
  93.         def _remove():
  94.             self.log("introducer: unsubscribing[%s] %s" % (service_name,
  95.                                                            subscriber))
  96.             subscribers.pop(subscriber, None)
  97.         subscriber.notifyOnDisconnect(_remove)
  98. 
  99.         announcements = set( [ ann
 100.                                for idx,(ann,when) in self._announcements.items()
 101.                                if idx[1] == service_name] )
 102.         d = subscriber.callRemote("announce", announcements)
 103.         d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)
 104. 
 105. 
 106.