source: trunk/src/allmydata/test/test_auth.py

Last change on this file was 949b9044, checked in by Itamar Turner-Trauring <itamar@…>, at 2023-10-16T16:45:17Z

Pacify mypy

  • Property mode set to 100644
File size: 11.8 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5from typing import Literal
6
7from hypothesis import (
8    given,
9)
10from hypothesis.strategies import (
11    text,
12    characters,
13    lists,
14)
15
16from twisted.trial import unittest
17from twisted.python import filepath
18from twisted.cred import error, credentials
19from twisted.conch import error as conch_error
20from twisted.conch.ssh import keys
21
22from allmydata.frontends import auth
23from allmydata.util.fileutil import abspath_expanduser_unicode
24
25
# Throwaway RSA private key used purely as a test fixture.  The public half
# is written into ACCOUNTS for the "carol" account and the private half signs
# data in the authentication tests.
DUMMY_KEY = keys.Key.fromString("""\
-----BEGIN RSA PRIVATE KEY-----
MIICXQIBAAKBgQDEP3DYiukOu+NrUlBZeLL9JoHkK5nSvINYfeOQWYVW9J5NG485
pZFVUQKzvvht34Ihj4ucrrvj7vOp+FFvzxI+zHKBpDxyJwV96dvWDAZMjxTxL7iV
8HcO7hqgtQ/Xk1Kjde5lH3EOEDs3IhFHA+sox9y6i4A5NUr2AJZSHiOEVwIDAQAB
AoGASrrNwefDr7SkeS2zIx7vKa8ML1LbFIBsk7n8ee9c8yvbTAl+lLkTiqV6ne/O
sig2aYk75MI1Eirf5o2ElUsI6u36i6AeKL2u/W7tLBVijmBB8dTiWZ5gMOARWt8w
daF2An2826YdcU+iNZ7Yi0q4xtlxHQn3JcNNWxicphLvt0ECQQDtajJ/bK+Nqd9j
/WGvqYcMzkkorQq/0+MQYhcIwDlpf2Xoi45tP4HeoBubeJmU5+jXpXmdP5epWpBv
k3ZCwV7pAkEA05xBP2HTdwRFTJov5I/w7uKOrn7mj7DCvSjQFCufyPOoCJJMeBSq
tfCQlHFtwlkyNfiSbhtgZ0Pp6ovL+1RBPwJBAOlFRBKxrpgpxcXQK5BWqMwrT/S4
eWxb+6mYR3ugq4h91Zq0rJ+pG6irdhS/XV/SsZRZEXIxDoom4u3OXQ9gQikCQErM
ywuaiuNhMRXY0uEaOHJYx1LLLLjSJKQ0zwiyOvMPnfAZtsojlAxoEtNGHSQ731HQ
ogIlzzfxe7ga3mni6IUCQQCwNK9zwARovcQ8nByqotGQzohpl+1b568+iw8GXP2u
dBSD8940XU3YW+oeq8e+p3yQ2GinHfeJ3BYQyNQLuMAJ
-----END RSA PRIVATE KEY-----
""")
43
# Throwaway DSA private key (OpenSSH container format) used purely as a test
# fixture; it provides a second, distinct key type for the "bob" account in
# the parser tests.
DUMMY_KEY_DSA = keys.Key.fromString("""\
-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAABsQAAAAdzc2gtZH
NzAAAAgQDKMh/ELaiP21LYRBuPbUy7dUhv/XZwV7aS1LzxSP+KaJvtDOei8X76XEAfkqX+
aGh9eup+BLkezrV6LlpO9uPzhY8ChlKpkvw5PZKv/2agSrVxZyG7yEzHNtSBQXE6qNMwIk
N/ycXLGCqyAhQSzRhLz9ETNaslRDLo7YyVWkiuAQAAABUA5nTatFKux5EqZS4EarMWFRBU
i1UAAACAFpkkK+JsPixSTPyn0DNMoGKA0Klqy8h61Ds6pws+4+aJQptUBshpwNw1ypo7MO
+goDZy3wwdWtURTPGMgesNdEfxp8L2/kqE4vpMK0myoczCqOiWMeNB/x1AStbSkBI8WmHW
2htgsC01xbaix/FrA3edK8WEyv+oIxlbV1FkrPkAAACANb0EpCc8uoR4/32rO2JLsbcLBw
H5wc2khe7AKkIa9kUknRIRvoCZUtXF5XuXXdRmnpVEm2KcsLdtZjip43asQcqgt0Kz3nuF
kAf7bI98G1waFUimcCSPsal4kCmW2HC11sg/BWOt5qczX/0/3xVxpo6juUeBq9ncnFTvPX
5fOlEAAAHoJkFqHiZBah4AAAAHc3NoLWRzcwAAAIEAyjIfxC2oj9tS2EQbj21Mu3VIb/12
cFe2ktS88Uj/imib7QznovF++lxAH5Kl/mhofXrqfgS5Hs61ei5aTvbj84WPAoZSqZL8OT
2Sr/9moEq1cWchu8hMxzbUgUFxOqjTMCJDf8nFyxgqsgIUEs0YS8/REzWrJUQy6O2MlVpI
rgEAAAAVAOZ02rRSrseRKmUuBGqzFhUQVItVAAAAgBaZJCvibD4sUkz8p9AzTKBigNCpas
vIetQ7OqcLPuPmiUKbVAbIacDcNcqaOzDvoKA2ct8MHVrVEUzxjIHrDXRH8afC9v5KhOL6
TCtJsqHMwqjoljHjQf8dQErW0pASPFph1tobYLAtNcW2osfxawN3nSvFhMr/qCMZW1dRZK
z5AAAAgDW9BKQnPLqEeP99qztiS7G3CwcB+cHNpIXuwCpCGvZFJJ0SEb6AmVLVxeV7l13U
Zp6VRJtinLC3bWY4qeN2rEHKoLdCs957hZAH+2yPfBtcGhVIpnAkj7GpeJAplthwtdbIPw
VjreanM1/9P98VcaaOo7lHgavZ3JxU7z1+XzpRAAAAFQC7360pZLbv7PFt4BPFJ8zAHxAe
QwAAAA5leGFya3VuQGJhcnlvbgECAwQ=
-----END OPENSSH PRIVATE KEY-----
""")
67
# Serialized account-file fixture (ASCII bytes).  One account line is
# commented out ("dennis") and one is live ("carol"); both carry the public
# half of DUMMY_KEY in OpenSSH format followed by a root capability.
ACCOUNTS = """\
# dennis {key} URI:DIR2:aaaaaaaaaaaaaaaaaaaaaaaaaa:1111111111111111111111111111111111111111111111111111
carol {key} URI:DIR2:cccccccccccccccccccccccccc:3333333333333333333333333333333333333333333333333333
""".format(key=DUMMY_KEY.public().toString("openssh").decode("ascii")).encode("ascii")
72
# ASCII control characters that ``str.splitlines`` treats as line boundaries.
# ``str.splitlines`` additionally splits on NEXT LINE, LINE SEPARATOR and
# PARAGRAPH SEPARATOR, whereas reading lines from a file does not.
# NOTE(review): nothing in the visible part of this module references this
# constant; it is kept in case other code imports it.
LINE_SEPARATORS = (
    '\x0a', # line feed
    '\x0b', # vertical tab
    '\x0c', # form feed
    '\x0d', # carriage return
)
81
# Unicode general category code for surrogate code points; passed to
# Hypothesis so generated text never contains them.
SURROGATES: Literal["Cs"] = "Cs"
83
84
class AccountFileParserTests(unittest.TestCase):
    """
    Tests for ``load_account_file`` and its helper functions.
    """
    @given(lists(
        text(alphabet=characters(
            blacklist_categories=(
                # Surrogates are an encoding trick to help out UTF-16.
                # They're not necessary to represent any non-surrogate code
                # point in unicode.  They're also not legal individually but
                # only in pairs.
                SURROGATES,
            ),
            # Exclude newline and carriage return so that each generated
            # string is written to the file as exactly one line.
            blacklist_characters=("\n", "\r"),
        )),
    ))
    def test_ignore_comments(self, lines):
        """
        ``auth.content_lines`` filters out lines beginning with `#` and empty
        lines.

        :param lines: Hypothesis-generated list of single-line strings to
            write to the account file.
        """
        expected = set()

        # It's not clear that real files and StringIO behave sufficiently
        # similarly to use the latter instead of the former here.  In
        # particular, they seem to have distinct and incompatible
        # line-splitting rules.
        bufpath = self.mktemp()
        with open(bufpath, "wt", encoding="utf-8") as buf:
            for line in lines:
                stripped = line.strip()
                # Only non-blank, non-comment lines are expected back.
                if stripped and not stripped.startswith("#"):
                    expected.add(stripped)
                buf.write(line + "\n")

        with auth.open_account_file(bufpath) as buf:
            actual = set(auth.content_lines(buf))

        self.assertEqual(expected, actual)

    def test_parse_accounts(self):
        """
        ``auth.parse_accounts`` accepts an iterator of account lines and returns
        an iterator of structured account data.
        """
        alice_key = DUMMY_KEY.public().toString("openssh").decode("utf-8")
        alice_cap = "URI:DIR2:aaaa:1111"

        bob_key = DUMMY_KEY_DSA.public().toString("openssh").decode("utf-8")
        bob_cap = "URI:DIR2:aaaa:2222"
        self.assertEqual(
            list(auth.parse_accounts([
                "alice {} {}".format(alice_key, alice_cap),
                "bob {} {}".format(bob_key, bob_cap),
            ])),
            [
                ("alice", DUMMY_KEY.public(), alice_cap),
                ("bob", DUMMY_KEY_DSA.public(), bob_cap),
            ],
        )

    def test_parse_accounts_rejects_passwords(self):
        """
        The iterator returned by ``auth.parse_accounts`` raises ``ValueError``
        when processing reaches a line that has what looks like a password
        instead of an ssh key.
        """
        with self.assertRaises(ValueError):
            # ``list`` forces the iterator to actually process the line.
            list(auth.parse_accounts(["alice apassword URI:DIR2:aaaa:1111"]))

    def test_create_account_maps(self):
        """
        ``auth.create_account_maps`` accepts an iterator of structured account
        data and returns two mappings: one from account name to rootcap, the
        other from account name to public keys.
        """
        alice_cap = "URI:DIR2:aaaa:1111"
        alice_key = DUMMY_KEY.public()
        bob_cap = "URI:DIR2:aaaa:2222"
        bob_key = DUMMY_KEY_DSA.public()
        accounts = [
            ("alice", alice_key, alice_cap),
            ("bob", bob_key, bob_cap),
        ]
        # Names and rootcaps go in as text and are expected back as bytes.
        self.assertEqual(
            auth.create_account_maps(accounts),
            ({
                b"alice": alice_cap.encode("utf-8"),
                b"bob": bob_cap.encode("utf-8"),
            },
             {
                 b"alice": [alice_key],
                 b"bob": [bob_key],
             }),
        )

    def test_load_account_file(self):
        """
        ``auth.load_account_file`` accepts an iterator of serialized account lines
        and returns two mappings: one from account name to rootcap, the other
        from account name to public keys.
        """
        alice_key = DUMMY_KEY.public().toString("openssh").decode("utf-8")
        alice_cap = "URI:DIR2:aaaa:1111"

        bob_key = DUMMY_KEY_DSA.public().toString("openssh").decode("utf-8")
        bob_cap = "URI:DIR2:aaaa:2222"

        accounts = [
            "alice {} {}".format(alice_key, alice_cap),
            "bob {} {}".format(bob_key, bob_cap),
            # A commented-out account must not show up in either mapping.
            "# carol {} {}".format(alice_key, alice_cap),
        ]

        self.assertEqual(
            auth.load_account_file(accounts),
            ({
                b"alice": alice_cap.encode("utf-8"),
                b"bob": bob_cap.encode("utf-8"),
            },
             {
                 b"alice": [DUMMY_KEY.public()],
                 b"bob": [DUMMY_KEY_DSA.public()],
             }),
        )
