source: trunk/src/allmydata/test/test_storage_https.py

Last change on this file was df491de4, checked in by Jean-Paul Calderone <exarkun@…>, at 2023-08-22T13:01:42Z

Add two more cases to the test vector

  • Property mode set to 100644
File size: 8.5 KB
Line 
1"""
2Tests for the TLS part of the HTTP Storage Protocol.
3
4More broadly, these are tests for HTTPS usage as replacement for Foolscap's
5server authentication logic, which may one day apply outside of HTTP Storage
6Protocol.
7"""
8
9from contextlib import asynccontextmanager
10from base64 import b64decode
11
12from yaml import safe_load
13from cryptography import x509
14
15from twisted.internet.endpoints import serverFromString
16from twisted.internet import reactor
17from twisted.internet.defer import maybeDeferred
18from twisted.web.server import Site
19from twisted.web.static import Data
20from twisted.web.client import Agent, HTTPConnectionPool, ResponseNeverReceived
21from twisted.python.filepath import FilePath
22from treq.client import HTTPClient
23
24from .common import SyncTestCase, AsyncTestCase, SameProcessStreamEndpointAssigner
25from .certs import (
26    generate_certificate,
27    generate_private_key,
28    private_key_to_file,
29    cert_to_file,
30)
31from ..storage.http_common import get_spki, get_spki_hash
32from ..storage.http_client import _StorageClientHTTPSPolicy
33from ..storage.http_server import _TLSEndpointWrapper
34from ..util.deferredutil import async_to_deferred
35from .common_system import spin_until_cleanup_done
36
# YAML document, stored in the sibling ``data`` directory, that holds the
# certificates and expected values consumed by the SPKI hash test below.
spki_test_vectors_path = (
    FilePath(__file__)
    .sibling("data")
    .child("spki-hash-test-vectors.yaml")
)
38
39
class HTTPSNurlTests(SyncTestCase):
    """Tests for HTTPS NURLs."""

    def test_spki_hash(self):
        """
        ``get_spki()`` and ``get_spki_hash()`` reproduce the recorded test
        vectors, which follow the semantics of RFC 7469.

        How the vectors were produced:

        * Certificates were created with the openssl command line tool::

            openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365

        * Expected hashes come from the instructions in Appendix A of the
          RFC::

            openssl x509 -noout -in certificate.pem -pubkey |
                openssl asn1parse -noout -inform pem -out public.key
            openssl dgst -sha256 -binary public.key | openssl enc -base64

          The base64 text printed by OpenSSL was then converted to the
          URL-safe alphabet (``+`` became ``-``, ``/`` became ``_``) and the
          trailing ``=`` padding was dropped.

        * Expected SubjectPublicKeyInfo bytes were taken from the
          implementation of ``get_spki_hash`` once its output agreed with the
          hash produced by the commands above.
        """
        vectors = safe_load(spki_test_vectors_path.getContent())["vector"]
        for index, vector in enumerate(vectors):
            # Decode every field of the vector up front so malformed test
            # data surfaces before any certificate parsing is attempted.
            pem_bytes = vector["certificate"].encode("ascii")
            wanted_spki = b64decode(vector["expected-spki"])
            wanted_hash = vector["expected-hash"].encode("ascii")

            try:
                loaded = x509.load_pem_x509_certificate(pem_bytes)
            except Exception as problem:
                # Report which vector is broken rather than a bare traceback.
                self.fail(f"Loading case {index} certificate failed: {problem}")

            # Each extractor is only invoked when its own check runs, so a
            # failure in the first check prevents the second from executing.
            checks = (
                (wanted_spki, get_spki, f"case {index} spki data mismatch"),
                (wanted_hash, get_spki_hash, f"case {index} spki hash mismatch"),
            )
            for wanted, extract, message in checks:
                self.assertEqual(wanted, extract(loaded), message)
