diff options
author | Jeff Forcier <jeff@bitprophet.org> | 2023-04-27 18:00:16 -0400 |
---|---|---|
committer | Jeff Forcier <jeff@bitprophet.org> | 2023-05-05 12:27:20 -0400 |
commit | 162213fa1a4551bd955134c97ca5276a5f29e907 (patch) | |
tree | 5a70c153853fa2114c7f67523cb59db63ecfc5d8 | |
parent | 9ece9fcc8d8e5d22de0a65fcc44374a53c31dfdb (diff) |
Migrate rest of main keys and update suite to be more pytest-relaxed compat
Main branch as of today:
350 passed, 21 skipped, 52 deselected, 3 warnings in 11.10s
This branch as of this commit:
361 passed, 21 skipped, 52 deselected, 3 warnings in 10.51s
Of those 11 "new" tests, 8 are ones I wrote (tests/pkey.py). Hard to
figure out what the other 3 are given pytest-relaxed's output is very
different from regular verbose pytest. oops.
-rw-r--r-- | paramiko/pkey.py | 54 | ||||
-rw-r--r-- | pytest.ini | 5 | ||||
-rw-r--r-- | tests/_loop.py (renamed from tests/loop.py) | 0 | ||||
-rw-r--r-- | tests/_stub_sftp.py (renamed from tests/stub_sftp.py) | 0 | ||||
-rw-r--r-- | tests/_support/ecdsa-256.key (renamed from tests/test_ecdsa_256.key) | 0 | ||||
-rw-r--r-- | tests/_support/ed25519.key (renamed from tests/test_ed25519.key) | 0 | ||||
-rw-r--r-- | tests/_support/rsa.key (renamed from tests/test_rsa.key) | 0 | ||||
-rw-r--r-- | tests/_util.py (renamed from tests/util.py) | 0 | ||||
-rw-r--r-- | tests/conftest.py | 63 | ||||
-rw-r--r-- | tests/pkey.py | 13 | ||||
-rw-r--r-- | tests/test_auth.py | 6 | ||||
-rw-r--r-- | tests/test_client.py | 39 | ||||
-rw-r--r-- | tests/test_config.py | 2 | ||||
-rw-r--r-- | tests/test_file.py | 2 | ||||
-rw-r--r-- | tests/test_gssapi.py | 2 | ||||
-rw-r--r-- | tests/test_kex_gss.py | 6 | ||||
-rw-r--r-- | tests/test_packetizer.py | 2 | ||||
-rw-r--r-- | tests/test_pkey.py | 35 | ||||
-rw-r--r-- | tests/test_sftp.py | 4 | ||||
-rw-r--r-- | tests/test_sftp_big.py | 2 | ||||
-rw-r--r-- | tests/test_ssh_gss.py | 8 | ||||
-rw-r--r-- | tests/test_transport.py | 34 |
22 files changed, 199 insertions, 78 deletions
diff --git a/paramiko/pkey.py b/paramiko/pkey.py index 91a33bed..98bd82cd 100644 --- a/paramiko/pkey.py +++ b/paramiko/pkey.py @@ -33,6 +33,7 @@ import bcrypt from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.ciphers import algorithms, modes, Cipher +from cryptography.hazmat.primitives import asymmetric from paramiko import util from paramiko.util import u, b @@ -64,10 +65,13 @@ class UnknownKeyType(Exception): An unknown public/private key algorithm was attempted to be read. """ - def __init__(self, key_type, key_bytes): + def __init__(self, key_type=None, key_bytes=None): self.key_type = key_type self.key_bytes = key_bytes + def __str__(self): + return f"UnknownKeyType(type={self.key_type!r}, bytes=<{len(self.key_bytes)}>)" # noqa + class PKey: """ @@ -106,6 +110,53 @@ class PKey: END_TAG = re.compile(r"^-{5}END (RSA|DSA|EC|OPENSSH) PRIVATE KEY-{5}\s*$") @staticmethod + def from_path(path, passphrase=None): + """ + Attempt to instantiate appropriate key subclass from given file path. + + :param Path path: The path to load. + + .. versionadded:: 3.2 + """ + # TODO: make sure sphinx is reading Path right in param list... + from paramiko import DSSKey, RSAKey, Ed25519Key, ECDSAKey + + data = path.read_bytes() + # Like OpenSSH, try modern/OpenSSH-specific key load first + try: + loaded = serialization.load_ssh_private_key( + data=data, password=passphrase + ) + # Then fall back to assuming legacy PEM type + except ValueError: + loaded = serialization.load_pem_private_key( + data=data, password=passphrase + ) + # TODO Python 3.10: match statement? (NOTE: we cannot use a dict + # because the results from the loader are literal backend, eg openssl, + # private classes, so isinstance tests work but exact 'x class is y' + # tests will not work) + # TODO: leverage already-parsed/mathed obj to avoid duplicate cpu + # cycles? seemingly requires most of our key subclasses to be rewritten + # to be cryptography-object-forward. this is still likely faster than + # the old SSHClient code that just tried instantiating every class! + key_class = None + if isinstance(loaded, asymmetric.dsa.DSAPrivateKey): + key_class = DSSKey + elif isinstance(loaded, asymmetric.rsa.RSAPrivateKey): + key_class = RSAKey + elif isinstance(loaded, asymmetric.ed25519.Ed25519PrivateKey): + key_class = Ed25519Key + elif isinstance(loaded, asymmetric.ec.EllipticCurvePrivateKey): + key_class = ECDSAKey + else: + raise UnknownKeyType( + key_bytes=data, key_type=loaded.__class__.__name__ + ) + with path.open() as fd: + return key_class.from_private_key(fd, password=passphrase) + + @staticmethod def from_type_string(key_type, key_bytes): """ Given type `str` & raw `bytes`, return a `PKey` subclass instance. @@ -131,6 +182,7 @@ class PKey: for key_class in key_classes: if key_type in key_class.identifiers(): + # TODO: needs to passthru things like passphrase return key_class(data=key_bytes) raise UnknownKeyType(key_type=key_type, key_bytes=key_bytes) @@ -1,6 +1,5 @@ [pytest] -# We use pytest-relaxed just for its utils at the moment, so disable it at the -# plugin level until we adapt test organization to really use it. -addopts = -p no:relaxed +testpaths = tests +python_files = * # Loop on failure looponfailroots = tests paramiko diff --git a/tests/loop.py b/tests/_loop.py index a3740013..a3740013 100644 --- a/tests/loop.py +++ b/tests/_loop.py diff --git a/tests/stub_sftp.py b/tests/_stub_sftp.py index 0c0372e9..0c0372e9 100644 --- a/tests/stub_sftp.py +++ b/tests/_stub_sftp.py diff --git a/tests/test_ecdsa_256.key b/tests/_support/ecdsa-256.key index 42d44734..42d44734 100644 --- a/tests/test_ecdsa_256.key +++ b/tests/_support/ecdsa-256.key diff --git a/tests/test_ed25519.key b/tests/_support/ed25519.key index eb9f94c2..eb9f94c2 100644 --- a/tests/test_ed25519.key +++ b/tests/_support/ed25519.key diff --git a/tests/test_rsa.key b/tests/_support/rsa.key index f50e9c53..f50e9c53 100644 --- a/tests/test_rsa.key +++ b/tests/_support/rsa.key diff --git a/tests/util.py b/tests/_util.py index 2f1c5ac2..2f1c5ac2 100644 --- a/tests/util.py +++ b/tests/_util.py diff --git a/tests/conftest.py b/tests/conftest.py index b28d2a17..beef87c2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,13 +2,24 @@ import logging import os import shutil import threading +from pathlib import Path + +from invoke.vendor.lexicon import Lexicon import pytest -from paramiko import RSAKey, SFTPServer, SFTP, Transport +from paramiko import ( + SFTPServer, + SFTP, + Transport, + DSSKey, + RSAKey, + Ed25519Key, + ECDSAKey, +) -from .loop import LoopSocket -from .stub_sftp import StubServer, StubSFTPServer -from .util import _support +from ._loop import LoopSocket +from ._stub_sftp import StubServer, StubSFTPServer +from ._util import _support from icecream import ic, install as install_ic @@ -71,7 +82,7 @@ def sftp_server(): tc = Transport(sockc) ts = Transport(socks) # Auth - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) + host_key = RSAKey.from_private_key_file(_support("rsa.key")) ts.add_server_key(host_key) # Server setup event = threading.Event() @@ -103,3 +114,45 @@ def sftp(sftp_server): yield client # Clean up - as in make_sftp_folder, we assume local-only exec for now. shutil.rmtree(client.FOLDER, ignore_errors=True) + + +key_data = [ + ["ssh-rsa", RSAKey, "SHA256:OhNL391d/beeFnxxg18AwWVYTAHww+D4djEE7Co0Yng"], + ["ssh-dss", DSSKey, "SHA256:uHwwykG099f4M4kfzvFpKCTino0/P03DRbAidpAmPm0"], + [ + "ssh-ed25519", + Ed25519Key, + "SHA256:J6VESFdD3xSChn8y9PzWzeF+1tl892mOy2TqkMLO4ow", + ], + [ + "ecdsa-sha2-nistp256", + ECDSAKey, + "SHA256:BrQG04oNKUETjKCeL4ifkARASg3yxS/pUHl3wWM26Yg", + ], +] +for datum in key_data: + short = datum[0].replace("ssh-", "").replace("sha2-nistp", "") + datum.insert(0, short) + + +@pytest.fixture(scope="session", params=key_data, ids=lambda x: x[0]) +def key(request): + """ + Yield an object for each known type of key, with attributes: + + - ``short_type``: short identifier, eg ``rsa`` or ``ecdsa-256`` + - ``full_type``: the "message style" key identifier, eg ``ssh-rsa``, or + ``ecdsa-sha2-nistp256``. + - ``path``: a pathlib Path object to the fixture key file + - ``pkey``: an instantiated PKey subclass object + - ``fingerprint``: the expected fingerprint of said key + """ + short_type, key_type, key_class, fingerprint = request.param + bag = Lexicon() + bag.short_type = short_type + bag.full_type = key_type + bag.path = Path(_support(f"{short_type}.key")) + with bag.path.open() as fd: + bag.pkey = key_class.from_private_key(fd) + bag.fingerprint = fingerprint + yield bag diff --git a/tests/pkey.py b/tests/pkey.py new file mode 100644 index 00000000..b1cba825 --- /dev/null +++ b/tests/pkey.py @@ -0,0 +1,13 @@ +from paramiko import PKey + + +class PKey_: + class from_type_string: + def loads_from_type_and_bytes(self, key): + obj = PKey.from_type_string(key.full_type, key.pkey.asbytes()) + assert obj == key.pkey + + class from_path: + def loads_from_file_path(self, key): + obj = PKey.from_path(key.path) + assert obj == key.pkey diff --git a/tests/test_auth.py b/tests/test_auth.py index 592e589f..02df8c12 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -37,8 +37,8 @@ from paramiko import ( from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL from paramiko.util import u -from .loop import LoopSocket -from .util import _support, slow +from ._loop import LoopSocket +from ._util import _support, slow _pwd = u("\u2022") @@ -129,7 +129,7 @@ class AuthTest(unittest.TestCase): self.sockc.close() def start_server(self): - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) + host_key = RSAKey.from_private_key_file(_support("rsa.key")) self.public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) self.event = threading.Event() diff --git a/tests/test_client.py b/tests/test_client.py index 62c92b35..564cda00 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -41,7 +41,7 @@ from paramiko import SSHClient from paramiko.pkey import PublicBlob from paramiko.ssh_exception import SSHException, AuthenticationException -from .util import _support, requires_sha1_signing, slow +from ._util import _support, requires_sha1_signing, slow requires_gss_auth = unittest.skipUnless( @@ -171,10 +171,10 @@ class ClientTest(unittest.TestCase): self.ts = paramiko.Transport(self.socks) if server_name is not None: self.ts.local_version = server_name - keypath = _support("test_rsa.key") + keypath = _support("rsa.key") host_key = paramiko.RSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) - keypath = _support("test_ecdsa_256.key") + keypath = _support("ecdsa-256.key") host_key = paramiko.ECDSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) server = NullServer(allowed_keys=allowed_keys, public_blob=public_blob) @@ -195,7 +195,7 @@ class ClientTest(unittest.TestCase): # Server setup threading.Thread(target=self._run, kwargs=run_kwargs).start() host_key = paramiko.RSAKey.from_private_key_file( - _support("test_rsa.key") + _support("rsa.key") ) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) @@ -263,18 +263,18 @@ class SSHClientTest(ClientTest): """ verify that SSHClient works with an RSA key. """ - self._test_connection(key_filename=_support("test_rsa.key")) + self._test_connection(key_filename=_support("rsa.key")) @requires_sha1_signing def test_client_ecdsa(self): """ verify that SSHClient works with an ECDSA key. """ - self._test_connection(key_filename=_support("test_ecdsa_256.key")) + self._test_connection(key_filename=_support("ecdsa-256.key")) @requires_sha1_signing def test_client_ed25519(self): - self._test_connection(key_filename=_support("test_ed25519.key")) + self._test_connection(key_filename=_support("ed25519.key")) @requires_sha1_signing def test_multiple_key_files(self): @@ -289,16 +289,17 @@ class SSHClientTest(ClientTest): } # Various combos of attempted & valid keys # TODO: try every possible combo using itertools functions + # TODO: use new key(s) fixture(s) for attempt, accept in ( (["rsa", "dss"], ["dss"]), # Original test #3 (["dss", "rsa"], ["dss"]), # Ordering matters sometimes, sadly - (["dss", "rsa", "ecdsa_256"], ["dss"]), # Try ECDSA but fail - (["rsa", "ecdsa_256"], ["ecdsa"]), # ECDSA success + (["dss", "rsa", "ecdsa-256"], ["dss"]), # Try ECDSA but fail + (["rsa", "ecdsa-256"], ["ecdsa"]), # ECDSA success ): try: self._test_connection( key_filename=[ - _support("test_{}.key".format(x)) for x in attempt + _support("{}.key".format(x)) for x in attempt ], allowed_keys=[types_[x] for x in accept], ) @@ -318,7 +319,7 @@ class SSHClientTest(ClientTest): self.assertRaises( SSHException, self._test_connection, - key_filename=[_support("test_rsa.key")], + key_filename=[_support("rsa.key")], allowed_keys=["ecdsa-sha2-nistp256"], ) @@ -338,7 +339,7 @@ class SSHClientTest(ClientTest): @requires_sha1_signing def test_certs_implicitly_loaded_alongside_key_filename_keys(self): - # NOTE: a regular test_connection() w/ test_rsa.key would incidentally + # NOTE: a regular test_connection() w/ rsa.key would incidentally # test this (because test_xxx.key-cert.pub exists) but incidental tests # stink, so NullServer and friends were updated to allow assertions # about the server-side key object's public blob. Thus, we can prove @@ -391,7 +392,7 @@ class SSHClientTest(ClientTest): """ threading.Thread(target=self._run).start() hostname = f"[{self.addr}]:{self.port}" - key_file = _support("test_ecdsa_256.key") + key_file = _support("ecdsa-256.key") public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file) self.tc = SSHClient() @@ -415,7 +416,7 @@ class SSHClientTest(ClientTest): warnings.filterwarnings("ignore", "tempnam.*") host_key = paramiko.RSAKey.from_private_key_file( - _support("test_rsa.key") + _support("rsa.key") ) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) fd, localname = mkstemp() @@ -517,7 +518,7 @@ class SSHClientTest(ClientTest): # Start the thread with a 1 second wait. threading.Thread(target=self._run, kwargs={"delay": 1}).start() host_key = paramiko.RSAKey.from_private_key_file( - _support("test_rsa.key") + _support("rsa.key") ) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) @@ -593,7 +594,7 @@ class SSHClientTest(ClientTest): """ Failed gssapi-keyex doesn't prevent subsequent key from succeeding """ - kwargs = dict(gss_kex=True, key_filename=[_support("test_rsa.key")]) + kwargs = dict(gss_kex=True, key_filename=[_support("rsa.key")]) self._test_connection(**kwargs) @requires_gss_auth @@ -601,7 +602,7 @@ class SSHClientTest(ClientTest): """ Failed gssapi-with-mic doesn't prevent subsequent key from succeeding """ - kwargs = dict(gss_auth=True, key_filename=[_support("test_rsa.key")]) + kwargs = dict(gss_auth=True, key_filename=[_support("rsa.key")]) self._test_connection(**kwargs) def test_reject_policy(self): @@ -683,11 +684,11 @@ class SSHClientTest(ClientTest): self._client_host_key_bad(host_key) def test_host_key_negotiation_3(self): - self._client_host_key_good(paramiko.ECDSAKey, "test_ecdsa_256.key") + self._client_host_key_good(paramiko.ECDSAKey, "ecdsa-256.key") @requires_sha1_signing def test_host_key_negotiation_4(self): - self._client_host_key_good(paramiko.RSAKey, "test_rsa.key") + self._client_host_key_good(paramiko.RSAKey, "rsa.key") def _setup_for_env(self): threading.Thread(target=self._run).start() diff --git a/tests/test_config.py b/tests/test_config.py index a2c60a32..fcb120b6 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -19,7 +19,7 @@ from paramiko import ( ConfigParseError, ) -from .util import _config +from ._util import _config @fixture diff --git a/tests/test_file.py b/tests/test_file.py index 456c0388..9344495b 100644 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -26,7 +26,7 @@ from io import BytesIO from paramiko.common import linefeed_byte, crlf, cr_byte from paramiko.file import BufferedFile -from .util import needs_builtin +from ._util import needs_builtin class LoopbackFile(BufferedFile): diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py index 671f1ba0..da62fd97 100644 --- a/tests/test_gssapi.py +++ b/tests/test_gssapi.py @@ -24,7 +24,7 @@ Test the used APIs for GSS-API / SSPI authentication import socket -from .util import needs_gssapi, KerberosTestCase, update_env +from ._util import needs_gssapi, KerberosTestCase, update_env # # NOTE: KerberosTestCase skips all tests if it was unable to import k5test diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py index d4868f4a..c33f4c68 100644 --- a/tests/test_kex_gss.py +++ b/tests/test_kex_gss.py @@ -31,7 +31,7 @@ import unittest import paramiko -from .util import needs_gssapi, KerberosTestCase, update_env +from ._util import needs_gssapi, KerberosTestCase, update_env class NullServer(paramiko.ServerInterface): @@ -80,7 +80,7 @@ class GSSKexTest(KerberosTestCase): def _run(self): self.socks, addr = self.sockl.accept() self.ts = paramiko.Transport(self.socks, gss_kex=True) - host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") + host_key = paramiko.RSAKey.from_private_key_file("tests/rsa.key") self.ts.add_server_key(host_key) self.ts.set_gss_host(self.realm.hostname) try: @@ -96,7 +96,7 @@ class GSSKexTest(KerberosTestCase): Diffie-Hellman Key Exchange and user authentication with the GSS-API context created during key exchange. """ - host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") + host_key = paramiko.RSAKey.from_private_key_file("tests/rsa.key") public_host_key = paramiko.RSAKey(data=host_key.asbytes()) self.tc = paramiko.SSHClient() diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py index d4dd58ad..aee21c21 100644 --- a/tests/test_packetizer.py +++ b/tests/test_packetizer.py @@ -30,7 +30,7 @@ from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes from paramiko import Message, Packetizer, util from paramiko.common import byte_chr, zero_byte -from .loop import LoopSocket +from ._loop import LoopSocket x55 = byte_chr(0x55) diff --git a/tests/test_pkey.py b/tests/test_pkey.py index 5dfaaff7..c5b20f91 100644 --- a/tests/test_pkey.py +++ b/tests/test_pkey.py @@ -45,7 +45,7 @@ from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateNumbers from unittest.mock import patch, Mock import pytest -from .util import _support, is_low_entropy, requires_sha1_signing +from ._util import _support, is_low_entropy, requires_sha1_signing # from openssh's ssh-keygen @@ -161,7 +161,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(exp, key) def test_load_rsa(self): - key = RSAKey.from_private_key_file(_support("test_rsa.key")) + key = RSAKey.from_private_key_file(_support("rsa.key")) self.assertEqual("ssh-rsa", key.get_name()) exp_rsa = b(FINGER_RSA.split()[1].replace(":", "")) my_rsa = hexlify(key.get_fingerprint()) @@ -184,7 +184,7 @@ class KeyTest(unittest.TestCase): ) as loader: loader.side_effect = exception with pytest.raises(SSHException, match=str(exception)): - RSAKey.from_private_key_file(_support("test_rsa.key")) + RSAKey.from_private_key_file(_support("rsa.key")) def test_loading_empty_keys_errors_usefully(self): # #1599 - raise SSHException instead of IndexError @@ -231,7 +231,7 @@ class KeyTest(unittest.TestCase): def test_compare_rsa(self): # verify that the private & public keys compare equal - key = RSAKey.from_private_key_file(_support("test_rsa.key")) + key = RSAKey.from_private_key_file(_support("rsa.key")) self.assertEqual(key, key) pub = RSAKey(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -248,7 +248,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(key, pub) def _sign_and_verify_rsa(self, algorithm, saved_sig): - key = RSAKey.from_private_key_file(_support("test_rsa.key")) + key = RSAKey.from_private_key_file(_support("rsa.key")) msg = key.sign_ssh_data(b"ice weasels", algorithm) assert isinstance(msg, Message) msg.rewind() @@ -329,7 +329,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(key.get_name(), "ecdsa-sha2-nistp521") def test_load_ecdsa_256(self): - key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key")) + key = ECDSAKey.from_private_key_file(_support("ecdsa-256.key")) self.assertEqual("ecdsa-sha2-nistp256", key.get_name()) exp_ecdsa = b(FINGER_ECDSA_256.split()[1].replace(":", "")) my_ecdsa = hexlify(key.get_fingerprint()) @@ -357,7 +357,7 @@ class KeyTest(unittest.TestCase): def test_compare_ecdsa_256(self): # verify that the private & public keys compare equal - key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key")) + key = ECDSAKey.from_private_key_file(_support("ecdsa-256.key")) self.assertEqual(key, key) pub = ECDSAKey(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -366,7 +366,7 @@ class KeyTest(unittest.TestCase): def test_sign_ecdsa_256(self): # verify that the rsa private key can sign and verify - key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key")) + key = ECDSAKey.from_private_key_file(_support("ecdsa-256.key")) msg = key.sign_ssh_data(b"ice weasels") self.assertTrue(type(msg) is Message) msg.rewind() @@ -408,7 +408,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(384, key.get_bits()) def test_load_ecdsa_transmutes_crypto_exceptions(self): - path = _support("test_ecdsa_256.key") + path = _support("ecdsa-256.key") # TODO: nix unittest for pytest for exception in (TypeError("onoz"), UnsupportedAlgorithm("oops")): with patch( @@ -569,12 +569,12 @@ class KeyTest(unittest.TestCase): RSAKey.from_private_key_file(_support("test_rsa_openssh_nopad.key")) def test_stringification(self): - key = RSAKey.from_private_key_file(_support("test_rsa.key")) + key = RSAKey.from_private_key_file(_support("rsa.key")) comparable = TEST_KEY_BYTESTR self.assertEqual(str(key), comparable) def test_ed25519(self): - key1 = Ed25519Key.from_private_key_file(_support("test_ed25519.key")) + key1 = Ed25519Key.from_private_key_file(_support("ed25519.key")) key2 = Ed25519Key.from_private_key_file( _support("test_ed25519_password.key"), b"abc123" ) @@ -594,7 +594,7 @@ class KeyTest(unittest.TestCase): def test_ed25519_compare(self): # verify that the private & public keys compare equal - key = Ed25519Key.from_private_key_file(_support("test_ed25519.key")) + key = Ed25519Key.from_private_key_file(_support("ed25519.key")) self.assertEqual(key, key) pub = Ed25519Key(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -616,12 +616,13 @@ class KeyTest(unittest.TestCase): ) assert original != generated + # TODO: use keys fixture def keys(self): for key_class, filename in [ - (RSAKey, "test_rsa.key"), + (RSAKey, "rsa.key"), (DSSKey, "dss.key"), - (ECDSAKey, "test_ecdsa_256.key"), - (Ed25519Key, "test_ed25519.key"), + (ECDSAKey, "ecdsa-256.key"), + (Ed25519Key, "ed25519.key"), ]: key1 = key_class.from_private_key_file(_support(filename)) key2 = key_class.from_private_key_file(_support(filename)) @@ -643,6 +644,7 @@ class KeyTest(unittest.TestCase): for key1, key2 in self.keys(): assert hash(key1) == hash(key2) + # TODO: use keys fixture def test_new_fingerprint(self): # Assumes the RSA, DSS, ECDSA, Ed25519 order seen in 'def keys'. fingerprints = [x.fingerprint for x, _ in self.keys()] @@ -653,6 +655,7 @@ class KeyTest(unittest.TestCase): "SHA256:J6VESFdD3xSChn8y9PzWzeF+1tl892mOy2TqkMLO4ow", ] + # TODO: use keys fixture def test_algorithm_property(self): # Assumes the RSA, DSS, ECDSA, Ed25519 order seen in 'def keys'. algorithms = [x.algorithm_name for x, _ in self.keys()] @@ -669,7 +672,7 @@ class KeyTest(unittest.TestCase): # No exception -> it's good. Meh. def test_ed25519_load_from_file_obj(self): - with open(_support("test_ed25519.key")) as pkey_fileobj: + with open(_support("ed25519.key")) as pkey_fileobj: key = Ed25519Key.from_private_key(pkey_fileobj) self.assertEqual(key, key) self.assertTrue(key.can_sign()) diff --git a/tests/test_sftp.py b/tests/test_sftp.py index be123de4..7fd274bc 100644 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -38,8 +38,8 @@ from paramiko.sftp_attr import SFTPAttributes from paramiko.util import b, u from tests import requireNonAsciiLocale -from .util import needs_builtin -from .util import slow +from ._util import needs_builtin +from ._util import slow ARTICLE = """ diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py index 5192f657..acfe71e3 100644 --- a/tests/test_sftp_big.py +++ b/tests/test_sftp_big.py @@ -30,7 +30,7 @@ import time from paramiko.common import o660 -from .util import slow +from ._util import slow @slow diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py index a8175ccb..27976a8d 100644 --- a/tests/test_ssh_gss.py +++ b/tests/test_ssh_gss.py @@ -28,7 +28,7 @@ import threading import paramiko -from .util import _support, needs_gssapi, KerberosTestCase, update_env +from ._util import _support, needs_gssapi, KerberosTestCase, update_env from .test_client import FINGERPRINTS @@ -89,7 +89,7 @@ class GSSAuthTest(KerberosTestCase): def _run(self): self.socks, addr = self.sockl.accept() self.ts = paramiko.Transport(self.socks) - host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") + host_key = paramiko.RSAKey.from_private_key_file("tests/rsa.key") self.ts.add_server_key(host_key) server = NullServer() self.ts.start_server(self.event, server) @@ -100,7 +100,7 @@ class GSSAuthTest(KerberosTestCase): The exception is ... no exception yet """ - host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") + host_key = paramiko.RSAKey.from_private_key_file("tests/rsa.key") public_host_key = paramiko.RSAKey(data=host_key.asbytes()) self.tc = paramiko.SSHClient() @@ -154,7 +154,7 @@ class GSSAuthTest(KerberosTestCase): "this_host_does_not_exists_and_causes_a_GSSAPI-exception" ) self._test_connection( - key_filename=[_support("test_rsa.key")], + key_filename=[_support("rsa.key")], allow_agent=False, look_for_keys=False, ) diff --git a/tests/test_transport.py b/tests/test_transport.py index a6b15ee1..d8ac8a4b 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -60,8 +60,8 @@ from paramiko.common import ( ) from paramiko.message import Message -from .util import needs_builtin, _support, requires_sha1_signing, slow -from .loop import LoopSocket +from ._util import needs_builtin, _support, requires_sha1_signing, slow +from ._loop import LoopSocket LONG_BANNER = """\ @@ -168,7 +168,7 @@ class TransportTest(unittest.TestCase): def setup_test_server( self, client_options=None, server_options=None, connect_kwargs=None ): - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) + host_key = RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) @@ -234,7 +234,7 @@ class TransportTest(unittest.TestCase): loopback sockets. this is hardly "simple" but it's simpler than the later tests. :) """ - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) + host_key = RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) event = threading.Event() @@ -260,7 +260,7 @@ class TransportTest(unittest.TestCase): """ verify that a long banner doesn't mess up the handshake. """ - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) + host_key = RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) event = threading.Event() @@ -910,7 +910,7 @@ class TransportTest(unittest.TestCase): # be fine. Even tho it's a bit squicky. self.tc.packetizer = SlowPacketizer(self.tc.sock) # Continue with regular test red tape. - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) + host_key = RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) event = threading.Event() @@ -1204,7 +1204,7 @@ def server( :param hostkey: Host key to use for the server; if None, loads - ``test_rsa.key``. + ``rsa.key``. :param init: Default `Transport` constructor kwargs to use for both sides. :param server_init: @@ -1234,7 +1234,7 @@ def server( ts = Transport(socks, **dict(init, **server_init)) if hostkey is None: - hostkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + hostkey = RSAKey.from_private_key_file(_support("rsa.key")) ts.add_server_key(hostkey) event = threading.Event() server = NullServer(allowed_keys=pubkeys) @@ -1344,7 +1344,7 @@ class TestSHA2SignatureKeyExchange(unittest.TestCase): # (This is a regression test vs previous implementation which overwrote # the entire preferred-hostkeys structure when given an explicit key as # a client.) - hostkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + hostkey = RSAKey.from_private_key_file(_support("rsa.key")) with server(hostkey=hostkey, connect=dict(hostkey=hostkey)) as (tc, _): assert tc.host_key_type == "rsa-sha2-512" @@ -1359,7 +1359,7 @@ class TestExtInfo(unittest.TestCase): } def test_client_uses_server_sig_algs_for_pubkey_auth(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + privkey = RSAKey.from_private_key_file(_support("rsa.key")) with server( pubkeys=[privkey], connect=dict(pkey=privkey), @@ -1376,7 +1376,7 @@ class TestExtInfo(unittest.TestCase): # with this module anyways... class TestSHA2SignaturePubkeys(unittest.TestCase): def test_pubkey_auth_honors_disabled_algorithms(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + privkey = RSAKey.from_private_key_file(_support("rsa.key")) with server( pubkeys=[privkey], connect=dict(pkey=privkey), @@ -1391,7 +1391,7 @@ class TestSHA2SignaturePubkeys(unittest.TestCase): assert "no RSA pubkey algorithms" in str(err) def test_client_sha2_disabled_server_sha1_disabled_no_match(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + privkey = RSAKey.from_private_key_file(_support("rsa.key")) with server( pubkeys=[privkey], connect=dict(pkey=privkey), @@ -1402,7 +1402,7 @@ class TestSHA2SignaturePubkeys(unittest.TestCase): assert isinstance(err, AuthenticationException) def test_client_sha1_disabled_server_sha2_disabled_no_match(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + privkey = RSAKey.from_private_key_file(_support("rsa.key")) with server( pubkeys=[privkey], connect=dict(pkey=privkey), @@ -1414,7 +1414,7 @@ class TestSHA2SignaturePubkeys(unittest.TestCase): @requires_sha1_signing def test_ssh_rsa_still_used_when_sha2_disabled(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + privkey = RSAKey.from_private_key_file(_support("rsa.key")) # NOTE: this works because key obj comparison uses public bytes # TODO: would be nice for PKey to grow a legit "give me another obj of # same class but just the public bits" using asbytes() @@ -1424,7 +1424,7 @@ class TestSHA2SignaturePubkeys(unittest.TestCase): assert tc.is_authenticated() def test_sha2_512(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + privkey = RSAKey.from_private_key_file(_support("rsa.key")) with server( pubkeys=[privkey], connect=dict(pkey=privkey), @@ -1436,7 +1436,7 @@ class TestSHA2SignaturePubkeys(unittest.TestCase): assert tc._agreed_pubkey_algorithm == "rsa-sha2-512" def test_sha2_256(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + privkey = RSAKey.from_private_key_file(_support("rsa.key")) with server( pubkeys=[privkey], connect=dict(pkey=privkey), @@ -1448,7 +1448,7 @@ class TestSHA2SignaturePubkeys(unittest.TestCase): assert tc._agreed_pubkey_algorithm == "rsa-sha2-256" def test_sha2_256_when_client_only_enables_256(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + privkey = RSAKey.from_private_key_file(_support("rsa.key")) with server( pubkeys=[privkey], connect=dict(pkey=privkey), |