source: trunk/src/allmydata/test/test_no_network.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

  • Property mode set to 100644
File size: 1.6 KB
Line 
1"""
2Test the NoNetworkGrid test harness.
3
4Ported to Python 3.
5"""
6
7from twisted.trial import unittest
8from twisted.application import service
9from allmydata.test.no_network import NoNetworkGrid
10from allmydata.immutable.upload import Data
11from allmydata.util.consumer import download_to_data
12
13from .common import (
14    SameProcessStreamEndpointAssigner,
15)
16
17class Harness(unittest.TestCase):
18    def setUp(self):
19        self.s = service.MultiService()
20        self.s.startService()
21        self.addCleanup(self.s.stopService)
22        self.port_assigner = SameProcessStreamEndpointAssigner()
23        self.port_assigner.setUp()
24        self.addCleanup(self.port_assigner.tearDown)
25
26    def grid(self, basedir):
27        return NoNetworkGrid(
28            basedir,
29            num_clients=1,
30            num_servers=10,
31            client_config_hooks={},
32            port_assigner=self.port_assigner,
33        )
34
35    def test_create(self):
36        basedir = "no_network/Harness/create"
37        g = self.grid(basedir)
38        g.startService()
39        return g.stopService()
40
41    def test_upload(self):
42        basedir = "no_network/Harness/upload"
43        g = self.grid(basedir)
44        g.setServiceParent(self.s)
45
46        c0 = g.clients[0]
47        DATA = b"Data to upload" * 100
48        data = Data(DATA, b"")
49        d = c0.upload(data)
50        def _uploaded(res):
51            n = c0.create_node_from_uri(res.get_uri())
52            return download_to_data(n)
53        d.addCallback(_uploaded)
54        def _check(res):
55            self.failUnlessEqual(res, DATA)
56        d.addCallback(_check)
57
58        return d
Note: See TracBrowser for help on using the repository browser.