89
90
class PinningHTTPSValidation(AsyncTestCase):
    """
    Exercise the client-side HTTPS certificate checks that rely on
    Tahoe-LAFS's pinning-based scheme rather than the traditional
    certificate authority scheme.

    https://cryptography.io/en/latest/x509/tutorial/#creating-a-self-signed-certificate
    """

    def setUp(self):
        """
        Create the port assigner used by ``listen`` and register its
        teardown as a cleanup, then run the base class ``setUp``.
        """
        assigner = SameProcessStreamEndpointAssigner()
        self._port_assigner = assigner
        assigner.setUp()
        self.addCleanup(assigner.tearDown)
        return AsyncTestCase.setUp(self)

    def tearDown(self):
        """
        Run the base class ``tearDown`` and, once it has finished, wait on
        ``spin_until_cleanup_done()``.
        """

        def wait_for_cleanup(_ignored):
            return spin_until_cleanup_done()

        # The base tearDown may or may not return a Deferred; normalise it.
        finished = maybeDeferred(AsyncTestCase.tearDown, self)
        finished.addCallback(wait_for_cleanup)
        return finished

    @asynccontextmanager
    async def listen(self, private_key_path: FilePath, cert_path: FilePath):
        """
        Async context manager that runs a HTTPS server using the given
        private key and certificate files.

        The value bound by ``async with`` is a URL that connects to the
        server.  The listening port is stopped when the context exits.
        """
        # Only the endpoint description is needed; the URL is built from the
        # port that actually ends up listening.
        _location_hint, endpoint_description = self._port_assigner.assign(reactor)
        plain_endpoint = serverFromString(reactor, endpoint_description)
        tls_endpoint = _TLSEndpointWrapper.from_paths(
            plain_endpoint, private_key_path, cert_path
        )

        resource = Data(b"YOYODYNE", "text/plain")
        resource.isLeaf = True
        port = await tls_endpoint.listen(Site(resource))
        try:
            port_number = port.getHost().port  # type: ignore[attr-defined]
            yield f"https://127.0.0.1:{port_number}/"
        finally:
            # stopListening() may hand back something to wait on, or None.
            stopping = port.stopListening()
            if stopping is not None:
                await stopping

    def request(self, url: str, expected_certificate: x509.Certificate):
        """
        Issue a HTTPS GET for ``url`` with a client whose TLS policy is
        pinned to the SPKI hash of ``expected_certificate``.

        Returns whatever ``treq``'s ``HTTPClient.get`` returns.
        """
        pinning_policy = _StorageClientHTTPSPolicy(
            expected_spki_hash=get_spki_hash(expected_certificate)
        )
        # No persistent connections, so we don't have dirty reactor at the end
        # of the test.
        one_shot_pool = HTTPConnectionPool(reactor, persistent=False)
        agent = Agent(reactor, pinning_policy, pool=one_shot_pool)
        return HTTPClient(agent).get(url)

    def _serve(self, private_key, certificate):
        """
        Pass ``private_key`` and ``certificate`` through
        ``private_key_to_file`` / ``cert_to_file`` with fresh ``mktemp()``
        paths and return the ``listen`` context manager for the results.
        """
        return self.listen(
            private_key_to_file(FilePath(self.mktemp()), private_key),
            cert_to_file(FilePath(self.mktemp()), certificate),
        )

    @async_to_deferred
    async def test_success(self):
        """
        When every condition is satisfied, a TLS client using the Tahoe-LAFS
        policy can connect to the server and read its content.
        """
        server_key = generate_private_key()
        server_cert = generate_certificate(server_key)
        async with self._serve(server_key, server_cert) as url:
            response = await self.request(url, server_cert)
            body = await response.content()
            self.assertEqual(body, b"YOYODYNE")

    @async_to_deferred
    async def test_server_certificate_has_wrong_hash(self):
        """
        A request fails when the hash of the certificate presented by the
        server differs from the hash the client expects.
        """
        server_key = generate_private_key()
        server_cert = generate_certificate(server_key)
        unrelated_key = generate_private_key()
        unrelated_cert = generate_certificate(unrelated_key)

        async with self._serve(server_key, server_cert) as url:
            # The client is pinned to a certificate the server does not use.
            with self.assertRaises(ResponseNeverReceived):
                await self.request(url, unrelated_cert)

    @async_to_deferred
    async def test_server_certificate_expired(self):
        """
        An expired server certificate does not prevent the request from
        succeeding as long as its hash is the one the client expects;
        expiration has no effect.
        """
        server_key = generate_private_key()
        expired_cert = generate_certificate(server_key, expires_days=-10)

        async with self._serve(server_key, expired_cert) as url:
            response = await self.request(url, expired_cert)
            body = await response.content()
            self.assertEqual(body, b"YOYODYNE")

    @async_to_deferred
    async def test_server_certificate_not_valid_yet(self):
        """
        A server certificate whose validity period only begins in The Future
        does not prevent the request from succeeding as long as its hash is
        the one the client expects; start time has no effect.
        """
        server_key = generate_private_key()
        future_cert = generate_certificate(
            server_key, expires_days=10, valid_in_days=5
        )

        async with self._serve(server_key, future_cert) as url:
            response = await self.request(url, future_cert)
            body = await response.content()
            self.assertEqual(body, b"YOYODYNE")
219
220    # A potential attack to test is a private key that doesn't match the
221    # certificate... but OpenSSL (quite rightly) won't let you listen with that
222    # so I don't know how to test that! See
223    # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3884
Note: See TracBrowser for help on using the repository browser.