source file: /home/buildslave/tahoe/edgy/build/src/allmydata/introducer/old.py
file stats: 230 lines, 200 executed: 87.0% covered
   1. 
   2. # We keep a copy of the old introducer (both client and server) here to
   3. # support compatibility tests. The old client is supposed to handle the new
   4. # server, and new client is supposed to handle the old server.
   5. 
   6. import re, time, sha
   7. from base64 import b32decode
   8. from zope.interface import implements
   9. from twisted.application import service
  10. from foolscap import Referenceable
  11. from allmydata.util import log, idlib
  12. from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
  13.      IIntroducerClient, RIIntroducerPublisherAndSubscriberService
  14. from allmydata.introducer.common import make_index
  15. 
  16. class RemoteServiceConnector:
  17.     """I hold information about a peer service that we want to connect to. If
  18.     we are connected, I hold the RemoteReference, the peer's address, and the
  19.     peer's version information. I remember information about when we were
  20.     last connected to the peer too, even if we aren't currently connected.
  21. 
  22.     @ivar announcement_time: when we first heard about this service
  23.     @ivar last_connect_time: when we last established a connection
  24.     @ivar last_loss_time: when we last lost a connection
  25. 
  26.     @ivar version: the peer's version, from the most recent announcement
  27.     @ivar oldest_supported: the peer's oldest supported version, same
  28.     @ivar nickname: the peer's self-reported nickname, same
  29. 
  30.     @ivar rref: the RemoteReference, if connected, otherwise None
  31.     @ivar remote_host: the IAddress, if connected, otherwise None
  32.     """
  33. 
  34.     def __init__(self, announcement, tub, ic):
  35.         self._tub = tub
  36.         self._announcement = announcement
  37.         self._ic = ic
  38.         (furl, service_name, ri_name, nickname, ver, oldest) = announcement
  39. 
  40.         self._furl = furl
  41.         m = re.match(r'pb://(\w+)@', furl)
  42.         assert m
  43.         self._nodeid = b32decode(m.group(1).upper())
  44.         self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)
  45. 
  46.         self.service_name = service_name
  47. 
  48.         self.log("attempting to connect to %s" % self._nodeid_s)
  49.         self.announcement_time = time.time()
  50.         self.last_loss_time = None
  51.         self.rref = None
  52.         self.remote_host = None
  53.         self.last_connect_time = None
  54.         self.version = ver
  55.         self.oldest_supported = oldest
  56.         self.nickname = nickname
  57. 
  58.     def log(self, *args, **kwargs):
  59.         return self._ic.log(*args, **kwargs)
  60. 
  61.     def startConnecting(self):
  62.         self._reconnector = self._tub.connectTo(self._furl, self._got_service)
  63. 
  64.     def stopConnecting(self):
  65.         self._reconnector.stopConnecting()
  66. 
  67.     def _got_service(self, rref):
  68.         self.last_connect_time = time.time()
  69.         self.remote_host = rref.tracker.broker.transport.getPeer()
  70. 
  71.         self.rref = rref
  72.         self.log("connected to %s" % self._nodeid_s)
  73. 
  74.         self._ic.add_connection(self._nodeid, self.service_name, rref)
  75. 
  76.         rref.notifyOnDisconnect(self._lost, rref)
  77. 
  78.     def _lost(self, rref):
  79.         self.log("lost connection to %s" % self._nodeid_s)
  80.         self.last_loss_time = time.time()
  81.         self.rref = None
  82.         self.remote_host = None
  83.         self._ic.remove_connection(self._nodeid, self.service_name, rref)
  84. 
  85.     def reset(self):
  86.         self._reconnector.reset()
  87. 
  88. 
  89. class IntroducerClient_V1(service.Service, Referenceable):
  90.     implements(RIIntroducerSubscriberClient, IIntroducerClient)
  91. 
  92.     def __init__(self, tub, introducer_furl,
  93.                  nickname, my_version, oldest_supported):
  94.         self._tub = tub
  95.         self.introducer_furl = introducer_furl
  96. 
  97.         self._nickname = nickname
  98.         self._my_version = my_version
  99.         self._oldest_supported = oldest_supported
 100. 
 101.         self._published_announcements = set()
 102. 
 103.         self._publisher = None
 104.         self._connected = False
 105. 
 106.         self._subscribed_service_names = set()
 107.         self._subscriptions = set() # requests we've actually sent
 108.         self._received_announcements = set()
 109.         # TODO: this set will grow without bound, until the node is restarted
 110. 
 111.         # we only accept one announcement per (peerid+service_name) pair.
 112.         # This insures that an upgraded host replace their previous
 113.         # announcement. It also means that each peer must have their own Tub
 114.         # (no sharing), which is slightly weird but consistent with the rest
 115.         # of the Tahoe codebase.
 116.         self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector
 117.         # self._connections is a set of (peerid, service_name, rref) tuples
 118.         self._connections = set()
 119. 
 120.         self.counter = 0 # incremented each time we change state, for tests
 121.         self.encoding_parameters = None
 122. 
 123.     def startService(self):
 124.         service.Service.startService(self)
 125.         rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
 126.         self._introducer_reconnector = rc
 127.         def connect_failed(failure):
 128.             self.log("Initial Introducer connection failed: perhaps it's down",
 129.                      level=log.WEIRD, failure=failure)
 130.         d = self._tub.getReference(self.introducer_furl)
 131.         d.addErrback(connect_failed)
 132. 
 133.     def _got_introducer(self, publisher):
 134.         self.log("connected to introducer")
 135.         self._connected = True
 136.         self._publisher = publisher
 137.         publisher.notifyOnDisconnect(self._disconnected)
 138.         self._maybe_publish()
 139.         self._maybe_subscribe()
 140. 
 141.     def _disconnected(self):
 142.         self.log("bummer, we've lost our connection to the introducer")
 143.         self._connected = False
 144.         self._publisher = None
 145.         self._subscriptions.clear()
 146. 
 147.     def stopService(self):
 148.         service.Service.stopService(self)
 149.         self._introducer_reconnector.stopConnecting()
 150.         for rsc in self._connectors.itervalues():
 151.             rsc.stopConnecting()
 152. 
 153.     def log(self, *args, **kwargs):
 154.         if "facility" not in kwargs:
 155.             kwargs["facility"] = "tahoe.introducer"
 156.         return log.msg(*args, **kwargs)
 157. 
 158. 
 159.     def publish(self, furl, service_name, remoteinterface_name):
 160.         ann = (furl, service_name, remoteinterface_name,
 161.                self._nickname, self._my_version, self._oldest_supported)
 162.         self._published_announcements.add(ann)
 163.         self._maybe_publish()
 164. 
 165.     def subscribe_to(self, service_name):
 166.         self._subscribed_service_names.add(service_name)
 167.         self._maybe_subscribe()
 168. 
 169.     def _maybe_subscribe(self):
 170.         if not self._publisher:
 171.             self.log("want to subscribe, but no introducer yet",
 172.                      level=log.NOISY)
 173.             return
 174.         for service_name in self._subscribed_service_names:
 175.             if service_name not in self._subscriptions:
 176.                 # there is a race here, but the subscription desk ignores
 177.                 # duplicate requests.
 178.                 self._subscriptions.add(service_name)
 179.                 d = self._publisher.callRemote("subscribe", self, service_name)
 180.                 d.addErrback(log.err, facility="tahoe.introducer",
 181.                              level=log.WEIRD)
 182. 
 183.     def _maybe_publish(self):
 184.         if not self._publisher:
 185.             self.log("want to publish, but no introducer yet", level=log.NOISY)
 186.             return
 187.         # this re-publishes everything. The Introducer ignores duplicates
 188.         for ann in self._published_announcements:
 189.             d = self._publisher.callRemote("publish", ann)
 190.             d.addErrback(log.err, facility="tahoe.introducer",
 191.                          level=log.WEIRD)
 192. 
 193. 
 194. 
 195.     def remote_announce(self, announcements):
 196.         for ann in announcements:
 197.             self.log("received %d announcements" % len(announcements))
 198.             (furl, service_name, ri_name, nickname, ver, oldest) = ann
 199.             if service_name not in self._subscribed_service_names:
 200.                 self.log("announcement for a service we don't care about [%s]"
 201.                          % (service_name,), level=log.WEIRD)
 202.                 continue
 203.             if ann in self._received_announcements:
 204.                 self.log("ignoring old announcement: %s" % (ann,),
 205.                          level=log.NOISY)
 206.                 continue
 207.             self.log("new announcement[%s]: %s" % (service_name, ann))
 208.             self._received_announcements.add(ann)
 209.             self._new_announcement(ann)
 210. 
 211.     def _new_announcement(self, announcement):
 212.         # this will only be called for new announcements
 213.         index = make_index(announcement)
 214.         if index in self._connectors:
 215.             self.log("replacing earlier announcement", level=log.NOISY)
 216.             self._connectors[index].stopConnecting()
 217.         rsc = RemoteServiceConnector(announcement, self._tub, self)
 218.         self._connectors[index] = rsc
 219.         rsc.startConnecting()
 220. 
 221.     def add_connection(self, nodeid, service_name, rref):
 222.         self._connections.add( (nodeid, service_name, rref) )
 223.         self.counter += 1
 224.         # when one connection is established, reset the timers on all others,
 225.         # to trigger a reconnection attempt in one second. This is intended
 226.         # to accelerate server connections when we've been offline for a
 227.         # while. The goal is to avoid hanging out for a long time with
 228.         # connections to only a subset of the servers, which would increase
 229.         # the chances that we'll put shares in weird places (and not update
 230.         # existing shares of mutable files). See #374 for more details.
 231.         for rsc in self._connectors.values():
 232.             rsc.reset()
 233. 
 234.     def remove_connection(self, nodeid, service_name, rref):
 235.         self._connections.discard( (nodeid, service_name, rref) )
 236.         self.counter += 1
 237. 
 238. 
 239.     def get_all_connections(self):
 240.         return frozenset(self._connections)
 241. 
 242.     def get_all_connectors(self):
 243.         return self._connectors.copy()
 244. 
 245.     def get_all_peerids(self):
 246.         return frozenset([peerid
 247.                           for (peerid, service_name, rref)
 248.                           in self._connections])
 249. 
 250.     def get_nickname_for_peerid(self, peerid):
 251.         for k in self._connectors:
 252.             (peerid0, svcname0) = k
 253.             if peerid0 == peerid:
 254.                 rsc = self._connectors[k]
 255.                 return rsc.nickname
 256.         return None
 257. 
 258.     def get_all_connections_for(self, service_name):
 259.         return frozenset([c
 260.                           for c in self._connections
 261.                           if c[1] == service_name])
 262. 
 263.     def get_permuted_peers(self, service_name, key):
 264.         """Return an ordered list of (peerid, rref) tuples."""
 265. 
 266.         results = []
 267.         for (c_peerid, c_service_name, rref) in self._connections:
 268.             assert isinstance(c_peerid, str)
 269.             if c_service_name != service_name:
 270.                 continue
 271.             permuted = sha.new(key + c_peerid).digest()
 272.             results.append((permuted, c_peerid, rref))
 273. 
 274.         results.sort(lambda a,b: cmp(a[0], b[0]))
 275.         return [ (r[1], r[2]) for r in results ]
 276. 
 277. 
 278. 
 279.     def remote_set_encoding_parameters(self, parameters):
 280.         self.encoding_parameters = parameters
 281. 
 282.     def connected_to_introducer(self):
 283.         return self._connected
 284. 
 285.     def debug_disconnect_from_peerid(self, victim_nodeid):
 286.         # for unit tests: locate and sever all connections to the given
 287.         # peerid.
 288.         for (nodeid, service_name, rref) in self._connections:
 289.             if nodeid == victim_nodeid:
 290.                 rref.tracker.broker.transport.loseConnection()
 291. 
 292. 
 293. class IntroducerService_V1(service.MultiService, Referenceable):
 294.     implements(RIIntroducerPublisherAndSubscriberService)
 295.     name = "introducer"
 296. 
 297.     def __init__(self, basedir="."):
 298.         service.MultiService.__init__(self)
 299.         self.introducer_url = None
 300.         # 'index' is (tubid, service_name)
 301.         self._announcements = {} # dict of index -> (announcement, timestamp)
 302.         self._subscribers = {} # dict of (rref->timestamp) dicts
 303. 
 304.     def log(self, *args, **kwargs):
 305.         if "facility" not in kwargs:
 306.             kwargs["facility"] = "tahoe.introducer"
 307.         return log.msg(*args, **kwargs)
 308. 
 309.     def get_announcements(self):
 310.         return self._announcements
 311.     def get_subscribers(self):
 312.         return self._subscribers
 313. 
 314.     def remote_publish(self, announcement):
 315.         self.log("introducer: announcement published: %s" % (announcement,) )
 316.         index = make_index(announcement)
 317.         if index in self._announcements:
 318.             (old_announcement, timestamp) = self._announcements[index]
 319.             if old_announcement == announcement:
 320.                 self.log("but we already knew it, ignoring", level=log.NOISY)
 321.                 return
 322.             else:
 323.                 self.log("old announcement being updated", level=log.NOISY)
 324.         self._announcements[index] = (announcement, time.time())
 325.         (furl, service_name, ri_name, nickname, ver, oldest) = announcement
 326.         for s in self._subscribers.get(service_name, []):
 327.             s.callRemote("announce", set([announcement]))
 328. 
 329.     def remote_subscribe(self, subscriber, service_name):
 330.         self.log("introducer: subscription[%s] request at %s" % (service_name,
 331.                                                                  subscriber))
 332.         if service_name not in self._subscribers:
 333.             self._subscribers[service_name] = {}
 334.         subscribers = self._subscribers[service_name]
 335.         if subscriber in subscribers:
 336.             self.log("but they're already subscribed, ignoring",
 337.                      level=log.UNUSUAL)
 338.             return
 339.         subscribers[subscriber] = time.time()
 340.         def _remove():
 341.             self.log("introducer: unsubscribing[%s] %s" % (service_name,
 342.                                                            subscriber))
 343.             subscribers.pop(subscriber, None)
 344.         subscriber.notifyOnDisconnect(_remove)
 345. 
 346.         announcements = set( [ ann
 347.                                for idx,(ann,when) in self._announcements.items()
 348.                                if idx[1] == service_name] )
 349.         d = subscriber.callRemote("announce", announcements)
 350.         d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)