212
213
class AccountFileCheckerKeyTests(unittest.TestCase):
    """
    Tests for key handling done by allmydata.frontends.auth.AccountFileChecker.
    """
    def setUp(self):
        """
        Write the ``ACCOUNTS`` fixture to a temporary file and build an
        ``AccountFileChecker`` that reads it.
        """
        self.account_file = filepath.FilePath(self.mktemp())
        self.account_file.setContent(ACCOUNTS)
        abspath = abspath_expanduser_unicode(str(self.account_file.path))
        self.checker = auth.AccountFileChecker(None, abspath)

    def test_unknown_user(self):
        """
        AccountFileChecker.requestAvatarId returns a Deferred that fires with
        UnauthorizedLogin if called with an SSHPrivateKey object with a
        username not present in the account file.

        ``dennis`` appears in ``ACCOUNTS`` only on a commented-out line, so it
        counts as not present.
        """
        key_credentials = credentials.SSHPrivateKey(
            b"dennis", b"md5", None, None, None)
        avatarId = self.checker.requestAvatarId(key_credentials)
        return self.assertFailure(avatarId, error.UnauthorizedLogin)

    def test_unrecognized_key(self):
        """
        AccountFileChecker.requestAvatarId returns a Deferred that fires with
        UnauthorizedLogin if called with an SSHPrivateKey object with a public
        key other than the one indicated in the account file for the indicated
        user.
        """
        # A well-formed public key that is not the one listed for "carol".
        wrong_key_blob = b"""\
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAYQDJGMWlPXh2M3pYzTiamjcBIMqctt4VvLVW2QZgEFc86XhGjPXq5QAiRTKv9yVZJR9HW70CfBI7GHun8+v4Wb6aicWBoxgI3OB5NN+OUywdme2HSaif5yenFdQr0ME71Xs=
"""
        key_credentials = credentials.SSHPrivateKey(
            b"carol", b"md5", wrong_key_blob, None, None)
        avatarId = self.checker.requestAvatarId(key_credentials)
        return self.assertFailure(avatarId, error.UnauthorizedLogin)

    def test_missing_signature(self):
        """
        AccountFileChecker.requestAvatarId returns a Deferred that fires with
        ValidPublicKey if called with an SSHPrivateKey object with an
        authorized key for the indicated user but with no signature.
        """
        right_key_blob = DUMMY_KEY.public().toString("openssh")
        key_credentials = credentials.SSHPrivateKey(
            b"carol", b"md5", right_key_blob, None, None)
        avatarId = self.checker.requestAvatarId(key_credentials)
        return self.assertFailure(avatarId, conch_error.ValidPublicKey)

    def test_wrong_signature(self):
        """
        AccountFileChecker.requestAvatarId returns a Deferred that fires with
        UnauthorizedLogin if called with an SSHPrivateKey object with a public
        key matching that on the user's line in the account file but with the
        wrong signature.
        """
        right_key_blob = DUMMY_KEY.public().toString("openssh")
        key_credentials = credentials.SSHPrivateKey(
            b"carol", b"md5", right_key_blob, b"signed data", b"wrong sig")
        avatarId = self.checker.requestAvatarId(key_credentials)
        return self.assertFailure(avatarId, error.UnauthorizedLogin)

    def test_authenticated(self):
        """
        If called with an SSHPrivateKey object with a username and public key
        found in the account file and a signature that proves possession of the
        corresponding private key, AccountFileChecker.requestAvatarId returns a
        Deferred that fires with an FTPAvatarID giving the username and root
        capability for that user.
        """
        username = b"carol"
        signed_data = b"signed data"
        # Sign with the private half of the key whose public half is listed
        # for "carol" in ACCOUNTS.
        signature = DUMMY_KEY.sign(signed_data)
        right_key_blob = DUMMY_KEY.public().toString("openssh")
        key_credentials = credentials.SSHPrivateKey(
            username, b"md5", right_key_blob, signed_data, signature)
        avatarId = self.checker.requestAvatarId(key_credentials)
        def authenticated(avatarId):
            # The rootcap must match the one on carol's line in ACCOUNTS.
            self.assertEqual(
                (username,
                 b"URI:DIR2:cccccccccccccccccccccccccc:3333333333333333333333333333333333333333333333333333"),
                (avatarId.username, avatarId.rootcap))
        avatarId.addCallback(authenticated)
        return avatarId
Note: See TracBrowser for help on using the repository browser.