Ticket #778: behavior3.txt

File behavior3.txt, 97.7 KB (added by kevan at 2010-05-14T01:48:09Z)
1Wed Sep 23 21:19:32 PDT 2009  Kevan Carstensen <kevan@isnotajoke.com>
2  * Alter CiphertextDownloader to work with servers_of_happiness
3
4Tue Nov  3 19:32:41 PST 2009  Kevan Carstensen <kevan@isnotajoke.com>
5  * Alter the signature of set_shareholders in IEncoder to add a 'servermap' parameter, which gives IEncoders enough information to perform a sane check for servers_of_happiness.
6
7Wed Nov  4 03:12:22 PST 2009  Kevan Carstensen <kevan@isnotajoke.com>
8  * Alter 'immutable/encode.py' and 'immutable/upload.py' to use servers_of_happiness instead of shares_of_happiness.
9
10Mon Nov 16 11:28:05 PST 2009  Kevan Carstensen <kevan@isnotajoke.com>
11  * Alter Tahoe2PeerSelector to make sure that it recognizes existing shares on readonly servers, fixing an issue in #778
12
13Mon Nov 16 13:24:59 PST 2009  Kevan Carstensen <kevan@isnotajoke.com>
14  * Change stray "shares_of_happiness" to "servers_of_happiness"
15
16Tue Nov 17 17:45:42 PST 2009  Kevan Carstensen <kevan@isnotajoke.com>
17  * Eliminate overcounting of servers_of_happiness in Tahoe2PeerSelector; also reorganize some things.
18
19Sun Nov 22 16:24:05 PST 2009  Kevan Carstensen <kevan@isnotajoke.com>
20  * Alter the error message returned when peer selection fails
21 
22  The Tahoe2PeerSelector returned either NoSharesError or NotEnoughSharesError
23  for a variety of error conditions that weren't informatively described by them.
24  This patch creates a new error, UploadHappinessError, replaces uses of
25  NoSharesError and NotEnoughSharesError with it, and alters the error message
26  raised with the errors to be more in line with the new servers_of_happiness
27  behavior. See ticket #834 for more information.
28
29Fri Dec  4 20:30:37 PST 2009  Kevan Carstensen <kevan@isnotajoke.com>
30  * Change "UploadHappinessError" to "UploadUnhappinessError"
31
32Wed Dec 30 13:03:44 PST 2009  Kevan Carstensen <kevan@isnotajoke.com>
33  * Alter the error message when an upload fails, per some comments in #778.
34 
35  When I first implemented #778, I just altered the error messages to refer to
36  servers where they referred to shares. The resulting error messages weren't
37  very good. These are a bit better.
38
39Thu May 13 17:49:17 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
40  * Fix up the behavior of #778, per reviewers' comments
41 
42    - Make some important utility functions clearer and more thoroughly
43      documented.
44    - Assert in upload.servers_of_happiness that the buckets attributes
45      of PeerTrackers passed to it are mutually disjoint.
46    - Get rid of some silly non-Pythonisms that I didn't see when I first
47      wrote these patches.
48    - Make sure that should_add_server returns true when queried about a
49      shnum that it doesn't know about yet.
50    - Change Tahoe2PeerSelector.preexisting_shares to map a shareid to a set
51      of peerids, alter dependencies to deal with that.
52    - Remove upload.should_add_servers, because it is no longer necessary
53    - Move upload.shares_of_happiness and upload.shares_by_server to a utility
54      file.
55    - Change some points in Tahoe2PeerSelector.
56    - Compute servers_of_happiness using a bipartite matching algorithm that we
57      know is optimal instead of an ad-hoc greedy algorithm that isn't (see the sketch after this list).
58    - Change servers_of_happiness to just take a sharemap as an argument,
59      change its callers to merge existing_shares and used_peers before
60      calling it.
61    - Change an error message in the encoder to be more appropriate for
62      servers of happiness.
63    - Clarify the wording of an error message in immutable/upload.py
64    - Refactor a happiness failure message to happinessutil.py, and make
65      immutable/upload.py and immutable/encode.py use it.
66    - Move the word "only" as far to the right as possible in failure
67      messages.
68    - Use a better definition of progress during peer selection.
69    - Do read-only peer share detection queries in parallel, not sequentially.
70    - Clean up logging semantics; print the query statistics whenever an
71      upload is unsuccessful, not just in one case.
72 
73
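The final patch in this series replaces the ad-hoc greedy happiness count with a bipartite matching computation. One natural formulation, sketched below for illustration only (this is not the happinessutil implementation), treats servers_of_happiness as the size of a maximum matching between shares and the servers that hold them, given a sharemap of shnum -> set(peerid):

def servers_of_happiness_sketch(sharemap):
    # sharemap: shnum -> set(peerid). Happiness is the size of a maximum
    # matching between shares and servers, found with Kuhn's
    # augmenting-path method.
    matched_share = {}  # peerid -> shnum currently matched to that server

    def augment(shnum, seen):
        # Try to assign shnum to some server, re-homing shares that have
        # already claimed a server we want (an augmenting path).
        for peerid in sharemap[shnum]:
            if peerid in seen:
                continue
            seen.add(peerid)
            other = matched_share.get(peerid)
            if other is None or augment(other, seen):
                matched_share[peerid] = shnum
                return True
        return False

    happiness = 0
    for shnum in sharemap:
        if augment(shnum, set()):
            happiness += 1
    return happiness

# e.g. servers_of_happiness_sketch({0: set(["A"]), 1: set(["A"]), 2: set(["B"])}) == 2
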
74New patches:
75
76[Alter CiphertextDownloader to work with servers_of_happiness
77Kevan Carstensen <kevan@isnotajoke.com>**20090924041932
78 Ignore-this: e81edccf0308c2d3bedbc4cf217da197
79] hunk ./src/allmydata/immutable/download.py 1039
80             # Repairer (uploader) needs the encodingparams.
81             self._target.set_encodingparams((
82                 self._verifycap.needed_shares,
83-                self._verifycap.total_shares, # I don't think the target actually cares about "happy".
84+                0, # see ticket #778 for why this is
85                 self._verifycap.total_shares,
86                 self._vup.segment_size
87                 ))
88[Alter the signature of set_shareholders in IEncoder to add a 'servermap' parameter, which gives IEncoders enough information to perform a sane check for servers_of_happiness.
89Kevan Carstensen <kevan@isnotajoke.com>**20091104033241
90 Ignore-this: b3a6649a8ac66431beca1026a31fed94
91] {
92hunk ./src/allmydata/interfaces.py 1341
93         Once this is called, set_size() and set_params() may not be called.
94         """
95 
96-    def set_shareholders(shareholders):
97+    def set_shareholders(shareholders, servermap):
98         """Tell the encoder where to put the encoded shares. 'shareholders'
99         must be a dictionary that maps share number (an integer ranging from
100hunk ./src/allmydata/interfaces.py 1344
101-        0 to n-1) to an instance that provides IStorageBucketWriter. This
102-        must be performed before start() can be called."""
103+        0 to n-1) to an instance that provides IStorageBucketWriter.
104+        'servermap' is a dictionary that maps share number (as defined above)
105+        to a peerid. This must be performed before start() can be called."""
106 
107     def start():
108         """Begin the encode/upload process. This involves reading encrypted
109}
110[Alter 'immutable/encode.py' and 'immutable/upload.py' to use servers_of_happiness instead of shares_of_happiness.
111Kevan Carstensen <kevan@isnotajoke.com>**20091104111222
112 Ignore-this: abb3283314820a8bbf9b5d0cbfbb57c8
113] {
114hunk ./src/allmydata/immutable/encode.py 121
115         assert not self._codec
116         k, happy, n, segsize = params
117         self.required_shares = k
118-        self.shares_of_happiness = happy
119+        self.servers_of_happiness = happy
120         self.num_shares = n
121         self.segment_size = segsize
122         self.log("got encoding parameters: %d/%d/%d %d" % (k,happy,n, segsize))
123hunk ./src/allmydata/immutable/encode.py 179
124         if name == "storage_index":
125             return self._storage_index
126         elif name == "share_counts":
127-            return (self.required_shares, self.shares_of_happiness,
128+            return (self.required_shares, self.servers_of_happiness,
129                     self.num_shares)
130         elif name == "num_segments":
131             return self.num_segments
132hunk ./src/allmydata/immutable/encode.py 194
133         else:
134             raise KeyError("unknown parameter name '%s'" % name)
135 
136-    def set_shareholders(self, landlords):
137+    def set_shareholders(self, landlords, servermap):
138         assert isinstance(landlords, dict)
139         for k in landlords:
140             assert IStorageBucketWriter.providedBy(landlords[k])
141hunk ./src/allmydata/immutable/encode.py 199
142         self.landlords = landlords.copy()
143+        assert isinstance(servermap, dict)
144+        self.servermap = servermap.copy()
145 
146     def start(self):
147         """ Returns a Deferred that will fire with the verify cap (an instance of
148hunk ./src/allmydata/immutable/encode.py 491
149             # even more UNUSUAL
150             self.log("they weren't in our list of landlords", parent=ln,
151                      level=log.WEIRD, umid="TQGFRw")
152-        if len(self.landlords) < self.shares_of_happiness:
153-            msg = "lost too many shareholders during upload (still have %d, want %d): %s" % \
154-                  (len(self.landlords), self.shares_of_happiness, why)
155-            if self.landlords:
156+        del(self.servermap[shareid])
157+        servers_left = list(set(self.servermap.values()))
158+        if len(servers_left) < self.servers_of_happiness:
159+            msg = "lost too many servers during upload (still have %d, want %d): %s" % \
160+                  (len(servers_left),
161+                   self.servers_of_happiness, why)
162+            if servers_left:
163                 raise NotEnoughSharesError(msg)
164             else:
165                 raise NoSharesError(msg)
166hunk ./src/allmydata/immutable/encode.py 502
167         self.log("but we can still continue with %s shares, we'll be happy "
168-                 "with at least %s" % (len(self.landlords),
169-                                       self.shares_of_happiness),
170+                 "with at least %s" % (len(servers_left),
171+                                       self.servers_of_happiness),
172                  parent=ln)
173 
174     def _gather_responses(self, dl):
175hunk ./src/allmydata/immutable/upload.py 131
176         self.buckets.update(b)
177         return (alreadygot, set(b.keys()))
178 
179+def servers_with_shares(existing_shares, used_peers=None):
180+    servers = []
181+    if used_peers:
182+        peers = list(used_peers.copy())
183+        # We do this because the preexisting shares list goes by peerid.
184+        peers = [x.peerid for x in peers]
185+        servers.extend(peers)
186+    servers.extend(existing_shares.values())
187+    return list(set(servers))
188+
189+def shares_by_server(existing_shares):
190+    servers = {}
191+    for server in set(existing_shares.values()):
192+        servers[server] = set([x for x in existing_shares.keys()
193+                               if existing_shares[x] == server])
194+    return servers
195+
196 class Tahoe2PeerSelector:
197 
198     def __init__(self, upload_id, logparent=None, upload_status=None):
199hunk ./src/allmydata/immutable/upload.py 164
200 
201     def get_shareholders(self, storage_broker, secret_holder,
202                          storage_index, share_size, block_size,
203-                         num_segments, total_shares, shares_of_happiness):
204+                         num_segments, total_shares, servers_of_happiness):
205         """
206         @return: (used_peers, already_peers), where used_peers is a set of
207                  PeerTracker instances that have agreed to hold some shares
208hunk ./src/allmydata/immutable/upload.py 177
209             self._status.set_status("Contacting Peers..")
210 
211         self.total_shares = total_shares
212-        self.shares_of_happiness = shares_of_happiness
213+        self.servers_of_happiness = servers_of_happiness
214 
215         self.homeless_shares = range(total_shares)
216         # self.uncontacted_peers = list() # peers we haven't asked yet
217hunk ./src/allmydata/immutable/upload.py 242
218         d = defer.maybeDeferred(self._loop)
219         return d
220 
221+
222     def _loop(self):
223         if not self.homeless_shares:
224hunk ./src/allmydata/immutable/upload.py 245
225-            # all done
226-            msg = ("placed all %d shares, "
227-                   "sent %d queries to %d peers, "
228-                   "%d queries placed some shares, %d placed none, "
229-                   "got %d errors" %
230-                   (self.total_shares,
231-                    self.query_count, self.num_peers_contacted,
232-                    self.good_query_count, self.bad_query_count,
233-                    self.error_count))
234-            log.msg("peer selection successful for %s: %s" % (self, msg),
235+            effective_happiness = servers_with_shares(
236+                                                   self.preexisting_shares,
237+                                                   self.use_peers)
238+            if self.servers_of_happiness <= len(effective_happiness):
239+                msg = ("placed all %d shares, "
240+                       "sent %d queries to %d peers, "
241+                       "%d queries placed some shares, %d placed none, "
242+                       "got %d errors" %
243+                       (self.total_shares,
244+                        self.query_count, self.num_peers_contacted,
245+                        self.good_query_count, self.bad_query_count,
246+                        self.error_count))
247+                log.msg("peer selection successful for %s: %s" % (self, msg),
248                     parent=self._log_parent)
249hunk ./src/allmydata/immutable/upload.py 259
250-            return (self.use_peers, self.preexisting_shares)
251+                return (self.use_peers, self.preexisting_shares)
252+            else:
253+                delta = self.servers_of_happiness - len(effective_happiness)
254+                shares = shares_by_server(self.preexisting_shares)
255+                # Each server in shares maps to a set of shares stored on it.
256+                # Since we want to keep at least one share on each server
257+                # that has one (otherwise we'd only be making
258+                # the situation worse by removing distinct servers),
259+                # each server has len(its shares) - 1 to spread around.
260+                shares_to_spread = sum([len(list(sharelist)) - 1
261+                                        for (server, sharelist)
262+                                        in shares.items()])
263+                if delta <= len(self.uncontacted_peers) and \
264+                   shares_to_spread >= delta:
265+                    # Loop through the allocated shares, removing
266+                    items = shares.items()
267+                    while len(self.homeless_shares) < delta:
268+                        servernum, sharelist = items.pop()
269+                        if len(sharelist) > 1:
270+                            share = sharelist.pop()
271+                            self.homeless_shares.append(share)
272+                            del(self.preexisting_shares[share])
273+                            items.append((servernum, sharelist))
274+                    return self._loop()
275+                else:
276+                    raise NotEnoughSharesError("shares could only be placed on %d "
277+                                            "servers (%d were requested)" %
278+                                            (len(effective_happiness),
279+                                             self.servers_of_happiness))
280 
281         if self.uncontacted_peers:
282             peer = self.uncontacted_peers.pop(0)
283hunk ./src/allmydata/immutable/upload.py 336
284         else:
285             # no more peers. If we haven't placed enough shares, we fail.
286             placed_shares = self.total_shares - len(self.homeless_shares)
287-            if placed_shares < self.shares_of_happiness:
288+            effective_happiness = servers_with_shares(
289+                                                   self.preexisting_shares,
290+                                                   self.use_peers)
291+            if len(effective_happiness) < self.servers_of_happiness:
292                 msg = ("placed %d shares out of %d total (%d homeless), "
293hunk ./src/allmydata/immutable/upload.py 341
294-                       "want to place %d, "
295+                       "want to place on %d servers, "
296                        "sent %d queries to %d peers, "
297                        "%d queries placed some shares, %d placed none, "
298                        "got %d errors" %
299hunk ./src/allmydata/immutable/upload.py 347
300                        (self.total_shares - len(self.homeless_shares),
301                         self.total_shares, len(self.homeless_shares),
302-                        self.shares_of_happiness,
303+                        self.servers_of_happiness,
304                         self.query_count, self.num_peers_contacted,
305                         self.good_query_count, self.bad_query_count,
306                         self.error_count))
307hunk ./src/allmydata/immutable/upload.py 394
308                     level=log.NOISY, parent=self._log_parent)
309             progress = False
310             for s in alreadygot:
311+                if self.preexisting_shares.has_key(s):
312+                    old_size = len(servers_with_shares(self.preexisting_shares))
313+                    new_candidate = self.preexisting_shares.copy()
314+                    new_candidate[s] = peer.peerid
315+                    new_size = len(servers_with_shares(new_candidate))
316+                    if old_size >= new_size: continue
317                 self.preexisting_shares[s] = peer.peerid
318                 if s in self.homeless_shares:
319                     self.homeless_shares.remove(s)
320hunk ./src/allmydata/immutable/upload.py 825
321         for peer in used_peers:
322             assert isinstance(peer, PeerTracker)
323         buckets = {}
324+        servermap = already_peers.copy()
325         for peer in used_peers:
326             buckets.update(peer.buckets)
327             for shnum in peer.buckets:
328hunk ./src/allmydata/immutable/upload.py 830
329                 self._peer_trackers[shnum] = peer
330+                servermap[shnum] = peer.peerid
331         assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
332hunk ./src/allmydata/immutable/upload.py 832
333-        encoder.set_shareholders(buckets)
334+        encoder.set_shareholders(buckets, servermap)
335 
336     def _encrypted_done(self, verifycap):
337         """ Returns a Deferred that will fire with the UploadResults instance. """
338replace ./src/allmydata/immutable/upload.py [A-Za-z_0-9] _servers_with_shares _servers_with_unique_shares
339replace ./src/allmydata/immutable/upload.py [A-Za-z_0-9] servers_with_shares servers_with_unique_shares
340}
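As a quick illustration (not part of the patch) of what the two helpers added above return for a toy share map, using the post-rename names from the replace directives:

from allmydata.immutable.upload import servers_with_unique_shares, shares_by_server

existing_shares = {0: "peerA", 1: "peerA", 2: "peerB"}   # shnum -> peerid
shares_by_server(existing_shares)
# => {"peerA": set([0, 1]), "peerB": set([2])}
servers_with_unique_shares(existing_shares)
# => ["peerA", "peerB"]   (order is not significant)
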
341[Alter Tahoe2PeerSelector to make sure that it recognizes existing shares on readonly servers, fixing an issue in #778
342Kevan Carstensen <kevan@isnotajoke.com>**20091116192805
343 Ignore-this: 15289f4d709e03851ed0587b286fd955
344] {
345hunk ./src/allmydata/immutable/upload.py 117
346         d.addCallback(self._got_reply)
347         return d
348 
349+    def query_allocated(self):
350+        d = self._storageserver.callRemote("get_buckets",
351+                                           self.storage_index)
352+        d.addCallback(self._got_allocate_reply)
353+        return d
354+
355+    def _got_allocate_reply(self, buckets):
356+        return (self.peerid, buckets)
357+
358     def _got_reply(self, (alreadygot, buckets)):
359         #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
360         b = {}
361hunk ./src/allmydata/immutable/upload.py 195
362         self._started_second_pass = False
363         self.use_peers = set() # PeerTrackers that have shares assigned to them
364         self.preexisting_shares = {} # sharenum -> peerid holding the share
365+        # We don't try to allocate shares to these servers, since they've
366+        # said that they're incapable of storing shares of the size that
367+        # we'd want to store. We keep them around because they may have
368+        # existing shares for this storage index, which we want to know
369+        # about for accurate servers_of_happiness accounting
370+        self.readonly_peers = []
371 
372         peers = storage_broker.get_servers_for_index(storage_index)
373         if not peers:
374hunk ./src/allmydata/immutable/upload.py 227
375             (peerid, conn) = peer
376             v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
377             return v1["maximum-immutable-share-size"]
378-        peers = [peer for peer in peers
379-                 if _get_maxsize(peer) >= allocated_size]
380-        if not peers:
381-            raise NoServersError("no peers could accept an allocated_size of %d" % allocated_size)
382+        new_peers = [peer for peer in peers
383+                     if _get_maxsize(peer) >= allocated_size]
384+        old_peers = list(set(peers).difference(set(new_peers)))
385+        peers = new_peers
386 
387         # decide upon the renewal/cancel secrets, to include them in the
388         # allocate_buckets query.
389hunk ./src/allmydata/immutable/upload.py 241
390                                                        storage_index)
391         file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
392                                                      storage_index)
393-
394-        trackers = [ PeerTracker(peerid, conn,
395-                                 share_size, block_size,
396-                                 num_segments, num_share_hashes,
397-                                 storage_index,
398-                                 bucket_renewal_secret_hash(file_renewal_secret,
399-                                                            peerid),
400-                                 bucket_cancel_secret_hash(file_cancel_secret,
401+        def _make_trackers(peers):
402+           return [ PeerTracker(peerid, conn,
403+                                share_size, block_size,
404+                                num_segments, num_share_hashes,
405+                                storage_index,
406+                                bucket_renewal_secret_hash(file_renewal_secret,
407                                                            peerid),
408hunk ./src/allmydata/immutable/upload.py 248
409-                                 )
410-                     for (peerid, conn) in peers ]
411-        self.uncontacted_peers = trackers
412-
413-        d = defer.maybeDeferred(self._loop)
414+                                bucket_cancel_secret_hash(file_cancel_secret,
415+                                                          peerid))
416+                    for (peerid, conn) in peers]
417+        self.uncontacted_peers = _make_trackers(peers)
418+        self.readonly_peers = _make_trackers(old_peers)
419+        # Talk to the readonly servers to get an idea of what servers
420+        # have what shares (if any) for this storage index
421+        d = defer.maybeDeferred(self._existing_shares)
422+        d.addCallback(lambda ign: self._loop())
423         return d
424 
425hunk ./src/allmydata/immutable/upload.py 259
426+    def _existing_shares(self):
427+        if self.readonly_peers:
428+            peer = self.readonly_peers.pop()
429+            assert isinstance(peer, PeerTracker)
430+            d = peer.query_allocated()
431+            d.addCallback(self._handle_allocate_response)
432+            return d
433+
434+    def _handle_allocate_response(self, (peer, buckets)):
435+        for bucket in buckets:
436+            self.preexisting_shares[bucket] = peer
437+            if self.homeless_shares:
438+                self.homeless_shares.remove(bucket)
439+        return self._existing_shares()
440 
441     def _loop(self):
442         if not self.homeless_shares:
443}
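A later patch in this series does these read-only share-detection queries in parallel instead of chaining one get_buckets call per _existing_shares invocation. A minimal sketch of that parallel pattern, for illustration only (the per-peer callback name here is hypothetical, not taken from the patches):

from twisted.internet import defer

def _existing_shares(self):
    # Query every read-only peer at once rather than one at a time.
    ds = []
    for peer in self.readonly_peers:
        d = peer.query_allocated()
        d.addBoth(self._record_existing_shares, peer.peerid)
        ds.append(d)
    self.readonly_peers = []
    return defer.DeferredList(ds)
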
444[Change stray "shares_of_happiness" to "servers_of_happiness"
445Kevan Carstensen <kevan@isnotajoke.com>**20091116212459
446 Ignore-this: 1c971ba8c3c4d2e7ba9f020577b28b73
447] {
448hunk ./docs/architecture.txt 183
449 place a quantity known as "shares of happiness", we'll do the upload anyways.
450 If we cannot place at least this many, the upload is declared a failure.
451 
452-The current defaults use k=3, shares_of_happiness=7, and N=10, meaning that
453+The current defaults use k=3, servers_of_happiness=7, and N=10, meaning that
454 we'll try to place 10 shares, we'll be happy if we can place 7, and we need
455 to get back any 3 to recover the file. This results in a 3.3x expansion
456 factor. In general, you should set N about equal to the number of nodes in
457hunk ./src/allmydata/immutable/upload.py 411
458                 pass
459             else:
460                 # No more peers, so this upload might fail (it depends upon
461-                # whether we've hit shares_of_happiness or not). Log the last
462+                # whether we've hit servers_of_happiness or not). Log the last
463                 # failure we got: if a coding error causes all peers to fail
464                 # in the same way, this allows the common failure to be seen
465                 # by the uploader and should help with debugging
466hunk ./src/allmydata/interfaces.py 809
467 
468 class NotEnoughSharesError(Exception):
469     """Download was unable to get enough shares, or upload was unable to
470-    place 'shares_of_happiness' shares."""
471+    place 'servers_of_happiness' shares."""
472 
473 class NoSharesError(Exception):
474     """Upload or Download was unable to get any shares at all."""
475hunk ./src/allmydata/interfaces.py 1308
476                          pushed.
477 
478         'share_counts': return a tuple describing how many shares are used:
479-                        (needed_shares, shares_of_happiness, total_shares)
480+                        (needed_shares, servers_of_happiness, total_shares)
481 
482         'num_segments': return an int with the number of segments that
483                         will be encoded.
484hunk ./src/allmydata/test/test_encode.py 768
485     def test_lost_one_shareholder(self):
486         # we have enough shareholders when we start, but one segment in we
487         # lose one of them. The upload should still succeed, as long as we
488-        # still have 'shares_of_happiness' peers left.
489+        # still have 'servers_of_happiness' peers left.
490         modemap = dict([(i, "good") for i in range(9)] +
491                        [(i, "lost") for i in range(9, 10)])
492         return self.send_and_recover((4,8,10), bucket_modes=modemap)
493hunk ./src/allmydata/test/test_encode.py 776
494     def test_lost_one_shareholder_early(self):
495         # we have enough shareholders when we choose peers, but just before
496         # we send the 'start' message, we lose one of them. The upload should
497-        # still succeed, as long as we still have 'shares_of_happiness' peers
498+        # still succeed, as long as we still have 'servers_of_happiness' peers
499         # left.
500         modemap = dict([(i, "good") for i in range(9)] +
501                        [(i, "lost-early") for i in range(9, 10)])
502}
503[Eliminate overcounting of servers_of_happiness in Tahoe2PeerSelector; also reorganize some things.
504Kevan Carstensen <kevan@isnotajoke.com>**20091118014542
505 Ignore-this: a6cb032cbff74f4f9d4238faebd99868
506] {
507hunk ./src/allmydata/immutable/upload.py 141
508         return (alreadygot, set(b.keys()))
509 
510 def servers_with_unique_shares(existing_shares, used_peers=None):
511+    """
512+    I accept a dict of shareid -> peerid mappings (and optionally a list
513+    of PeerTracker instances) and return a list of servers that have shares.
514+    """
515     servers = []
516hunk ./src/allmydata/immutable/upload.py 146
517+    existing_shares = existing_shares.copy()
518     if used_peers:
519hunk ./src/allmydata/immutable/upload.py 148
520+        peerdict = {}
521+        for peer in used_peers:
522+            peerdict.update(dict([(i, peer.peerid) for i in peer.buckets]))
523+        for k in peerdict.keys():
524+            if existing_shares.has_key(k):
525+                # Prevent overcounting; favor the bucket, and not the
526+                # prexisting share.
527+                del(existing_shares[k])
528         peers = list(used_peers.copy())
529         # We do this because the preexisting shares list goes by peerid.
530         peers = [x.peerid for x in peers]
531hunk ./src/allmydata/immutable/upload.py 164
532     return list(set(servers))
533 
534 def shares_by_server(existing_shares):
535+    """
536+    I accept a dict of shareid -> peerid mappings, and return a dict
537+    of peerid -> shareid mappings
538+    """
539     servers = {}
540     for server in set(existing_shares.values()):
541         servers[server] = set([x for x in existing_shares.keys()
542hunk ./src/allmydata/immutable/upload.py 174
543                                if existing_shares[x] == server])
544     return servers
545 
546+def should_add_server(existing_shares, server, bucket):
547+    """
548+    I tell my caller whether the servers_of_happiness number will be
549+    increased or decreased if a particular server is added as the peer
550+    already holding a particular share. I take a dictionary, a peerid,
551+    and a bucket as arguments, and return a boolean.
552+    """
553+    old_size = len(servers_with_unique_shares(existing_shares))
554+    new_candidate = existing_shares.copy()
555+    new_candidate[bucket] = server
556+    new_size = len(servers_with_unique_shares(new_candidate))
557+    return old_size < new_size
558+
559 class Tahoe2PeerSelector:
560 
561     def __init__(self, upload_id, logparent=None, upload_status=None):
562hunk ./src/allmydata/immutable/upload.py 294
563             peer = self.readonly_peers.pop()
564             assert isinstance(peer, PeerTracker)
565             d = peer.query_allocated()
566-            d.addCallback(self._handle_allocate_response)
567+            d.addCallback(self._handle_existing_response)
568             return d
569 
570hunk ./src/allmydata/immutable/upload.py 297
571-    def _handle_allocate_response(self, (peer, buckets)):
572+    def _handle_existing_response(self, (peer, buckets)):
573         for bucket in buckets:
574hunk ./src/allmydata/immutable/upload.py 299
575-            self.preexisting_shares[bucket] = peer
576-            if self.homeless_shares:
577-                self.homeless_shares.remove(bucket)
578+            if should_add_server(self.preexisting_shares, peer, bucket):
579+                self.preexisting_shares[bucket] = peer
580+                if self.homeless_shares and bucket in self.homeless_shares:
581+                    self.homeless_shares.remove(bucket)
582         return self._existing_shares()
583 
584     def _loop(self):
585hunk ./src/allmydata/immutable/upload.py 346
586                             items.append((servernum, sharelist))
587                     return self._loop()
588                 else:
589-                    raise NotEnoughSharesError("shares could only be placed on %d "
590-                                            "servers (%d were requested)" %
591-                                            (len(effective_happiness),
592-                                             self.servers_of_happiness))
593+                    raise NotEnoughSharesError("shares could only be placed "
594+                                   "on %d servers (%d were requested)" %
595+                                   (len(effective_happiness),
596+                                   self.servers_of_happiness))
597 
598         if self.uncontacted_peers:
599             peer = self.uncontacted_peers.pop(0)
600hunk ./src/allmydata/immutable/upload.py 425
601                 # we placed enough to be happy, so we're done
602                 if self._status:
603                     self._status.set_status("Placed all shares")
604-                return self.use_peers
605+                return (self.use_peers, self.preexisting_shares)
606 
607     def _got_response(self, res, peer, shares_to_ask, put_peer_here):
608         if isinstance(res, failure.Failure):
609hunk ./src/allmydata/immutable/upload.py 456
610                     level=log.NOISY, parent=self._log_parent)
611             progress = False
612             for s in alreadygot:
613-                if self.preexisting_shares.has_key(s):
614-                    old_size = len(servers_with_unique_shares(self.preexisting_shares))
615-                    new_candidate = self.preexisting_shares.copy()
616-                    new_candidate[s] = peer.peerid
617-                    new_size = len(servers_with_unique_shares(new_candidate))
618-                    if old_size >= new_size: continue
619-                self.preexisting_shares[s] = peer.peerid
620-                if s in self.homeless_shares:
621-                    self.homeless_shares.remove(s)
622-                    progress = True
623+                if should_add_server(self.preexisting_shares,
624+                                     peer.peerid, s):
625+                    self.preexisting_shares[s] = peer.peerid
626+                    if s in self.homeless_shares:
627+                        self.homeless_shares.remove(s)
628+                        progress = True
629 
630             # the PeerTracker will remember which shares were allocated on
631             # that peer. We just have to remember to use them.
632}
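For reference, an illustration (not from the patch) of how should_add_server behaves at this point in the series. Note that the final patch makes should_add_server return true when queried about a shnum it doesn't know about yet, which changes the last case shown here:

from allmydata.immutable.upload import should_add_server

existing = {0: "peerA", 1: "peerB"}        # shnum -> peerid
should_add_server(existing, "peerC", 2)    # => True  (adds a third distinct server)
should_add_server(existing, "peerA", 1)    # => False (would collapse two servers into one)
should_add_server(existing, "peerA", 2)    # => False here even though shnum 2 is new;
                                           #    the final patch changes this case to True
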
633[Alter the error message returned when peer selection fails
634Kevan Carstensen <kevan@isnotajoke.com>**20091123002405
635 Ignore-this: b2a7dc163edcab8d9613bfd6907e5166
636 
637 The Tahoe2PeerSelector returned either NoSharesError or NotEnoughSharesError
638 for a variety of error conditions that weren't informatively described by them.
639 This patch creates a new error, UploadHappinessError, replaces uses of
640 NoSharesError and NotEnoughSharesError with it, and alters the error message
641 raised with the errors to be more in line with the new servers_of_happiness
642 behavior. See ticket #834 for more information.
643] {
644hunk ./src/allmydata/immutable/encode.py 14
645 from allmydata.util.assertutil import _assert, precondition
646 from allmydata.codec import CRSEncoder
647 from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
648-     IEncryptedUploadable, IUploadStatus, NotEnoughSharesError, NoSharesError
649+     IEncryptedUploadable, IUploadStatus, UploadHappinessError
650+
651 
652 """
653 The goal of the encoder is to turn the original file into a series of
654hunk ./src/allmydata/immutable/encode.py 498
655             msg = "lost too many servers during upload (still have %d, want %d): %s" % \
656                   (len(servers_left),
657                    self.servers_of_happiness, why)
658-            if servers_left:
659-                raise NotEnoughSharesError(msg)
660-            else:
661-                raise NoSharesError(msg)
662+            raise UploadHappinessError(msg)
663         self.log("but we can still continue with %s shares, we'll be happy "
664                  "with at least %s" % (len(servers_left),
665                                        self.servers_of_happiness),
666hunk ./src/allmydata/immutable/encode.py 508
667         d = defer.DeferredList(dl, fireOnOneErrback=True)
668         def _eatNotEnoughSharesError(f):
669             # all exceptions that occur while talking to a peer are handled
670-            # in _remove_shareholder. That might raise NotEnoughSharesError,
671+            # in _remove_shareholder. That might raise UploadHappinessError,
672             # which will cause the DeferredList to errback but which should
673hunk ./src/allmydata/immutable/encode.py 510
674-            # otherwise be consumed. Allow non-NotEnoughSharesError exceptions
675+            # otherwise be consumed. Allow non-UploadHappinessError exceptions
676             # to pass through as an unhandled errback. We use this in lieu of
677             # consumeErrors=True to allow coding errors to be logged.
678hunk ./src/allmydata/immutable/encode.py 513
679-            f.trap(NotEnoughSharesError, NoSharesError)
680+            f.trap(UploadHappinessError)
681             return None
682         for d0 in dl:
683             d0.addErrback(_eatNotEnoughSharesError)
684hunk ./src/allmydata/immutable/upload.py 20
685 from allmydata.util.rrefutil import add_version_to_remote_reference
686 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
687      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
688-     NotEnoughSharesError, NoSharesError, NoServersError, \
689-     InsufficientVersionError
690+     NoServersError, InsufficientVersionError, UploadHappinessError
691 from allmydata.immutable import layout
692 from pycryptopp.cipher.aes import AES
693 
694hunk ./src/allmydata/immutable/upload.py 119
695     def query_allocated(self):
696         d = self._storageserver.callRemote("get_buckets",
697                                            self.storage_index)
698-        d.addCallback(self._got_allocate_reply)
699         return d
700 
701hunk ./src/allmydata/immutable/upload.py 121
702-    def _got_allocate_reply(self, buckets):
703-        return (self.peerid, buckets)
704-
705     def _got_reply(self, (alreadygot, buckets)):
706         #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
707         b = {}
708hunk ./src/allmydata/immutable/upload.py 187
709     def __init__(self, upload_id, logparent=None, upload_status=None):
710         self.upload_id = upload_id
711         self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
712+        # Peers that are working normally, but full.
713+        self.full_count = 0
714         self.error_count = 0
715         self.num_peers_contacted = 0
716         self.last_failure_msg = None
717hunk ./src/allmydata/immutable/upload.py 291
718             peer = self.readonly_peers.pop()
719             assert isinstance(peer, PeerTracker)
720             d = peer.query_allocated()
721-            d.addCallback(self._handle_existing_response)
722+            d.addBoth(self._handle_existing_response, peer.peerid)
723+            self.num_peers_contacted += 1
724+            self.query_count += 1
725+            log.msg("asking peer %s for any existing shares for upload id %s"
726+                    % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
727+                    level=log.NOISY, parent=self._log_parent)
728+            if self._status:
729+                self._status.set_status("Contacting Peer %s to find "
730+                                        "any existing shares"
731+                                        % idlib.shortnodeid_b2a(peer.peerid))
732             return d
733 
734hunk ./src/allmydata/immutable/upload.py 303
735-    def _handle_existing_response(self, (peer, buckets)):
736-        for bucket in buckets:
737-            if should_add_server(self.preexisting_shares, peer, bucket):
738-                self.preexisting_shares[bucket] = peer
739-                if self.homeless_shares and bucket in self.homeless_shares:
740-                    self.homeless_shares.remove(bucket)
741+    def _handle_existing_response(self, res, peer):
742+        if isinstance(res, failure.Failure):
743+            log.msg("%s got error during existing shares check: %s"
744+                    % (idlib.shortnodeid_b2a(peer), res),
745+                    level=log.UNUSUAL, parent=self._log_parent)
746+            self.error_count += 1
747+            self.bad_query_count += 1
748+        else:
749+            buckets = res
750+            log.msg("response from peer %s: alreadygot=%s"
751+                    % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
752+                    level=log.NOISY, parent=self._log_parent)
753+            for bucket in buckets:
754+                if should_add_server(self.preexisting_shares, peer, bucket):
755+                    self.preexisting_shares[bucket] = peer
756+                    if self.homeless_shares and bucket in self.homeless_shares:
757+                        self.homeless_shares.remove(bucket)
758+            self.full_count += 1
759+            self.bad_query_count += 1
760         return self._existing_shares()
761 
762     def _loop(self):
763hunk ./src/allmydata/immutable/upload.py 365
764                             items.append((servernum, sharelist))
765                     return self._loop()
766                 else:
767-                    raise NotEnoughSharesError("shares could only be placed "
768+                    raise UploadHappinessError("shares could only be placed "
769                                    "on %d servers (%d were requested)" %
770                                    (len(effective_happiness),
771                                    self.servers_of_happiness))
772hunk ./src/allmydata/immutable/upload.py 424
773                 msg = ("placed %d shares out of %d total (%d homeless), "
774                        "want to place on %d servers, "
775                        "sent %d queries to %d peers, "
776-                       "%d queries placed some shares, %d placed none, "
777-                       "got %d errors" %
778+                       "%d queries placed some shares, %d placed none "
779+                       "(of which %d placed none due to the server being"
780+                       " full and %d placed none due to an error)" %
781                        (self.total_shares - len(self.homeless_shares),
782                         self.total_shares, len(self.homeless_shares),
783                         self.servers_of_happiness,
784hunk ./src/allmydata/immutable/upload.py 432
785                         self.query_count, self.num_peers_contacted,
786                         self.good_query_count, self.bad_query_count,
787-                        self.error_count))
788+                        self.full_count, self.error_count))
789                 msg = "peer selection failed for %s: %s" % (self, msg)
790                 if self.last_failure_msg:
791                     msg += " (%s)" % (self.last_failure_msg,)
792hunk ./src/allmydata/immutable/upload.py 437
793                 log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
794-                if placed_shares:
795-                    raise NotEnoughSharesError(msg)
796-                else:
797-                    raise NoSharesError(msg)
798+                raise UploadHappinessError(msg)
799             else:
800                 # we placed enough to be happy, so we're done
801                 if self._status:
802hunk ./src/allmydata/immutable/upload.py 451
803             log.msg("%s got error during peer selection: %s" % (peer, res),
804                     level=log.UNUSUAL, parent=self._log_parent)
805             self.error_count += 1
806+            self.bad_query_count += 1
807             self.homeless_shares = list(shares_to_ask) + self.homeless_shares
808             if (self.uncontacted_peers
809                 or self.contacted_peers
810hunk ./src/allmydata/immutable/upload.py 479
811                     self.preexisting_shares[s] = peer.peerid
812                     if s in self.homeless_shares:
813                         self.homeless_shares.remove(s)
814-                        progress = True
815 
816             # the PeerTracker will remember which shares were allocated on
817             # that peer. We just have to remember to use them.
818hunk ./src/allmydata/immutable/upload.py 495
819                 self.good_query_count += 1
820             else:
821                 self.bad_query_count += 1
822+                self.full_count += 1
823 
824             if still_homeless:
825                 # In networks with lots of space, this is very unusual and
826hunk ./src/allmydata/interfaces.py 808
827         """
828 
829 class NotEnoughSharesError(Exception):
830-    """Download was unable to get enough shares, or upload was unable to
831-    place 'servers_of_happiness' shares."""
832+    """Download was unable to get enough shares"""
833 
834 class NoSharesError(Exception):
835hunk ./src/allmydata/interfaces.py 811
836-    """Upload or Download was unable to get any shares at all."""
837+    """Download was unable to get any shares at all."""
838+
839+class UploadHappinessError(Exception):
840+    """Upload was unable to satisfy 'servers_of_happiness'"""
841 
842 class UnableToFetchCriticalDownloadDataError(Exception):
843     """I was unable to fetch some piece of critical data which is supposed to
844}
845[Change "UploadHappinessError" to "UploadUnhappinessError"
846Kevan Carstensen <kevan@isnotajoke.com>**20091205043037
847 Ignore-this: 236b64ab19836854af4993bb5c1b221a
848] {
849replace ./src/allmydata/immutable/encode.py [A-Za-z_0-9] UploadHappinessError UploadUnhappinessError
850replace ./src/allmydata/immutable/upload.py [A-Za-z_0-9] UploadHappinessError UploadUnhappinessError
851replace ./src/allmydata/interfaces.py [A-Za-z_0-9] UploadHappinessError UploadUnhappinessError
852}
853[Alter the error message when an upload fails, per some comments in #778.
854Kevan Carstensen <kevan@isnotajoke.com>**20091230210344
855 Ignore-this: ba97422b2f9737c46abeb828727beb1
856 
857 When I first implemented #778, I just altered the error messages to refer to
858 servers where they referred to shares. The resulting error messages weren't
859 very good. These are a bit better.
860] {
861hunk ./src/allmydata/immutable/upload.py 200
862 
863     def get_shareholders(self, storage_broker, secret_holder,
864                          storage_index, share_size, block_size,
865-                         num_segments, total_shares, servers_of_happiness):
866+                         num_segments, total_shares, needed_shares,
867+                         servers_of_happiness):
868         """
869         @return: (used_peers, already_peers), where used_peers is a set of
870                  PeerTracker instances that have agreed to hold some shares
871hunk ./src/allmydata/immutable/upload.py 215
872 
873         self.total_shares = total_shares
874         self.servers_of_happiness = servers_of_happiness
875+        self.needed_shares = needed_shares
876 
877         self.homeless_shares = range(total_shares)
878         # self.uncontacted_peers = list() # peers we haven't asked yet
879hunk ./src/allmydata/immutable/upload.py 230
880         # existing shares for this storage index, which we want to know
881         # about for accurate servers_of_happiness accounting
882         self.readonly_peers = []
883+        # These peers have shares -- any shares -- for our SI. We keep track
884+        # of these to write an error message with them later.
885+        self.peers_with_shares = []
886 
887         peers = storage_broker.get_servers_for_index(storage_index)
888         if not peers:
889hunk ./src/allmydata/immutable/upload.py 317
890             self.bad_query_count += 1
891         else:
892             buckets = res
893+            if buckets:
894+                self.peers_with_shares.append(peer)
895             log.msg("response from peer %s: alreadygot=%s"
896                     % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
897                     level=log.NOISY, parent=self._log_parent)
898hunk ./src/allmydata/immutable/upload.py 331
899             self.bad_query_count += 1
900         return self._existing_shares()
901 
902+    def _get_progress_message(self):
903+        if not self.homeless_shares:
904+            msg = "placed all %d shares, " % (self.total_shares)
905+        else:
906+            msg = ("placed %d shares out of %d total (%d homeless), " %
907+                   (self.total_shares - len(self.homeless_shares),
908+                    self.total_shares,
909+                    len(self.homeless_shares)))
910+        return (msg + "want to place shares on at least %d servers such that "
911+                      "any %d of them have enough shares to recover the file, "
912+                      "sent %d queries to %d peers, "
913+                      "%d queries placed some shares, %d placed none "
914+                      "(of which %d placed none due to the server being"
915+                      " full and %d placed none due to an error)" %
916+                        (self.servers_of_happiness, self.needed_shares,
917+                         self.query_count, self.num_peers_contacted,
918+                         self.good_query_count, self.bad_query_count,
919+                         self.full_count, self.error_count))
920+
921+
922     def _loop(self):
923         if not self.homeless_shares:
924             effective_happiness = servers_with_unique_shares(
925hunk ./src/allmydata/immutable/upload.py 357
926                                                    self.preexisting_shares,
927                                                    self.use_peers)
928             if self.servers_of_happiness <= len(effective_happiness):
929-                msg = ("placed all %d shares, "
930-                       "sent %d queries to %d peers, "
931-                       "%d queries placed some shares, %d placed none, "
932-                       "got %d errors" %
933-                       (self.total_shares,
934-                        self.query_count, self.num_peers_contacted,
935-                        self.good_query_count, self.bad_query_count,
936-                        self.error_count))
937-                log.msg("peer selection successful for %s: %s" % (self, msg),
938-                    parent=self._log_parent)
939+                msg = ("peer selection successful for %s: %s" % (self,
940+                            self._get_progress_message()))
941+                log.msg(msg, parent=self._log_parent)
942                 return (self.use_peers, self.preexisting_shares)
943             else:
944                 delta = self.servers_of_happiness - len(effective_happiness)
945hunk ./src/allmydata/immutable/upload.py 375
946                 if delta <= len(self.uncontacted_peers) and \
947                    shares_to_spread >= delta:
948                     # Loop through the allocated shares, removing
949+                    # one from each server that has more than one and putting
950+                    # it back into self.homeless_shares until we've done
951+                    # this delta times.
952                     items = shares.items()
953                     while len(self.homeless_shares) < delta:
954                         servernum, sharelist = items.pop()
955hunk ./src/allmydata/immutable/upload.py 388
956                             items.append((servernum, sharelist))
957                     return self._loop()
958                 else:
959-                    raise UploadUnhappinessError("shares could only be placed "
960-                                   "on %d servers (%d were requested)" %
961-                                   (len(effective_happiness),
962-                                   self.servers_of_happiness))
963+                    peer_count = len(list(set(self.peers_with_shares)))
964+                    # If peer_count < needed_shares, then the second error
965+                    # message is nonsensical, so we use this one.
966+                    if peer_count < self.needed_shares:
967+                        msg = ("shares could only be placed or found on %d "
968+                               "server(s). "
969+                               "We were asked to place shares on at least %d "
970+                               "server(s) such that any %d of them have "
971+                               "enough shares to recover the file." %
972+                               (peer_count,
973+                                self.servers_of_happiness,
974+                                self.needed_shares))
975+                    # Otherwise, if we've placed on at least needed_shares
976+                    # peers, but there isn't an x-happy subset of those peers
977+                    # for x < needed_shares, we use this error message.
978+                    elif len(effective_happiness) < self.needed_shares:
979+                        msg = ("shares could be placed or found on %d "
980+                               "server(s), but they are not spread out evenly "
981+                               "enough to ensure that any %d of these servers "
982+                               "would have enough shares to recover the file. "
983+                               "We were asked to place "
984+                               "shares on at least %d servers such that any "
985+                               "%d of them have enough shares to recover the "
986+                               "file." %
987+                               (peer_count,
988+                                self.needed_shares,
989+                                self.servers_of_happiness,
990+                                self.needed_shares))
991+                    # Otherwise, if there is an x-happy subset of peers where
992+                    # x >= needed_shares, but x < shares_of_happiness, then
993+                    # we use this message.
994+                    else:
995+                        msg = ("shares could only be placed on %d server(s) "
996+                               "such that any %d of them have enough shares "
997+                               "to recover the file, but we were asked to use "
998+                               "at least %d such servers." %
999+                                               (len(effective_happiness),
1000+                                                self.needed_shares,
1001+                                                self.servers_of_happiness))
1002+                    raise UploadUnhappinessError(msg)
1003 
1004         if self.uncontacted_peers:
1005             peer = self.uncontacted_peers.pop(0)
1006hunk ./src/allmydata/immutable/upload.py 480
1007                                                    self.preexisting_shares,
1008                                                    self.use_peers)
1009             if len(effective_happiness) < self.servers_of_happiness:
1010-                msg = ("placed %d shares out of %d total (%d homeless), "
1011-                       "want to place on %d servers, "
1012-                       "sent %d queries to %d peers, "
1013-                       "%d queries placed some shares, %d placed none "
1014-                       "(of which %d placed none due to the server being"
1015-                       " full and %d placed none due to an error)" %
1016-                       (self.total_shares - len(self.homeless_shares),
1017-                        self.total_shares, len(self.homeless_shares),
1018-                        self.servers_of_happiness,
1019-                        self.query_count, self.num_peers_contacted,
1020-                        self.good_query_count, self.bad_query_count,
1021-                        self.full_count, self.error_count))
1022-                msg = "peer selection failed for %s: %s" % (self, msg)
1023+                msg = ("peer selection failed for %s: %s" % (self,
1024+                                self._get_progress_message()))
1025                 if self.last_failure_msg:
1026                     msg += " (%s)" % (self.last_failure_msg,)
1027                 log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
1028hunk ./src/allmydata/immutable/upload.py 534
1029                 self.use_peers.add(peer)
1030                 progress = True
1031 
1032+            if allocated or alreadygot:
1033+                self.peers_with_shares.append(peer.peerid)
1034+
1035             not_yet_present = set(shares_to_ask) - set(alreadygot)
1036             still_homeless = not_yet_present - set(allocated)
1037 
1038hunk ./src/allmydata/immutable/upload.py 931
1039         d = peer_selector.get_shareholders(storage_broker, secret_holder,
1040                                            storage_index,
1041                                            share_size, block_size,
1042-                                           num_segments, n, desired)
1043+                                           num_segments, n, k, desired)
1044         def _done(res):
1045             self._peer_selection_elapsed = time.time() - peer_selection_started
1046             return res
1047}
1048[Fix up the behavior of #778, per reviewers' comments
1049Kevan Carstensen <kevan@isnotajoke.com>**20100514004917
1050 Ignore-this: 9c20b60716125278b5456e8feb396bff
1051 
1052   - Make some important utility functions clearer and more thoroughly
1053     documented.
1054   - Assert in upload.servers_of_happiness that the buckets attributes
1055     of PeerTrackers passed to it are mutually disjoint.
1056   - Get rid of some silly non-Pythonisms that I didn't see when I first
1057     wrote these patches.
1058   - Make sure that should_add_server returns true when queried about a
1059     shnum that it doesn't know about yet.
1060   - Change Tahoe2PeerSelector.preexisting_shares to map a shareid to a set
1061     of peerids, alter dependencies to deal with that.
1062   - Remove upload.should_add_servers, because it is no longer necessary
1063   - Move upload.shares_of_happiness and upload.shares_by_server to a utility
1064     file.
1065   - Change some points in Tahoe2PeerSelector.
1066   - Compute servers_of_happiness using a bipartite matching algorithm that
1067     we know is optimal instead of an ad-hoc greedy algorithm that isn't.
1068   - Change servers_of_happiness to just take a sharemap as an argument,
1069     change its callers to merge existing_shares and used_peers before
1070     calling it.
1071   - Change an error message in the encoder to be more appropriate for
1072     servers of happiness.
1073   - Clarify the wording of an error message in immutable/upload.py
1074   - Refactor a happiness failure message to happinessutil.py, and make
1075     immutable/upload.py and immutable/encode.py use it.
1076   - Move the word "only" as far to the right as possible in failure
1077     messages.
1078   - Use a better definition of progress during peer selection.
1079   - Do read-only peer share detection queries in parallel, not sequentially.
1080   - Clean up logging semantics; print the query statistics whenever an
1081     upload is unsuccessful, not just in one case.
1082 
1083] {
1084hunk ./src/allmydata/immutable/encode.py 10
1085 from allmydata import uri
1086 from allmydata.storage.server import si_b2a
1087 from allmydata.hashtree import HashTree
1088-from allmydata.util import mathutil, hashutil, base32, log
1089+from allmydata.util import mathutil, hashutil, base32, log, happinessutil
1090 from allmydata.util.assertutil import _assert, precondition
1091 from allmydata.codec import CRSEncoder
1092 from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
1093hunk ./src/allmydata/immutable/encode.py 201
1094             assert IStorageBucketWriter.providedBy(landlords[k])
1095         self.landlords = landlords.copy()
1096         assert isinstance(servermap, dict)
1097+        for v in servermap.itervalues():
1098+            assert isinstance(v, set)
1099         self.servermap = servermap.copy()
1100 
1101     def start(self):
1102hunk ./src/allmydata/immutable/encode.py 489
1103                       level=log.UNUSUAL, failure=why)
1104         if shareid in self.landlords:
1105             self.landlords[shareid].abort()
1106+            peerid = self.landlords[shareid].get_peerid()
1107+            assert peerid
1108             del self.landlords[shareid]
1109hunk ./src/allmydata/immutable/encode.py 492
1110+            self.servermap[shareid].remove(peerid)
1111+            if not self.servermap[shareid]:
1112+                del self.servermap[shareid]
1113         else:
1114             # even more UNUSUAL
1115             self.log("they weren't in our list of landlords", parent=ln,
1116hunk ./src/allmydata/immutable/encode.py 499
1117                      level=log.WEIRD, umid="TQGFRw")
1118-        del(self.servermap[shareid])
1119-        servers_left = list(set(self.servermap.values()))
1120-        if len(servers_left) < self.servers_of_happiness:
1121-            msg = "lost too many servers during upload (still have %d, want %d): %s" % \
1122-                  (len(servers_left),
1123-                   self.servers_of_happiness, why)
1124+        happiness = happinessutil.servers_of_happiness(self.servermap)
1125+        if happiness < self.servers_of_happiness:
1126+            peerids = set(happinessutil.shares_by_server(self.servermap).keys())
1127+            msg = happinessutil.failure_message(len(peerids),
1128+                                                self.required_shares,
1129+                                                self.servers_of_happiness,
1130+                                                happiness)
1131+            msg = "%s: %s" % (msg, why)
1132             raise UploadUnhappinessError(msg)
1133         self.log("but we can still continue with %s shares, we'll be happy "
1134hunk ./src/allmydata/immutable/encode.py 509
1135-                 "with at least %s" % (len(servers_left),
1136+                 "with at least %s" % (happiness,
1137                                        self.servers_of_happiness),
1138                  parent=ln)
1139 
1140hunk ./src/allmydata/immutable/encode.py 515
1141     def _gather_responses(self, dl):
1142         d = defer.DeferredList(dl, fireOnOneErrback=True)
1143-        def _eatNotEnoughSharesError(f):
1144+        def _eatUploadUnhappinessError(f):
1145             # all exceptions that occur while talking to a peer are handled
1146             # in _remove_shareholder. That might raise UploadUnhappinessError,
1147             # which will cause the DeferredList to errback but which should
1148hunk ./src/allmydata/immutable/encode.py 525
1149             f.trap(UploadUnhappinessError)
1150             return None
1151         for d0 in dl:
1152-            d0.addErrback(_eatNotEnoughSharesError)
1153+            d0.addErrback(_eatUploadUnhappinessError)
1154         return d
1155 
1156     def finish_hashing(self):
1157hunk ./src/allmydata/immutable/layout.py 245
1158     def abort(self):
1159         return self._rref.callRemoteOnly("abort")
1160 
1161+
1162+    def get_peerid(self):
1163+        if self._nodeid:
1164+            return self._nodeid
1165+        return None
1166+
1167 class WriteBucketProxy_v2(WriteBucketProxy):
1168     fieldsize = 8
1169     fieldstruct = ">Q"
1170hunk ./src/allmydata/immutable/upload.py 16
1171 from allmydata.storage.server import si_b2a
1172 from allmydata.immutable import encode
1173 from allmydata.util import base32, dictutil, idlib, log, mathutil
1174+from allmydata.util.happinessutil import servers_of_happiness, \
1175+                                         shares_by_server, merge_peers, \
1176+                                         failure_message
1177 from allmydata.util.assertutil import precondition
1178 from allmydata.util.rrefutil import add_version_to_remote_reference
1179 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
1180hunk ./src/allmydata/immutable/upload.py 119
1181         d.addCallback(self._got_reply)
1182         return d
1183 
1184-    def query_allocated(self):
1185-        d = self._storageserver.callRemote("get_buckets",
1186-                                           self.storage_index)
1187-        return d
1188+    def ask_about_existing_shares(self):
1189+        return self._storageserver.callRemote("get_buckets",
1190+                                              self.storage_index)
1191 
1192     def _got_reply(self, (alreadygot, buckets)):
1193         #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
1194hunk ./src/allmydata/immutable/upload.py 137
1195         self.buckets.update(b)
1196         return (alreadygot, set(b.keys()))
1197 
1198-def servers_with_unique_shares(existing_shares, used_peers=None):
1199-    """
1200-    I accept a dict of shareid -> peerid mappings (and optionally a list
1201-    of PeerTracker instances) and return a list of servers that have shares.
1202-    """
1203-    servers = []
1204-    existing_shares = existing_shares.copy()
1205-    if used_peers:
1206-        peerdict = {}
1207-        for peer in used_peers:
1208-            peerdict.update(dict([(i, peer.peerid) for i in peer.buckets]))
1209-        for k in peerdict.keys():
1210-            if existing_shares.has_key(k):
1211-                # Prevent overcounting; favor the bucket, and not the
1212-                # prexisting share.
1213-                del(existing_shares[k])
1214-        peers = list(used_peers.copy())
1215-        # We do this because the preexisting shares list goes by peerid.
1216-        peers = [x.peerid for x in peers]
1217-        servers.extend(peers)
1218-    servers.extend(existing_shares.values())
1219-    return list(set(servers))
1220-
1221-def shares_by_server(existing_shares):
1222-    """
1223-    I accept a dict of shareid -> peerid mappings, and return a dict
1224-    of peerid -> shareid mappings
1225-    """
1226-    servers = {}
1227-    for server in set(existing_shares.values()):
1228-        servers[server] = set([x for x in existing_shares.keys()
1229-                               if existing_shares[x] == server])
1230-    return servers
1231-
1232-def should_add_server(existing_shares, server, bucket):
1233-    """
1234-    I tell my caller whether the servers_of_happiness number will be
1235-    increased or decreased if a particular server is added as the peer
1236-    already holding a particular share. I take a dictionary, a peerid,
1237-    and a bucket as arguments, and return a boolean.
1238-    """
1239-    old_size = len(servers_with_unique_shares(existing_shares))
1240-    new_candidate = existing_shares.copy()
1241-    new_candidate[bucket] = server
1242-    new_size = len(servers_with_unique_shares(new_candidate))
1243-    return old_size < new_size
1244 
1245 class Tahoe2PeerSelector:
1246 
1247hunk ./src/allmydata/immutable/upload.py 162
1248         @return: (used_peers, already_peers), where used_peers is a set of
1249                  PeerTracker instances that have agreed to hold some shares
1250                  for us (the shnum is stashed inside the PeerTracker),
1251-                 and already_peers is a dict mapping shnum to a peer
1252-                 which claims to already have the share.
1253+                 and already_peers is a dict mapping shnum to a set of peers
1254+                 which claim to already have the share.
1255         """
1256 
1257         if self._status:
1258hunk ./src/allmydata/immutable/upload.py 174
1259         self.needed_shares = needed_shares
1260 
1261         self.homeless_shares = range(total_shares)
1262-        # self.uncontacted_peers = list() # peers we haven't asked yet
1263         self.contacted_peers = [] # peers worth asking again
1264         self.contacted_peers2 = [] # peers that we have asked again
1265         self._started_second_pass = False
1266hunk ./src/allmydata/immutable/upload.py 178
1267         self.use_peers = set() # PeerTrackers that have shares assigned to them
1268-        self.preexisting_shares = {} # sharenum -> peerid holding the share
1269-        # We don't try to allocate shares to these servers, since they've
1270-        # said that they're incapable of storing shares of the size that
1271-        # we'd want to store. We keep them around because they may have
1272-        # existing shares for this storage index, which we want to know
1273-        # about for accurate servers_of_happiness accounting
1274-        self.readonly_peers = []
1275-        # These peers have shares -- any shares -- for our SI. We keep track
1276-        # of these to write an error message with them later.
1277-        self.peers_with_shares = []
1278+        self.preexisting_shares = {} # shareid => set(peerids) holding shareid
1279+        # We don't try to allocate shares to these servers, since they've said
1280+        # that they're incapable of storing shares of the size that we'd want
1281+        # to store. We keep them around because they may have existing shares
1282+        # for this storage index, which we want to know about for accurate
1283+        # servers_of_happiness accounting
1284+        # (this is eventually a list, but it is initialized later)
1285+        self.readonly_peers = None
1286+        # These peers have shares -- any shares -- for our SI. We keep
1287+        # track of these to write an error message with them later.
1288+        self.peers_with_shares = set()
1289 
1290hunk ./src/allmydata/immutable/upload.py 190
1291-        peers = storage_broker.get_servers_for_index(storage_index)
1292-        if not peers:
1293-            raise NoServersError("client gave us zero peers")
1294-
1295         # this needed_hashes computation should mirror
1296         # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
1297         # (instead of a HashTree) because we don't require actual hashing
1298hunk ./src/allmydata/immutable/upload.py 202
1299                                              num_share_hashes, EXTENSION_SIZE,
1300                                              None)
1301         allocated_size = wbp.get_allocated_size()
1302+        all_peers = storage_broker.get_servers_for_index(storage_index)
1303+        if not all_peers:
1304+            raise NoServersError("client gave us zero peers")
1305 
1306         # filter the list of peers according to which ones can accommodate
1307         # this request. This excludes older peers (which used a 4-byte size
1308hunk ./src/allmydata/immutable/upload.py 214
1309             (peerid, conn) = peer
1310             v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
1311             return v1["maximum-immutable-share-size"]
1312-        new_peers = [peer for peer in peers
1313-                     if _get_maxsize(peer) >= allocated_size]
1314-        old_peers = list(set(peers).difference(set(new_peers)))
1315-        peers = new_peers
1316+        writable_peers = [peer for peer in all_peers
1317+                          if _get_maxsize(peer) >= allocated_size]
1318+        readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers)
1319 
1320         # decide upon the renewal/cancel secrets, to include them in the
1321         # allocate_buckets query.
1322hunk ./src/allmydata/immutable/upload.py 228
1323         file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
1324                                                      storage_index)
1325         def _make_trackers(peers):
1326-           return [ PeerTracker(peerid, conn,
1327-                                share_size, block_size,
1328-                                num_segments, num_share_hashes,
1329-                                storage_index,
1330-                                bucket_renewal_secret_hash(file_renewal_secret,
1331-                                                           peerid),
1332-                                bucket_cancel_secret_hash(file_cancel_secret,
1333-                                                          peerid))
1334+           return [PeerTracker(peerid, conn,
1335+                               share_size, block_size,
1336+                               num_segments, num_share_hashes,
1337+                               storage_index,
1338+                               bucket_renewal_secret_hash(file_renewal_secret,
1339+                                                          peerid),
1340+                               bucket_cancel_secret_hash(file_cancel_secret,
1341+                                                         peerid))
1342                     for (peerid, conn) in peers]
1343hunk ./src/allmydata/immutable/upload.py 237
1344-        self.uncontacted_peers = _make_trackers(peers)
1345-        self.readonly_peers = _make_trackers(old_peers)
1346-        # Talk to the readonly servers to get an idea of what servers
1347-        # have what shares (if any) for this storage index
1348-        d = defer.maybeDeferred(self._existing_shares)
1349-        d.addCallback(lambda ign: self._loop())
1350-        return d
1351-
1352-    def _existing_shares(self):
1353-        if self.readonly_peers:
1354-            peer = self.readonly_peers.pop()
1355+        self.uncontacted_peers = _make_trackers(writable_peers)
1356+        self.readonly_peers = _make_trackers(readonly_peers)
1357+        # We now ask peers that can't hold any new shares about existing
1358+        # shares that they might have for our SI. Once this is done, we
1359+        # start placing the shares that we haven't already accounted
1360+        # for.
1361+        ds = []
1362+        if self._status and self.readonly_peers:
1363+            self._status.set_status("Contacting readonly peers to find "
1364+                                    "any existing shares")
1365+        for peer in self.readonly_peers:
1366             assert isinstance(peer, PeerTracker)
1367hunk ./src/allmydata/immutable/upload.py 249
1368-            d = peer.query_allocated()
1369+            d = peer.ask_about_existing_shares()
1370             d.addBoth(self._handle_existing_response, peer.peerid)
1371hunk ./src/allmydata/immutable/upload.py 251
1372+            ds.append(d)
1373             self.num_peers_contacted += 1
1374             self.query_count += 1
1375hunk ./src/allmydata/immutable/upload.py 254
1376-            log.msg("asking peer %s for any existing shares for upload id %s"
1377+            log.msg("asking peer %s for any existing shares for "
1378+                    "upload id %s"
1379                     % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
1380                     level=log.NOISY, parent=self._log_parent)
1381hunk ./src/allmydata/immutable/upload.py 258
1382-            if self._status:
1383-                self._status.set_status("Contacting Peer %s to find "
1384-                                        "any existing shares"
1385-                                        % idlib.shortnodeid_b2a(peer.peerid))
1386-            return d
1387+        dl = defer.DeferredList(ds)
1388+        dl.addCallback(lambda ign: self._loop())
1389+        return dl
1390+
1391 
1392     def _handle_existing_response(self, res, peer):
1393hunk ./src/allmydata/immutable/upload.py 264
1394+        """
1395+        I handle responses to the existing-share queries that
1396+        Tahoe2PeerSelector.get_shareholders sends to read-only peers.
1397+        """
1398         if isinstance(res, failure.Failure):
1399             log.msg("%s got error during existing shares check: %s"
1400                     % (idlib.shortnodeid_b2a(peer), res),
1401hunk ./src/allmydata/immutable/upload.py 277
1402         else:
1403             buckets = res
1404             if buckets:
1405-                self.peers_with_shares.append(peer)
1406+                self.peers_with_shares.add(peer)
1407             log.msg("response from peer %s: alreadygot=%s"
1408                     % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
1409                     level=log.NOISY, parent=self._log_parent)
1410hunk ./src/allmydata/immutable/upload.py 282
1411             for bucket in buckets:
1412-                if should_add_server(self.preexisting_shares, peer, bucket):
1413-                    self.preexisting_shares[bucket] = peer
1414-                    if self.homeless_shares and bucket in self.homeless_shares:
1415-                        self.homeless_shares.remove(bucket)
1416+                self.preexisting_shares.setdefault(bucket, set()).add(peer)
1417+                if self.homeless_shares and bucket in self.homeless_shares:
1418+                    self.homeless_shares.remove(bucket)
1419             self.full_count += 1
1420             self.bad_query_count += 1
1421hunk ./src/allmydata/immutable/upload.py 287
1422-        return self._existing_shares()
1423+
1424 
1425     def _get_progress_message(self):
1426         if not self.homeless_shares:
1427hunk ./src/allmydata/immutable/upload.py 311
1428 
1429     def _loop(self):
1430         if not self.homeless_shares:
1431-            effective_happiness = servers_with_unique_shares(
1432-                                                   self.preexisting_shares,
1433-                                                   self.use_peers)
1434-            if self.servers_of_happiness <= len(effective_happiness):
1435+            merged = merge_peers(self.preexisting_shares, self.use_peers)
1436+            effective_happiness = servers_of_happiness(merged)
1437+            if self.servers_of_happiness <= effective_happiness:
1438                 msg = ("peer selection successful for %s: %s" % (self,
1439                             self._get_progress_message()))
1440                 log.msg(msg, parent=self._log_parent)
1441hunk ./src/allmydata/immutable/upload.py 319
1442                 return (self.use_peers, self.preexisting_shares)
1443             else:
1444-                delta = self.servers_of_happiness - len(effective_happiness)
1445+                # We're not okay right now, but maybe we can fix it by
1446+                # redistributing some shares. In cases where one or two
1447+                # servers have, before the upload, all or most of the
1448+                # shares for a given SI, this can work by giving _loop
1449+                # a chance to spread those out over the other peers.
1450+                delta = self.servers_of_happiness - effective_happiness
1451                 shares = shares_by_server(self.preexisting_shares)
1452                 # Each server in shares maps to a set of shares stored on it.
1453                 # Since we want to keep at least one share on each server
1454hunk ./src/allmydata/immutable/upload.py 336
1455                                         in shares.items()])
1456                 if delta <= len(self.uncontacted_peers) and \
1457                    shares_to_spread >= delta:
1458-                    # Loop through the allocated shares, removing
1459-                    # one from each server that has more than one and putting
1460-                    # it back into self.homeless_shares until we've done
1461-                    # this delta times.
1462                     items = shares.items()
1463                     while len(self.homeless_shares) < delta:
1464hunk ./src/allmydata/immutable/upload.py 338
1465-                        servernum, sharelist = items.pop()
1466+                        # Loop through the allocated shares, removing
1467+                        # one from each server that has more than one
1468+                        # and putting it back into self.homeless_shares
1469+                        # until we've done this delta times.
1470+                        server, sharelist = items.pop()
1471                         if len(sharelist) > 1:
1472                             share = sharelist.pop()
1473                             self.homeless_shares.append(share)
1474hunk ./src/allmydata/immutable/upload.py 346
1475-                            del(self.preexisting_shares[share])
1476-                            items.append((servernum, sharelist))
1477+                            self.preexisting_shares[share].remove(server)
1478+                            if not self.preexisting_shares[share]:
1479+                                del self.preexisting_shares[share]
1480+                            items.append((server, sharelist))
1481                     return self._loop()
1482                 else:
1483hunk ./src/allmydata/immutable/upload.py 352
1484-                    peer_count = len(list(set(self.peers_with_shares)))
1485+                    # Redistribution won't help us; fail.
1486+                    peer_count = len(self.peers_with_shares)
1487                     # If peer_count < needed_shares, then the second error
1488                     # message is nonsensical, so we use this one.
1489hunk ./src/allmydata/immutable/upload.py 356
1490-                    if peer_count < self.needed_shares:
1491-                        msg = ("shares could only be placed or found on %d "
1492-                               "server(s). "
1493-                               "We were asked to place shares on at least %d "
1494-                               "server(s) such that any %d of them have "
1495-                               "enough shares to recover the file." %
1496-                               (peer_count,
1497-                                self.servers_of_happiness,
1498-                                self.needed_shares))
1499-                    # Otherwise, if we've placed on at least needed_shares
1500-                    # peers, but there isn't an x-happy subset of those peers
1501-                    # for x < needed_shares, we use this error message.
1502-                    elif len(effective_happiness) < self.needed_shares:
1503-                        msg = ("shares could be placed or found on %d "
1504-                               "server(s), but they are not spread out evenly "
1505-                               "enough to ensure that any %d of these servers "
1506-                               "would have enough shares to recover the file. "
1507-                               "We were asked to place "
1508-                               "shares on at least %d servers such that any "
1509-                               "%d of them have enough shares to recover the "
1510-                               "file." %
1511-                               (peer_count,
1512-                                self.needed_shares,
1513-                                self.servers_of_happiness,
1514-                                self.needed_shares))
1515-                    # Otherwise, if there is an x-happy subset of peers where
1516-                    # x >= needed_shares, but x < shares_of_happiness, then
1517-                    # we use this message.
1518-                    else:
1519-                        msg = ("shares could only be placed on %d server(s) "
1520-                               "such that any %d of them have enough shares "
1521-                               "to recover the file, but we were asked to use "
1522-                               "at least %d such servers." %
1523-                                               (len(effective_happiness),
1524-                                                self.needed_shares,
1525-                                                self.servers_of_happiness))
1526-                    raise UploadUnhappinessError(msg)
1527+                    msg = failure_message(peer_count,
1528+                                          self.needed_shares,
1529+                                          self.servers_of_happiness,
1530+                                          effective_happiness)
1531+                    raise UploadUnhappinessError("%s (%s)" % (msg,
1532+                                                 self._get_progress_message()))
1533 
1534         if self.uncontacted_peers:
1535             peer = self.uncontacted_peers.pop(0)
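A rough numeric sketch of the redistribution test in the hunk above, using the happinessutil helpers added later in this patch (the server names are made up for illustration; runs under Python 2, assuming the module is importable):

    from allmydata.util.happinessutil import servers_of_happiness, \
                                             shares_by_server

    preexisting = {0: set(["A"]), 1: set(["A"]), 2: set(["A"]), 3: set(["B"])}
    happy = 4
    delta = happy - servers_of_happiness(preexisting)           # 4 - 2 = 2
    per_server = shares_by_server(preexisting)                  # A: 3 shares, B: 1
    shares_to_spread = sum([len(s) - 1 for s in per_server.values()])   # 2
    # with at least delta (=2) uncontacted peers available, two of A's
    # shares are put back into homeless_shares and offered to new servers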
1536hunk ./src/allmydata/immutable/upload.py 410
1537         else:
1538             # no more peers. If we haven't placed enough shares, we fail.
1539             placed_shares = self.total_shares - len(self.homeless_shares)
1540-            effective_happiness = servers_with_unique_shares(
1541-                                                   self.preexisting_shares,
1542-                                                   self.use_peers)
1543-            if len(effective_happiness) < self.servers_of_happiness:
1544-                msg = ("peer selection failed for %s: %s" % (self,
1545+            merged = merge_peers(self.preexisting_shares, self.use_peers)
1546+            effective_happiness = servers_of_happiness(merged)
1547+            if effective_happiness < self.servers_of_happiness:
1548+                msg = failure_message(len(self.peers_with_shares),
1549+                                      self.needed_shares,
1550+                                      self.servers_of_happiness,
1551+                                      effective_happiness)
1552+                msg = ("peer selection failed for %s: %s (%s)" % (self,
1553+                                msg,
1554                                 self._get_progress_message()))
1555                 if self.last_failure_msg:
1556                     msg += " (%s)" % (self.last_failure_msg,)
1557hunk ./src/allmydata/immutable/upload.py 460
1558                     level=log.NOISY, parent=self._log_parent)
1559             progress = False
1560             for s in alreadygot:
1561-                if should_add_server(self.preexisting_shares,
1562-                                     peer.peerid, s):
1563-                    self.preexisting_shares[s] = peer.peerid
1564-                    if s in self.homeless_shares:
1565-                        self.homeless_shares.remove(s)
1566+                self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
1567+                if s in self.homeless_shares:
1568+                    self.homeless_shares.remove(s)
1569+                    progress = True
1570+                elif s in shares_to_ask:
1571+                    progress = True
1572 
1573             # the PeerTracker will remember which shares were allocated on
1574             # that peer. We just have to remember to use them.
1575hunk ./src/allmydata/immutable/upload.py 474
1576                 progress = True
1577 
1578             if allocated or alreadygot:
1579-                self.peers_with_shares.append(peer.peerid)
1580+                self.peers_with_shares.add(peer.peerid)
1581 
1582             not_yet_present = set(shares_to_ask) - set(alreadygot)
1583             still_homeless = not_yet_present - set(allocated)
1584hunk ./src/allmydata/immutable/upload.py 480
1585 
1586             if progress:
1587-                # they accepted or already had at least one share, so
1588-                # progress has been made
1589+                # They accepted at least one of the shares that we asked
1590+                # them to accept, or they had a share that we didn't ask
1591+                # them to accept but that we hadn't placed yet, so this
1592+                # was a productive query
1593                 self.good_query_count += 1
1594             else:
1595                 self.bad_query_count += 1
1596hunk ./src/allmydata/immutable/upload.py 882
1597     def set_shareholders(self, (used_peers, already_peers), encoder):
1598         """
1599         @param used_peers: a sequence of PeerTracker objects
1600-        @paran already_peers: a dict mapping sharenum to a peerid that
1601-                              claims to already have this share
1602+        @param already_peers: a dict mapping sharenum to a set of peerids
1603+                              that claim to already have this share
1604         """
1605         self.log("_send_shares, used_peers is %s" % (used_peers,))
1606         # record already-present shares in self._results
1607hunk ./src/allmydata/immutable/upload.py 898
1608             buckets.update(peer.buckets)
1609             for shnum in peer.buckets:
1610                 self._peer_trackers[shnum] = peer
1611-                servermap[shnum] = peer.peerid
1612+                servermap.setdefault(shnum, set()).add(peer.peerid)
1613         assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
1614         encoder.set_shareholders(buckets, servermap)
1615 
1616hunk ./src/allmydata/interfaces.py 1348
1617         must be a dictionary that maps share number (an integer ranging from
1618         0 to n-1) to an instance that provides IStorageBucketWriter.
1619         'servermap' is a dictionary that maps share number (as defined above)
1620-        to a peerid. This must be performed before start() can be called."""
1621+        to a set of peerids. This must be performed before start() can be
1622+        called."""
1623 
1624     def start():
1625         """Begin the encode/upload process. This involves reading encrypted
1626addfile ./src/allmydata/util/happinessutil.py
1627hunk ./src/allmydata/util/happinessutil.py 1
1628+"""
1629+I contain utilities useful for calculating servers_of_happiness, and for
1630+reporting it in messages.
1631+"""
1632+
1633+def failure_message(peer_count, k, happy, effective_happy):
1634+    # If peer_count < needed_shares, this error message makes more
1635+    # sense than any of the others, so use it.
1636+    if peer_count < k:
1637+        msg = ("shares could be placed or found on only %d "
1638+               "server(s). "
1639+               "We were asked to place shares on at least %d "
1640+               "server(s) such that any %d of them have "
1641+               "enough shares to recover the file." %
1642+                (peer_count, happy, k))
1643+    # Otherwise, if we've placed on at least needed_shares
1644+    # peers, but there isn't an x-happy subset of those peers
1645+    # for x >= needed_shares, we use this error message.
1646+    elif effective_happy < k:
1647+        msg = ("shares could be placed or found on %d "
1648+               "server(s), but they are not spread out evenly "
1649+               "enough to ensure that any %d of these servers "
1650+               "would have enough shares to recover the file. "
1651+               "We were asked to place "
1652+               "shares on at least %d servers such that any "
1653+               "%d of them have enough shares to recover the "
1654+               "file." %
1655+                (peer_count, k, happy, k))
1656+    # Otherwise, if there is an x-happy subset of peers where
1657+    # x >= needed_shares, but x < servers_of_happiness, then
1658+    # we use this message.
1659+    else:
1660+        msg = ("shares could be placed on only %d server(s) "
1661+               "such that any %d of them have enough shares "
1662+               "to recover the file, but we were asked to "
1663+               "place shares on at least %d such servers." %
1664+                (effective_happy, k, happy))
1665+    return msg
1666+
1667+
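As a rough illustration of which of the three messages the branches above select (a sketch only, assuming the happinessutil module added here is importable and run under Python 2):

    from allmydata.util.happinessutil import failure_message

    # shares landed on only 2 servers with k=3: too few servers even to
    # recover the file, so the first message is chosen
    print failure_message(peer_count=2, k=3, happy=7, effective_happy=2)

    # shares landed on 5 servers and a 4-happy subset exists (4 >= k=3),
    # but 4 < happy=7, so the third message is chosen
    print failure_message(peer_count=5, k=3, happy=7, effective_happy=4)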
1668+def shares_by_server(servermap):
1669+    """
1670+    I accept a dict of shareid -> set(peerid) mappings, and return a
1671+    dict of peerid -> set(shareid) mappings. My argument is a dictionary
1672+    with sets of peers, indexed by shares, and I transform that into a
1673+    dictionary of sets of shares, indexed by peerids.
1674+    """
1675+    ret = {}
1676+    for shareid, peers in servermap.iteritems():
1677+        assert isinstance(peers, set)
1678+        for peerid in peers:
1679+            ret.setdefault(peerid, set()).add(shareid)
1680+    return ret
1681+
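A small sketch of the inversion shares_by_server performs (assuming the module above is importable; the peerid strings are made up for illustration):

    from allmydata.util.happinessutil import shares_by_server

    servermap = {0: set(["peer A"]),
                 1: set(["peer A", "peer B"])}
    print shares_by_server(servermap)
    # => {'peer A': set([0, 1]), 'peer B': set([1])}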
1682+def merge_peers(servermap, used_peers=None):
1683+    """
1684+    I accept a dict of shareid -> set(peerid) mappings, and optionally a
1685+    set of PeerTrackers. If no set of PeerTrackers is provided, I return
1686+    my first argument unmodified. Otherwise, I update a copy of my first
1687+    argument to include the shareid -> peerid mappings implied in the
1688+    set of PeerTrackers, returning the resulting dict.
1689+    """
1690+    if not used_peers:
1691+        return servermap
1692+
1693+    assert(isinstance(servermap, dict))
1694+    assert(isinstance(used_peers, set))
1695+
1696+    # Since we mutate servermap, and are called outside of a context where
1697+    # it is okay to do that, make a deep copy of servermap and work with it
1698+    # (a shallow .copy() would still share the underlying peerid sets).
1699+    servermap = dict([(k, set(v)) for (k, v) in servermap.iteritems()])
1700+    for peer in used_peers:
1701+        for shnum in peer.buckets:
1702+            servermap.setdefault(shnum, set()).add(peer.peerid)
1703+    return servermap
1704+
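A sketch of how merge_peers folds tracker buckets into an existing sharemap; FakeTracker is a hypothetical stand-in that only mimics the two PeerTracker attributes merge_peers reads (buckets and peerid):

    from allmydata.util.happinessutil import merge_peers

    class FakeTracker:                    # hypothetical stand-in
        def __init__(self, peerid, buckets):
            self.peerid = peerid          # server identifier
            self.buckets = buckets        # dict of shnum -> bucket proxy

    existing = {0: set(["server A"])}
    trackers = set([FakeTracker("server B", {0: None, 1: None})])
    print merge_peers(existing, trackers)
    # => {0: set(['server A', 'server B']), 1: set(['server B'])}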
1705+def servers_of_happiness(sharemap):
1706+    """
1707+    I accept 'sharemap', a dict of shareid -> set(peerid) mappings. I
1708+    return the 'servers_of_happiness' number that sharemap results in.
1709+
1710+    To calculate the 'servers_of_happiness' number for the sharemap, I
1711+    construct a bipartite graph with servers in one partition of vertices
1712+    and shares in the other, and with an edge between a server s and a share t
1713+    if s is to store t. I then compute the size of a maximum matching in
1714+    the resulting graph; this is then returned as the 'servers_of_happiness'
1715+    for my arguments.
1716+
1717+    For example, consider the following layout:
1718+
1719+      server 1: shares 1, 2, 3, 4
1720+      server 2: share 6
1721+      server 3: share 3
1722+      server 4: share 4
1723+      server 5: share 2
1724+
1725+    From this, we can construct the following graph:
1726+
1727+      L = {server 1, server 2, server 3, server 4, server 5}
1728+      R = {share 1, share 2, share 3, share 4, share 6}
1729+      V = L U R
1730+      E = {(server 1, share 1), (server 1, share 2), (server 1, share 3),
1731+           (server 1, share 4), (server 2, share 6), (server 3, share 3),
1732+           (server 4, share 4), (server 5, share 2)}
1733+      G = (V, E)
1734+
1735+    Note that G is bipartite, since every edge in E has one endpoint in L
1736+    and one endpoint in R.
1737+
1738+    A matching in a graph G is a subset M of E such that, for any vertex
1739+    v in V, v is incident to at most one edge of M. A maximum matching
1740+    in G is a matching that is no smaller than any other matching. For
1741+    this graph, a matching of cardinality 5 is:
1742+
1743+      M = {(server 1, share 1), (server 2, share 6),
1744+           (server 3, share 3), (server 4, share 4),
1745+           (server 5, share 2)}
1746+
1747+    Since G is bipartite, and since |L| = 5, we cannot have an M' such
1748+    that |M'| > |M|. Then M is a maximum matching in G. Intuitively, and
1749+    as long as k <= 5, we can see that the layout above has
1750+    servers_of_happiness = 5, which matches the results here.
1751+    """
1752+    if sharemap == {}:
1753+        return 0
1754+    sharemap = shares_by_server(sharemap)
1755+    graph = flow_network_for(sharemap)
1756+    # This is an implementation of the Ford-Fulkerson method for finding
1757+    # a maximum flow in a flow network applied to a bipartite graph.
1758+    # Specifically, it is the Edmonds-Karp algorithm, since it uses a
1759+    # BFS to find the shortest augmenting path at each iteration, if one
1760+    # exists.
1761+    #
1762+    # The implementation here is an adaptation of an algorithm described in
1763+    # "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662.
1764+    dim = len(graph)
1765+    flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)]
1766+    residual_graph, residual_function = residual_network(graph, flow_function)
1767+    while augmenting_path_for(residual_graph):
1768+        path = augmenting_path_for(residual_graph)
1769+        # Delta is the largest amount that we can increase flow across
1770+        # all of the edges in path. Because of the way that the residual
1771+        # function is constructed, f[u][v] for a particular edge (u, v)
1772+        # is the amount of unused capacity on that edge. Taking the
1773+        # minimum of a list of those values for each edge in the
1774+        # augmenting path gives us our delta.
1775+        delta = min(map(lambda (u, v): residual_function[u][v], path))
1776+        for (u, v) in path:
1777+            flow_function[u][v] += delta
1778+            flow_function[v][u] -= delta
1779+        residual_graph, residual_function = residual_network(graph,
1780+                                                             flow_function)
1781+    num_servers = len(sharemap)
1782+    # The value of a flow is the total flow out of the source vertex
1783+    # (vertex 0, in our graph). We could just as well sum across all of
1784+    # f[0], but we know that vertex 0 only has edges to the servers in
1785+    # our graph, so we can stop after summing flow across those. The
1786+    # value of a flow computed in this way is the size of a maximum
1787+    # matching on the bipartite graph described above.
1788+    return sum([flow_function[0][v] for v in xrange(1, num_servers+1)])
1789+
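Expressed as a sharemap (share -> set of servers), the example layout from the docstring above can be fed straight to this function; a quick sketch, assuming the module is importable:

    from allmydata.util.happinessutil import servers_of_happiness

    sharemap = {1: set(["server 1"]),
                2: set(["server 1", "server 5"]),
                3: set(["server 1", "server 3"]),
                4: set(["server 1", "server 4"]),
                6: set(["server 2"])}
    assert servers_of_happiness(sharemap) == 5   # the maximum matching described above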
1790+def flow_network_for(sharemap):
1791+    """
1792+    I take my argument, a dict of peerid -> set(shareid) mappings, and
1793+    turn it into a flow network suitable for use with Edmonds-Karp. I
1794+    then return the adjacency list representation of that network.
1795+
1796+    Specifically, I build G = (V, E), where:
1797+      V = { peerid in sharemap } U { shareid in sharemap } U {s, t}
1798+      E = {(s, peerid) for each peerid}
1799+          U {(peerid, shareid) if peerid is to store shareid }
1800+          U {(shareid, t) for each shareid}
1801+
1802+    s and t will be source and sink nodes when my caller starts treating
1803+    the graph I return like a flow network. Without s and t, the
1804+    returned graph is bipartite.
1805+    """
1806+    # Servers don't have integral identifiers, and we can't make any
1807+    # assumptions about the way shares are indexed -- it's possible that
1808+    # there are missing shares, for example. So before making a graph,
1809+    # we re-index so that all of our vertices have integral indices, and
1810+    # that there aren't any holes. We start indexing at 1, so that we
1811+    # can add a source node at index 0.
1812+    sharemap, num_shares = reindex(sharemap, base_index=1)
1813+    num_servers = len(sharemap)
1814+    graph = [] # index -> [index], an adjacency list
1815+    # Add an entry at the top (index 0) that has an edge to every server
1816+    # in sharemap
1817+    graph.append(sharemap.keys())
1818+    # For each server, add an entry that has an edge to every share that it
1819+    # contains (or will contain).
1820+    for k in sharemap:
1821+        graph.append(sharemap[k])
1822+    # For each share, add an entry that has an edge to the sink.
1823+    sink_num = num_servers + num_shares + 1
1824+    for i in xrange(num_shares):
1825+        graph.append([sink_num])
1826+    # Add an empty entry for the sink, which has no outbound edges.
1827+    graph.append([])
1828+    return graph
1829+
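A tiny sketch of the adjacency list this builds for a single server holding two shares (the exact numbering of the share vertices depends on set iteration order):

    from allmydata.util.happinessutil import flow_network_for

    graph = flow_network_for({"server A": set([0, 1])})
    # vertex 0 is the source, vertex 1 is "server A", vertices 2 and 3 are
    # the two shares, and vertex 4 is the sink, giving roughly:
    #   [[1], [2, 3], [4], [4], []]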
1830+def reindex(sharemap, base_index):
1831+    """
1832+    Given sharemap, I map peerids and shareids to integers that don't
1833+    conflict with each other, so they're useful as indices in a graph. I
1834+    return a sharemap that is reindexed appropriately, and also the
1835+    number of distinct shares in the resulting sharemap as a convenience
1836+    for my caller. base_index tells me where to start indexing.
1837+    """
1838+    shares  = {} # shareid  -> vertex index
1839+    num = base_index
1840+    ret = {} # server index -> [share index], a reindexed sharemap.
1841+    # Number the servers first
1842+    for k in sharemap:
1843+        ret[num] = sharemap[k]
1844+        num += 1
1845+    # Number the shares
1846+    for k in ret:
1847+        for shnum in ret[k]:
1848+            if not shares.has_key(shnum):
1849+                shares[shnum] = num
1850+                num += 1
1851+        ret[k] = map(lambda x: shares[x], ret[k])
1852+    return (ret, len(shares))
1853+
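For instance (which share ends up as which vertex depends on iteration order, but the shape of the result is fixed):

    from allmydata.util.happinessutil import reindex

    print reindex({"server A": set([7, 9])}, base_index=1)
    # => ({1: [2, 3]}, 2): the server becomes vertex 1, the two shares
    #    become vertices 2 and 3, and there are 2 distinct shares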
1854+def residual_network(graph, f):
1855+    """
1856+    I return the residual network and residual capacity function of the
1857+    flow network represented by my graph and f arguments. graph is a
1858+    flow network in adjacency-list form, and f is a flow in graph.
1859+    """
1860+    new_graph = [[] for i in xrange(len(graph))]
1861+    cf = [[0 for s in xrange(len(graph))] for sh in xrange(len(graph))]
1862+    for i in xrange(len(graph)):
1863+        for v in graph[i]:
1864+            if f[i][v] == 1:
1865+                # We add an edge (v, i) with cf[v,i] = 1. This means
1866+                # that we can remove 1 unit of flow from the edge (i, v)
1867+                new_graph[v].append(i)
1868+                cf[v][i] = 1
1869+                cf[i][v] = -1
1870+            else:
1871+                # We add the edge (i, v), since we're not using it right
1872+                # now.
1873+                new_graph[i].append(v)
1874+                cf[i][v] = 1
1875+                cf[v][i] = -1
1876+    return (new_graph, cf)
1877+
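On a two-vertex network with one unused edge, the residual network keeps the forward edge with one unit of spare capacity; a minimal sketch, assuming the module is importable:

    from allmydata.util.happinessutil import residual_network

    graph = [[1], []]               # a single edge 0 -> 1
    flow  = [[0, 0], [0, 0]]        # no flow assigned yet
    print residual_network(graph, flow)
    # => ([[1], []], [[0, 1], [-1, 0]])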
1878+def augmenting_path_for(graph):
1879+    """
1880+    I return an augmenting path, if there is one, from the source node
1881+    to the sink node in the flow network represented by my graph argument.
1882+    If there is no augmenting path, I return False. I assume that the
1883+    source node is at index 0 of graph, and the sink node is at the last
1884+    index. I also assume that graph is a flow network in adjacency list
1885+    form.
1886+    """
1887+    bfs_tree = bfs(graph, 0)
1888+    if bfs_tree[len(graph) - 1]:
1889+        n = len(graph) - 1
1890+        path = [] # [(u, v)], where u and v are vertices in the graph
1891+        while n != 0:
1892+            path.insert(0, (bfs_tree[n], n))
1893+            n = bfs_tree[n]
1894+        return path
1895+    return False
1896+
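A small sketch of both outcomes (path found and no path), assuming the module is importable:

    from allmydata.util.happinessutil import augmenting_path_for

    print augmenting_path_for([[1], [2], []])   # source 0 -> 1 -> sink 2
    # => [(0, 1), (1, 2)]
    print augmenting_path_for([[1], [], []])    # sink unreachable
    # => False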
1897+def bfs(graph, s):
1898+    """
1899+    Perform a BFS on graph starting at s, where graph is a graph in
1900+    adjacency list form, and s is a node in graph. I return the
1901+    predecessor table that the BFS generates.
1902+    """
1903+    # This is an adaptation of the BFS described in "Introduction to
1904+    # Algorithms", Cormen et al, 2nd ed., p. 532.
1905+    # WHITE vertices are those that we haven't seen or explored yet.
1906+    WHITE = 0
1907+    # GRAY vertices are those we have seen, but haven't explored yet
1908+    GRAY  = 1
1909+    # BLACK vertices are those we have seen and explored
1910+    BLACK = 2
1911+    color        = [WHITE for i in xrange(len(graph))]
1912+    predecessor  = [None for i in xrange(len(graph))]
1913+    distance     = [-1 for i in xrange(len(graph))]
1914+    queue = [s] # vertices that we haven't explored yet.
1915+    color[s] = GRAY
1916+    distance[s] = 0
1917+    while queue:
1918+        n = queue.pop(0)
1919+        for v in graph[n]:
1920+            if color[v] == WHITE:
1921+                color[v] = GRAY
1922+                distance[v] = distance[n] + 1
1923+                predecessor[v] = n
1924+                queue.append(v)
1925+        color[n] = BLACK
1926+    return predecessor
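For example, on a three-vertex graph where the root reaches both other vertices directly, the predecessor table records the root as each vertex's parent:

    from allmydata.util.happinessutil import bfs

    print bfs([[1, 2], [2], []], 0)   # edges 0->1, 0->2, 1->2
    # => [None, 0, 0]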
1927}
1928
1929Context:
1930
1931[Dependency on Windmill test framework is not needed yet.
1932david-sarah@jacaranda.org**20100504161043
1933 Ignore-this: be088712bec650d4ef24766c0026ebc8
1934]
1935[tests: pass z to tar so that BSD tar will know to ungzip
1936zooko@zooko.com**20100504090628
1937 Ignore-this: 1339e493f255e8fc0b01b70478f23a09
1938]
1939[setup: update comments and URLs in setup.cfg
1940zooko@zooko.com**20100504061653
1941 Ignore-this: f97692807c74bcab56d33100c899f829
1942]
1943[setup: reorder and extend the show-tool-versions script, the better to glean information about our new buildslaves
1944zooko@zooko.com**20100504045643
1945 Ignore-this: 836084b56b8d4ee8f1de1f4efb706d36
1946]
1947[CLI: Support for https url in option --node-url
1948Francois Deppierraz <francois@ctrlaltdel.ch>**20100430185609
1949 Ignore-this: 1717176b4d27c877e6bc67a944d9bf34
1950 
1951 This patch modifies the regular expression used for verifying of '--node-url'
1952 parameter.  Support for accessing a Tahoe gateway over HTTPS was already
1953 present, thanks to Python's urllib.
1954 
1955]
1956[backupdb.did_create_directory: use REPLACE INTO, not INSERT INTO + ignore error
1957Brian Warner <warner@lothar.com>**20100428050803
1958 Ignore-this: 1fca7b8f364a21ae413be8767161e32f
1959 
1960 This handles the case where we upload a new tahoe directory for a
1961 previously-processed local directory, possibly creating a new dircap (if the
1962 metadata had changed). Now we replace the old dirhash->dircap record. The
1963 previous behavior left the old record in place (with the old dircap and
1964 timestamps), so we'd never stop creating new directories and never converge
1965 on a null backup.
1966]
1967["tahoe webopen": add --info flag, to get ?t=info
1968Brian Warner <warner@lothar.com>**20100424233003
1969 Ignore-this: 126b0bb6db340fabacb623d295eb45fa
1970 
1971 Also fix some trailing whitespace.
1972]
1973[docs: install.html http-equiv refresh to quickstart.html
1974zooko@zooko.com**20100421165708
1975 Ignore-this: 52b4b619f9dde5886ae2cd7f1f3b734b
1976]
1977[docs: install.html -> quickstart.html
1978zooko@zooko.com**20100421155757
1979 Ignore-this: 6084e203909306bed93efb09d0e6181d
1980 It is not called "installing" because that implies that it is going to change the configuration of your operating system. It is not called "building" because that implies that you need developer tools like a compiler. Also I added a stern warning against looking at the "InstallDetails" wiki page, which I have renamed to "AdvancedInstall".
1981]
1982[Fix another typo in tahoe_storagespace munin plugin
1983david-sarah@jacaranda.org**20100416220935
1984 Ignore-this: ad1f7aa66b554174f91dfb2b7a3ea5f3
1985]
1986[Add dependency on windmill >= 1.3
1987david-sarah@jacaranda.org**20100416190404
1988 Ignore-this: 4437a7a464e92d6c9012926b18676211
1989]
1990[licensing: phrase the OpenSSL-exemption in the vocabulary of copyright instead of computer technology, and replicate the exemption from the GPL to the TGPPL
1991zooko@zooko.com**20100414232521
1992 Ignore-this: a5494b2f582a295544c6cad3f245e91
1993]
1994[munin-tahoe_storagespace
1995freestorm77@gmail.com**20100221203626
1996 Ignore-this: 14d6d6a587afe1f8883152bf2e46b4aa
1997 
1998 Plugin configuration rename
1999 
2000]
2001[setup: add licensing declaration for setuptools (noticed by the FSF compliance folks)
2002zooko@zooko.com**20100309184415
2003 Ignore-this: 2dfa7d812d65fec7c72ddbf0de609ccb
2004]
2005[setup: fix error in licensing declaration from Shawn Willden, as noted by the FSF compliance division
2006zooko@zooko.com**20100309163736
2007 Ignore-this: c0623d27e469799d86cabf67921a13f8
2008]
2009[CREDITS to Jacob Appelbaum
2010zooko@zooko.com**20100304015616
2011 Ignore-this: 70db493abbc23968fcc8db93f386ea54
2012]
2013[desert-island-build-with-proper-versions
2014jacob@appelbaum.net**20100304013858]
2015[docs: a few small edits to try to guide newcomers through the docs
2016zooko@zooko.com**20100303231902
2017 Ignore-this: a6aab44f5bf5ad97ea73e6976bc4042d
2018 These edits were suggested by my watching over Jake Appelbaum's shoulder as he completely ignored/skipped/missed install.html and also as he decided that debian.txt wouldn't help him with basic installation. Then I threw in a few docs edits that have been sitting around in my sandbox asking to be committed for months.
2019]
2020[TAG allmydata-tahoe-1.6.1
2021david-sarah@jacaranda.org**20100228062314
2022 Ignore-this: eb5f03ada8ea953ee7780e7fe068539
2023]
2024Patch bundle hash:
20251e0a4c5dcff946a4d6a35bdb914c7c357a3e6308