source file: /home/buildslave/tahoe/edgy/build/src/allmydata/introducer/client.py
file stats: 179 lines, 168 executed: 93.9% covered
   1. 
   2. import re, time, sha
   3. from base64 import b32decode
   4. from zope.interface import implements
   5. from twisted.application import service
   6. from foolscap import Referenceable
   7. from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
   8.      IIntroducerClient
   9. from allmydata.util import log, idlib
  10. from allmydata.introducer.common import make_index
  11. 
  12. 
  13. class RemoteServiceConnector:
  14.     """I hold information about a peer service that we want to connect to. If
  15.     we are connected, I hold the RemoteReference, the peer's address, and the
  16.     peer's version information. I remember information about when we were
  17.     last connected to the peer too, even if we aren't currently connected.
  18. 
  19.     @ivar announcement_time: when we first heard about this service
  20.     @ivar last_connect_time: when we last established a connection
  21.     @ivar last_loss_time: when we last lost a connection
  22. 
  23.     @ivar version: the peer's version, from the most recent announcement
  24.     @ivar oldest_supported: the peer's oldest supported version, same
  25.     @ivar nickname: the peer's self-reported nickname, same
  26. 
  27.     @ivar rref: the RemoteReference, if connected, otherwise None
  28.     @ivar remote_host: the IAddress, if connected, otherwise None
  29.     """
  30. 
  31.     def __init__(self, announcement, tub, ic):
  32.         self._tub = tub
  33.         self._announcement = announcement
  34.         self._ic = ic
  35.         (furl, service_name, ri_name, nickname, ver, oldest) = announcement
  36. 
  37.         self._furl = furl
  38.         m = re.match(r'pb://(\w+)@', furl)
  39.         assert m
  40.         self._nodeid = b32decode(m.group(1).upper())
  41.         self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)
  42. 
  43.         self.service_name = service_name
  44. 
  45.         self.log("attempting to connect to %s" % self._nodeid_s)
  46.         self.announcement_time = time.time()
  47.         self.last_loss_time = None
  48.         self.rref = None
  49.         self.remote_host = None
  50.         self.last_connect_time = None
  51.         self.version = ver
  52.         self.oldest_supported = oldest
  53.         self.nickname = nickname
  54. 
  55.     def log(self, *args, **kwargs):
  56.         return self._ic.log(*args, **kwargs)
  57. 
  58.     def startConnecting(self):
  59.         self._reconnector = self._tub.connectTo(self._furl, self._got_service)
  60. 
  61.     def stopConnecting(self):
  62.         self._reconnector.stopConnecting()
  63. 
  64.     def _got_service(self, rref):
  65.         self.last_connect_time = time.time()
  66.         self.remote_host = rref.tracker.broker.transport.getPeer()
  67. 
  68.         self.rref = rref
  69.         self.log("connected to %s" % self._nodeid_s)
  70. 
  71.         self._ic.add_connection(self._nodeid, self.service_name, rref)
  72. 
  73.         rref.notifyOnDisconnect(self._lost, rref)
  74. 
  75.     def _lost(self, rref):
  76.         self.log("lost connection to %s" % self._nodeid_s)
  77.         self.last_loss_time = time.time()
  78.         self.rref = None
  79.         self.remote_host = None
  80.         self._ic.remove_connection(self._nodeid, self.service_name, rref)
  81. 
  82.     def reset(self):
  83.         self._reconnector.reset()
  84. 
  85. 
  86. class IntroducerClient(service.Service, Referenceable):
  87.     implements(RIIntroducerSubscriberClient, IIntroducerClient)
  88. 
  89.     def __init__(self, tub, introducer_furl,
  90.                  nickname, my_version, oldest_supported):
  91.         self._tub = tub
  92.         self.introducer_furl = introducer_furl
  93. 
  94.         self._nickname = nickname
  95.         self._my_version = my_version
  96.         self._oldest_supported = oldest_supported
  97. 
  98.         self._published_announcements = set()
  99. 
 100.         self._publisher = None
 101.         self._connected = False
 102. 
 103.         self._subscribed_service_names = set()
 104.         self._subscriptions = set() # requests we've actually sent
 105.         self._received_announcements = set()
 106.         # TODO: this set will grow without bound, until the node is restarted
 107. 
 108.         # we only accept one announcement per (peerid+service_name) pair.
 109.         # This insures that an upgraded host replace their previous
 110.         # announcement. It also means that each peer must have their own Tub
 111.         # (no sharing), which is slightly weird but consistent with the rest
 112.         # of the Tahoe codebase.
 113.         self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector
 114.         # self._connections is a set of (peerid, service_name, rref) tuples
 115.         self._connections = set()
 116. 
 117.         self.counter = 0 # incremented each time we change state, for tests
 118.         self.encoding_parameters = None
 119. 
 120.     def startService(self):
 121.         service.Service.startService(self)
 122.         rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
 123.         self._introducer_reconnector = rc
 124.         def connect_failed(failure):
 125.             self.log("Initial Introducer connection failed: perhaps it's down",
 126.                      level=log.WEIRD, failure=failure, umid="c5MqUQ")
 127.         d = self._tub.getReference(self.introducer_furl)
 128.         d.addErrback(connect_failed)
 129. 
 130.     def _got_introducer(self, publisher):
 131.         self.log("connected to introducer")
 132.         self._connected = True
 133.         self._publisher = publisher
 134.         publisher.notifyOnDisconnect(self._disconnected)
 135.         self._maybe_publish()
 136.         self._maybe_subscribe()
 137. 
 138.     def _disconnected(self):
 139.         self.log("bummer, we've lost our connection to the introducer")
 140.         self._connected = False
 141.         self._publisher = None
 142.         self._subscriptions.clear()
 143. 
 144.     def stopService(self):
 145.         service.Service.stopService(self)
 146.         self._introducer_reconnector.stopConnecting()
 147.         for rsc in self._connectors.itervalues():
 148.             rsc.stopConnecting()
 149. 
 150.     def log(self, *args, **kwargs):
 151.         if "facility" not in kwargs:
 152.             kwargs["facility"] = "tahoe.introducer"
 153.         return log.msg(*args, **kwargs)
 154. 
 155. 
 156.     def publish(self, furl, service_name, remoteinterface_name):
 157.         ann = (furl, service_name, remoteinterface_name,
 158.                self._nickname, self._my_version, self._oldest_supported)
 159.         self._published_announcements.add(ann)
 160.         self._maybe_publish()
 161. 
 162.     def subscribe_to(self, service_name):
 163.         self._subscribed_service_names.add(service_name)
 164.         self._maybe_subscribe()
 165. 
 166.     def _maybe_subscribe(self):
 167.         if not self._publisher:
 168.             self.log("want to subscribe, but no introducer yet",
 169.                      level=log.NOISY)
 170.             return
 171.         for service_name in self._subscribed_service_names:
 172.             if service_name not in self._subscriptions:
 173.                 # there is a race here, but the subscription desk ignores
 174.                 # duplicate requests.
 175.                 self._subscriptions.add(service_name)
 176.                 d = self._publisher.callRemote("subscribe", self, service_name)
 177.                 d.addErrback(log.err, facility="tahoe.introducer",
 178.                              level=log.WEIRD, umid="2uMScQ")
 179. 
 180.     def _maybe_publish(self):
 181.         if not self._publisher:
 182.             self.log("want to publish, but no introducer yet", level=log.NOISY)
 183.             return
 184.         # this re-publishes everything. The Introducer ignores duplicates
 185.         for ann in self._published_announcements:
 186.             d = self._publisher.callRemote("publish", ann)
 187.             d.addErrback(log.err, facility="tahoe.introducer",
 188.                          level=log.WEIRD, umid="xs9pVQ")
 189. 
 190. 
 191. 
 192.     def remote_announce(self, announcements):
 193.         for ann in announcements:
 194.             self.log("received %d announcements" % len(announcements))
 195.             (furl, service_name, ri_name, nickname, ver, oldest) = ann
 196.             if service_name not in self._subscribed_service_names:
 197.                 self.log("announcement for a service we don't care about [%s]"
 198.                          % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
 199.                 continue
 200.             if ann in self._received_announcements:
 201.                 self.log("ignoring old announcement: %s" % (ann,),
 202.                          level=log.NOISY)
 203.                 continue
 204.             self.log("new announcement[%s]: %s" % (service_name, ann))
 205.             self._received_announcements.add(ann)
 206.             self._new_announcement(ann)
 207. 
 208.     def _new_announcement(self, announcement):
 209.         # this will only be called for new announcements
 210.         index = make_index(announcement)
 211.         if index in self._connectors:
 212.             self.log("replacing earlier announcement", level=log.NOISY)
 213.             self._connectors[index].stopConnecting()
 214.         rsc = RemoteServiceConnector(announcement, self._tub, self)
 215.         self._connectors[index] = rsc
 216.         rsc.startConnecting()
 217. 
 218.     def add_connection(self, nodeid, service_name, rref):
 219.         self._connections.add( (nodeid, service_name, rref) )
 220.         self.counter += 1
 221.         # when one connection is established, reset the timers on all others,
 222.         # to trigger a reconnection attempt in one second. This is intended
 223.         # to accelerate server connections when we've been offline for a
 224.         # while. The goal is to avoid hanging out for a long time with
 225.         # connections to only a subset of the servers, which would increase
 226.         # the chances that we'll put shares in weird places (and not update
 227.         # existing shares of mutable files). See #374 for more details.
 228.         for rsc in self._connectors.values():
 229.             rsc.reset()
 230. 
 231.     def remove_connection(self, nodeid, service_name, rref):
 232.         self._connections.discard( (nodeid, service_name, rref) )
 233.         self.counter += 1
 234. 
 235. 
 236.     def get_all_connections(self):
 237.         return frozenset(self._connections)
 238. 
 239.     def get_all_connectors(self):
 240.         return self._connectors.copy()
 241. 
 242.     def get_all_peerids(self):
 243.         return frozenset([peerid
 244.                           for (peerid, service_name, rref)
 245.                           in self._connections])
 246. 
 247.     def get_all_connections_for(self, service_name):
 248.         return frozenset([c
 249.                           for c in self._connections
 250.                           if c[1] == service_name])
 251. 
 252.     def get_permuted_peers(self, service_name, key):
 253.         """Return an ordered list of (peerid, rref) tuples."""
 254. 
 255.         results = []
 256.         for (c_peerid, c_service_name, rref) in self._connections:
 257.             assert isinstance(c_peerid, str)
 258.             if c_service_name != service_name:
 259.                 continue
 260.             permuted = sha.new(key + c_peerid).digest()
 261.             results.append((permuted, c_peerid, rref))
 262. 
 263.         results.sort(lambda a,b: cmp(a[0], b[0]))
 264.         return [ (r[1], r[2]) for r in results ]
 265. 
 266. 
 267. 
 268.     def remote_set_encoding_parameters(self, parameters):
 269.         self.encoding_parameters = parameters
 270. 
 271.     def connected_to_introducer(self):
 272.         return self._connected
 273. 
 274.     def debug_disconnect_from_peerid(self, victim_nodeid):
 275.         # for unit tests: locate and sever all connections to the given
 276.         # peerid.
 277.         for (nodeid, service_name, rref) in self._connections:
 278.             if nodeid == victim_nodeid:
 279.                 rref.tracker.broker.transport.loseConnection()