source: trunk/src/allmydata/test/test_consumer.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

  • Property mode set to 100644
File size: 2.4 KB
Line 
1"""
2Tests for allmydata.util.consumer.
3
4Ported to Python 3.
5"""
6
7from zope.interface import implementer
8from twisted.internet.interfaces import IPushProducer, IPullProducer
9
10from allmydata.util.consumer import MemoryConsumer
11
12from .common import (
13    SyncTestCase,
14)
15from testtools.matchers import (
16    Equals,
17)
18
19
@implementer(IPushProducer)
@implementer(IPullProducer)
class Producer:
    """
    Test producer declaring both the push and the pull producer interfaces.

    Every call to iterate() passes the next queued chunk to the consumer;
    when no chunks remain, the call marks the producer done and unregisters
    it from the consumer instead.  A test that registers it as a streaming
    producer is expected to call iterate() by hand.
    """

    def __init__(self, consumer, data):
        self.consumer = consumer
        self.data = data
        self.done = False

    def stopProducing(self):
        """Do nothing; present only to satisfy the producer interface."""

    def pauseProducing(self):
        """Do nothing; present only to satisfy the producer interface."""

    def resumeProducing(self):
        """Perform a single iteration of writing."""
        self.iterate()

    def iterate(self):
        """
        Write the next chunk to the consumer, or finish when none are left.

        Raises RuntimeError if called again after the producer has finished.
        """
        if self.done:
            raise RuntimeError(
                "There's a bug somewhere, shouldn't iterate after being done"
            )
        if not self.data:
            # Nothing left to send: flag completion before unregistering.
            self.done = True
            self.consumer.unregisterProducer()
            return
        deliver = self.consumer.write
        deliver(self.data.pop(0))
54
55
class MemoryConsumerTests(SyncTestCase):
    """Exercise MemoryConsumer with streaming and non-streaming producers."""

    def test_push_producer(self):
        """
        Everything a streaming producer writes ends up in the MemoryConsumer.
        """
        expected = [b"abc", b"def", b"ghi"]
        sink = MemoryConsumer()
        source = Producer(sink, list(expected))
        sink.registerProducer(source, True)
        # Right after registration only the first chunk is expected.
        self.assertThat(sink.chunks, Equals(expected[:1]))
        for _ in range(2):
            source.iterate()
        self.assertThat(sink.chunks, Equals(expected))
        self.assertFalse(sink.done)
        # One more iteration finds no data and finishes the transfer.
        source.iterate()
        self.assertThat(sink.chunks, Equals(expected))
        self.assertTrue(sink.done)

    def test_pull_producer(self):
        """
        Everything a non-streaming producer writes ends up in the
        MemoryConsumer.
        """
        expected = [b"abc", b"def", b"ghi"]
        sink = MemoryConsumer()
        source = Producer(sink, list(expected))
        sink.registerProducer(source, False)
        # Registration alone is expected to drain the producer completely.
        self.assertThat(sink.chunks, Equals(expected))
        self.assertTrue(sink.done)
84
85
# download_to_data() is effectively tested by some of the filenode tests, e.g.
# test_immutable.py.
Note: See TracBrowser for help on using the repository browser.