source: trunk/src/allmydata/test/test_crawler.py

Last change on this file was 53084f7, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-27T23:49:07Z

remove more Python2 compatibility

1"""
2Tests for allmydata.storage.crawler.
3
4Ported to Python 3.
5"""
6
7
8import time
9import os.path
10from twisted.trial import unittest
11from twisted.application import service
12from twisted.internet import defer
13from foolscap.api import eventually, fireEventually
14
15from allmydata.util import fileutil, hashutil, pollmixin
16from allmydata.storage.server import StorageServer, si_b2a
17from allmydata.storage.crawler import ShareCrawler, TimeSliceExceeded
18
19from allmydata.test.common_util import StallMixin
20
21class BucketEnumeratingCrawler(ShareCrawler):
22    cpu_slice = 500 # make sure it can complete in a single slice
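    # start the first cycle right away instead of waiting out the base
    # class's usual post-startup delay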
    slow_start = 0
    def __init__(self, *args, **kwargs):
        ShareCrawler.__init__(self, *args, **kwargs)
        self.all_buckets = []
        self.finished_d = defer.Deferred()
    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        # Bucket _inputs_ are bytes, and that's what we will compare this
        # to:
        storage_index_b32 = storage_index_b32.encode("ascii")
        self.all_buckets.append(storage_index_b32)
    def finished_cycle(self, cycle):
        eventually(self.finished_d.callback, None)

class PacedCrawler(ShareCrawler):
    cpu_slice = 500 # make sure it can complete in a single slice
    slow_start = 0
    def __init__(self, *args, **kwargs):
        ShareCrawler.__init__(self, *args, **kwargs)
        self.countdown = 6
        self.all_buckets = []
        self.finished_d = defer.Deferred()
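        # optional hook invoked from yielding(); test_paced_service installs
        # check_progress() here to inspect the crawler in mid-cycle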
        self.yield_cb = None
    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        # Bucket _inputs_ are bytes, and that's what we will compare this
        # to:
        storage_index_b32 = storage_index_b32.encode("ascii")
        self.all_buckets.append(storage_index_b32)
        self.countdown -= 1
        if self.countdown == 0:
            # force a timeout. We restore it in yielding()
            self.cpu_slice = -1.0
    def yielding(self, sleep_time):
        self.cpu_slice = 500
        if self.yield_cb:
            self.yield_cb()
    def finished_cycle(self, cycle):
        eventually(self.finished_d.callback, None)

class ConsumingCrawler(ShareCrawler):
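    # Burns roughly 50ms of wall-clock time per bucket so that the crawler's
    # CPU throttling (allowed_cpu_percentage) can be observed from outside.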
    cpu_slice = 0.5
    allowed_cpu_percentage = 0.5
    minimum_cycle_time = 0
    slow_start = 0

    def __init__(self, *args, **kwargs):
        ShareCrawler.__init__(self, *args, **kwargs)
        self.accumulated = 0.0
        self.cycles = 0
        self.last_yield = 0.0
    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        start = time.time()
        time.sleep(0.05)
        elapsed = time.time() - start
        self.accumulated += elapsed
        self.last_yield += elapsed
    def finished_cycle(self, cycle):
        self.cycles += 1
    def yielding(self, sleep_time):
        self.last_yield = 0.0

class OneShotCrawler(ShareCrawler):
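    # Runs a single cycle and then disowns its service parent, so it never
    # starts a second cycle.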
    cpu_slice = 500 # make sure it can complete in a single slice
    slow_start = 0
    def __init__(self, *args, **kwargs):
        ShareCrawler.__init__(self, *args, **kwargs)
        self.counter = 0
        self.finished_d = defer.Deferred()
    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        self.counter += 1
    def finished_cycle(self, cycle):
        self.finished_d.callback(None)
        self.disownServiceParent()

class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()

    def tearDown(self):
        return self.s.stopService()

    def si(self, i):
        return hashutil.storage_index_hash(b"%d" % (i,))
    def rs(self, i, serverid):
        return hashutil.bucket_renewal_secret_hash(b"%d" % (i,), serverid)
    def cs(self, i, serverid):
        return hashutil.bucket_cancel_secret_hash(b"%d" % (i,), serverid)

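    # Write one share. 'tail' replaces the last byte of the storage index,
    # which lets test_paced put several distinct buckets into the same
    # prefixdir.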
    def write(self, i, ss, serverid, tail=0):
        si = self.si(i)
        si = si[:-1] + bytes(bytearray((tail,)))
        had,made = ss.allocate_buckets(si,
                                       self.rs(i, serverid),
                                       self.cs(i, serverid),
                                       set([0]), 99)
        made[0].write(0, b"data")
        made[0].close()
        return si_b2a(si)

    def test_immediate(self):
        self.basedir = "crawler/Basic/immediate"
        fileutil.make_dirs(self.basedir)
        serverid = b"\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        sis = [self.write(i, ss, serverid) for i in range(10)]
        statefile = os.path.join(self.basedir, "statefile")

        c = BucketEnumeratingCrawler(ss, statefile, allowed_cpu_percentage=.1)
        c.load_state()

        c.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))

        # make sure the statefile has been returned to the starting point
        c.finished_d = defer.Deferred()
        c.all_buckets = []
        c.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))

        # check that a new crawler picks up on the state file properly
        c2 = BucketEnumeratingCrawler(ss, statefile)
        c2.load_state()

        c2.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))

    def test_service(self):
        self.basedir = "crawler/Basic/service"
        fileutil.make_dirs(self.basedir)
        serverid = b"\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        sis = [self.write(i, ss, serverid) for i in range(10)]

        statefile = os.path.join(self.basedir, "statefile")
        c = BucketEnumeratingCrawler(ss, statefile)
        c.setServiceParent(self.s)

        # it should be legal to call get_state() and get_progress() right
        # away, even before the first tick is performed. No work should have
        # been done yet.
        s = c.get_state()
        p = c.get_progress()
        self.failUnlessEqual(s["last-complete-prefix"], None)
        self.failUnlessEqual(s["current-cycle"], None)
        self.failUnlessEqual(p["cycle-in-progress"], False)

        d = c.finished_d
        def _check(ignored):
            self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
        d.addCallback(_check)
        return d

    def test_paced(self):
        self.basedir = "crawler/Basic/paced"
        fileutil.make_dirs(self.basedir)
        serverid = b"\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        # put four buckets in each prefixdir
        sis = []
        for i in range(10):
            for tail in range(4):
                sis.append(self.write(i, ss, serverid, tail))

        statefile = os.path.join(self.basedir, "statefile")

        c = PacedCrawler(ss, statefile)
        c.load_state()
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop in the middle of one of the buckets. Since we
        # aren't using its normal scheduler, we have to save its state
        # manually.
        c.save_state()
        c.cpu_slice = PacedCrawler.cpu_slice
        self.failUnlessEqual(len(c.all_buckets), 6)

        c.start_current_prefix(time.time()) # finish it
        self.failUnlessEqual(len(sis), len(c.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))

        # make sure the statefile has been returned to the starting point
        c.finished_d = defer.Deferred()
        c.all_buckets = []
        c.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
        del c

        # start a new crawler, it should start from the beginning
        c = PacedCrawler(ss, statefile)
        c.load_state()
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop in the middle of one of the buckets. Since we
        # aren't using its normal scheduler, we have to save its state
        # manually.
        c.save_state()
        c.cpu_slice = PacedCrawler.cpu_slice

        # a third crawler should pick up from where it left off
        c2 = PacedCrawler(ss, statefile)
        c2.all_buckets = c.all_buckets[:]
        c2.load_state()
        c2.countdown = -1
        c2.start_current_prefix(time.time())
        self.failUnlessEqual(len(sis), len(c2.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))
        del c, c2

        # now stop it at the end of a bucket (countdown=4), to exercise a
        # different place that checks the time
        c = PacedCrawler(ss, statefile)
        c.load_state()
        c.countdown = 4
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop at the end of one of the buckets. Again we must
        # save state manually.
        c.save_state()
        c.cpu_slice = PacedCrawler.cpu_slice
        self.failUnlessEqual(len(c.all_buckets), 4)
        c.start_current_prefix(time.time()) # finish it
        self.failUnlessEqual(len(sis), len(c.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
        del c

        # stop it again at the end of the bucket, check that a new checker
        # picks up correctly
        c = PacedCrawler(ss, statefile)
        c.load_state()
        c.countdown = 4
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop at the end of one of the buckets.
        c.save_state()

        c2 = PacedCrawler(ss, statefile)
        c2.all_buckets = c.all_buckets[:]
        c2.load_state()
        c2.countdown = -1
        c2.start_current_prefix(time.time())
        self.failUnlessEqual(len(sis), len(c2.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))
        del c, c2

    def test_paced_service(self):
        self.basedir = "crawler/Basic/paced_service"
        fileutil.make_dirs(self.basedir)
        serverid = b"\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        sis = [self.write(i, ss, serverid) for i in range(10)]

        statefile = os.path.join(self.basedir, "statefile")
        c = PacedCrawler(ss, statefile)

        did_check_progress = [False]
        def check_progress():
            c.yield_cb = None
            try:
                p = c.get_progress()
                self.failUnlessEqual(p["cycle-in-progress"], True)
                pct = p["cycle-complete-percentage"]
                # after 6 buckets, we happen to be at 76.17% complete. As
                # long as we create shares in deterministic order, this will
                # continue to be true.
                self.failUnlessEqual(int(pct), 76)
                left = p["remaining-sleep-time"]
                self.failUnless(isinstance(left, float), left)
                self.failUnless(left > 0.0, left)
            except Exception as e:
                did_check_progress[0] = e
            else:
                did_check_progress[0] = True
        c.yield_cb = check_progress

        c.setServiceParent(self.s)
        # that should get through 6 buckets, pause for a little while (and
        # run check_progress()), then resume

        d = c.finished_d
        def _check(ignored):
            if did_check_progress[0] is not True:
                raise did_check_progress[0]
            self.failUnless(did_check_progress[0])
            self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
            # at this point, the crawler should be sitting in the inter-cycle
            # timer, which should be pegged at the minimum cycle time
            self.failUnless(c.timer)
            self.failUnless(c.sleeping_between_cycles)
            self.failUnlessEqual(c.current_sleep_time, c.minimum_cycle_time)

            p = c.get_progress()
            self.failUnlessEqual(p["cycle-in-progress"], False)
            naptime = p["remaining-wait-time"]
            self.failUnless(isinstance(naptime, float), naptime)
            # min-cycle-time is 300, so this is basically testing that it took
            # less than 290s to crawl
            self.failUnless(naptime > 10.0, naptime)
            soon = p["next-crawl-time"] - time.time()
            self.failUnless(soon > 10.0, soon)

        d.addCallback(_check)
        return d

    def OFF_test_cpu_usage(self):
        # this test can't actually assert anything, because too many
        # buildslave machines are slow. But on a fast developer machine, it
        # can produce interesting results. So if you care about how well the
        # Crawler is accomplishing its run-slowly goals, re-enable this test
        # and read the stdout when it runs.

        self.basedir = "crawler/Basic/cpu_usage"
        fileutil.make_dirs(self.basedir)
        serverid = b"\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        for i in range(10):
            self.write(i, ss, serverid)

        statefile = os.path.join(self.basedir, "statefile")
        c = ConsumingCrawler(ss, statefile)
        c.setServiceParent(self.s)

        # this will run as fast as it can, consuming about 50ms per call to
        # process_bucket(), limited by the Crawler to about 50% cpu. We let
        # it run for a few seconds, then compare how much time
        # process_bucket() got vs wallclock time. It should get between 10%
        # and 70% CPU. This is dicey, there's about 100ms of overhead per
        # 300ms slice (saving the state file takes about 150-200us, but we do
        # it 1024 times per cycle, one for each [empty] prefixdir), leaving
        # 200ms for actual processing, which is enough to get through 4
        # buckets each slice, then the crawler sleeps for 300ms/0.5 = 600ms,
        # giving us 900ms wallclock per slice. In 4.0 seconds we can do 4.4
        # slices, giving us about 17 shares, so we merely assert that we've
        # finished at least one cycle in that time.

        # with a short cpu_slice (so we can keep this test down to 4
        # seconds), the overhead is enough to make a nominal 50% usage more
        # like 30%. Forcing sleep_time to 0 only gets us 67% usage.

        start = time.time()
        d = self.stall(delay=4.0)
        def _done(res):
            elapsed = time.time() - start
            percent = 100.0 * c.accumulated / elapsed
            # our buildslaves vary too much in their speeds and load levels,
            # and many of them only manage to hit 7% usage when our target is
            # 50%. So don't assert anything about the results, just log them.
            print()
            print("crawler: got %d%% percent when trying for 50%%" % percent)
            print("crawler: got %d full cycles" % c.cycles)
        d.addCallback(_done)
        return d

    def test_empty_subclass(self):
        self.basedir = "crawler/Basic/empty_subclass"
        fileutil.make_dirs(self.basedir)
        serverid = b"\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        for i in range(10):
            self.write(i, ss, serverid)

        statefile = os.path.join(self.basedir, "statefile")
        c = ShareCrawler(ss, statefile)
        c.slow_start = 0
        c.setServiceParent(self.s)

        # we just let it run for a while, to get figleaf coverage of the
        # empty methods in the base class

        def _check():
            return bool(c.state["last-cycle-finished"] is not None)
        d = self.poll(_check)
        def _done(ignored):
            state = c.get_state()
            self.failUnless(state["last-cycle-finished"] is not None)
        d.addCallback(_done)
        return d


    def test_oneshot(self):
        self.basedir = "crawler/Basic/oneshot"
        fileutil.make_dirs(self.basedir)
        serverid = b"\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        for i in range(30):
            self.write(i, ss, serverid)

        statefile = os.path.join(self.basedir, "statefile")
        c = OneShotCrawler(ss, statefile)
        c.setServiceParent(self.s)

        d = c.finished_d
        def _finished_first_cycle(ignored):
            return fireEventually(c.counter)
        d.addCallback(_finished_first_cycle)
        def _check(old_counter):
            # the crawler should not do any more work after it's been stopped
            self.failUnlessEqual(old_counter, c.counter)
            self.failIf(c.running)
            self.failIf(c.timer)
            self.failIf(c.current_sleep_time)
            s = c.get_state()
            self.failUnlessEqual(s["last-cycle-finished"], 0)
            self.failUnlessEqual(s["current-cycle"], None)
        d.addCallback(_check)
        return d
