diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/_loop.py (renamed from tests/loop.py) | 0 | ||||
-rw-r--r-- | tests/_stub_sftp.py (renamed from tests/stub_sftp.py) | 0 | ||||
-rw-r--r-- | tests/_support/dss.key (renamed from tests/cert_support/test_dss.key) | 0 | ||||
-rw-r--r-- | tests/_support/dss.key-cert.pub (renamed from tests/cert_support/test_dss.key-cert.pub) | 0 | ||||
-rw-r--r-- | tests/_support/ecdsa-256.key (renamed from tests/cert_support/test_ecdsa_256.key) | 0 | ||||
-rw-r--r-- | tests/_support/ecdsa-256.key-cert.pub (renamed from tests/cert_support/test_ecdsa_256.key-cert.pub) | 0 | ||||
-rw-r--r-- | tests/_support/ed25519.key (renamed from tests/cert_support/test_ed25519.key) | 0 | ||||
-rw-r--r-- | tests/_support/ed25519.key-cert.pub (renamed from tests/cert_support/test_ed25519.key-cert.pub) | 0 | ||||
-rw-r--r-- | tests/_support/ed448.key | 4 | ||||
-rw-r--r-- | tests/_support/rsa-lonely.key (renamed from tests/cert_support/test_rsa.key) | 0 | ||||
-rw-r--r-- | tests/_support/rsa-missing.key-cert.pub (renamed from tests/cert_support/test_rsa.key-cert.pub) | 0 | ||||
-rw-r--r-- | tests/_support/rsa.key (renamed from tests/test_rsa.key) | 0 | ||||
-rw-r--r-- | tests/_support/rsa.key-cert.pub | 1 | ||||
-rw-r--r-- | tests/_util.py | 441 | ||||
-rw-r--r-- | tests/agent.py | 151 | ||||
-rw-r--r-- | tests/auth.py | 580 | ||||
-rw-r--r-- | tests/conftest.py | 75 | ||||
-rw-r--r-- | tests/pkey.py | 229 | ||||
-rw-r--r-- | tests/test_agent.py | 50 | ||||
-rw-r--r-- | tests/test_auth.py | 272 | ||||
-rw-r--r-- | tests/test_client.py | 69 | ||||
-rw-r--r-- | tests/test_config.py | 2 | ||||
-rw-r--r-- | tests/test_dss.key | 12 | ||||
-rw-r--r-- | tests/test_ecdsa_256.key | 5 | ||||
-rw-r--r-- | tests/test_ed25519.key | 8 | ||||
-rw-r--r-- | tests/test_file.py | 2 | ||||
-rw-r--r-- | tests/test_gssapi.py | 2 | ||||
-rw-r--r-- | tests/test_hostkeys.py | 25 | ||||
-rw-r--r-- | tests/test_kex_gss.py | 6 | ||||
-rw-r--r-- | tests/test_packetizer.py | 2 | ||||
-rw-r--r-- | tests/test_pkey.py | 121 | ||||
-rw-r--r-- | tests/test_sftp.py | 4 | ||||
-rw-r--r-- | tests/test_sftp_big.py | 2 | ||||
-rw-r--r-- | tests/test_ssh_gss.py | 8 | ||||
-rw-r--r-- | tests/test_transport.py | 318 | ||||
-rw-r--r-- | tests/test_util.py | 13 | ||||
-rw-r--r-- | tests/util.py | 174 |
37 files changed, 1609 insertions, 967 deletions
diff --git a/tests/loop.py b/tests/_loop.py index a3740013..a3740013 100644 --- a/tests/loop.py +++ b/tests/_loop.py diff --git a/tests/stub_sftp.py b/tests/_stub_sftp.py index 0c0372e9..0c0372e9 100644 --- a/tests/stub_sftp.py +++ b/tests/_stub_sftp.py diff --git a/tests/cert_support/test_dss.key b/tests/_support/dss.key index e10807f1..e10807f1 100644 --- a/tests/cert_support/test_dss.key +++ b/tests/_support/dss.key diff --git a/tests/cert_support/test_dss.key-cert.pub b/tests/_support/dss.key-cert.pub index 07fd5578..07fd5578 100644 --- a/tests/cert_support/test_dss.key-cert.pub +++ b/tests/_support/dss.key-cert.pub diff --git a/tests/cert_support/test_ecdsa_256.key b/tests/_support/ecdsa-256.key index 42d44734..42d44734 100644 --- a/tests/cert_support/test_ecdsa_256.key +++ b/tests/_support/ecdsa-256.key diff --git a/tests/cert_support/test_ecdsa_256.key-cert.pub b/tests/_support/ecdsa-256.key-cert.pub index f2c93ccf..f2c93ccf 100644 --- a/tests/cert_support/test_ecdsa_256.key-cert.pub +++ b/tests/_support/ecdsa-256.key-cert.pub diff --git a/tests/cert_support/test_ed25519.key b/tests/_support/ed25519.key index eb9f94c2..eb9f94c2 100644 --- a/tests/cert_support/test_ed25519.key +++ b/tests/_support/ed25519.key diff --git a/tests/cert_support/test_ed25519.key-cert.pub b/tests/_support/ed25519.key-cert.pub index 4e01415a..4e01415a 100644 --- a/tests/cert_support/test_ed25519.key-cert.pub +++ b/tests/_support/ed25519.key-cert.pub diff --git a/tests/_support/ed448.key b/tests/_support/ed448.key new file mode 100644 index 00000000..887b51c6 --- /dev/null +++ b/tests/_support/ed448.key @@ -0,0 +1,4 @@ +-----BEGIN PRIVATE KEY----- +MEcCAQAwBQYDK2VxBDsEOcvcl9IoD0ktR5RWtW84NM7O2e4LmD2cWfRg7Wht/OA9 +POkmRW12VNvlP6BsXKir5yygumIjD91SQQ== +-----END PRIVATE KEY----- diff --git a/tests/cert_support/test_rsa.key b/tests/_support/rsa-lonely.key index f50e9c53..f50e9c53 100644 --- a/tests/cert_support/test_rsa.key +++ b/tests/_support/rsa-lonely.key diff --git a/tests/cert_support/test_rsa.key-cert.pub b/tests/_support/rsa-missing.key-cert.pub index 7487ab66..7487ab66 100644 --- a/tests/cert_support/test_rsa.key-cert.pub +++ b/tests/_support/rsa-missing.key-cert.pub diff --git a/tests/test_rsa.key b/tests/_support/rsa.key index f50e9c53..f50e9c53 100644 --- a/tests/test_rsa.key +++ b/tests/_support/rsa.key diff --git a/tests/_support/rsa.key-cert.pub b/tests/_support/rsa.key-cert.pub new file mode 100644 index 00000000..7487ab66 --- /dev/null +++ b/tests/_support/rsa.key-cert.pub @@ -0,0 +1 @@ +ssh-rsa-cert-v01@openssh.com AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgsZlXTd5NE4uzGAn6TyAqQj+IPbsTEFGap2x5pTRwQR8AAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4cAAAAAAAAE0gAAAAEAAAAmU2FtcGxlIHNlbGYtc2lnbmVkIE9wZW5TU0ggY2VydGlmaWNhdGUAAAASAAAABXVzZXIxAAAABXVzZXIyAAAAAAAAAAD//////////wAAAAAAAACCAAAAFXBlcm1pdC1YMTEtZm9yd2FyZGluZwAAAAAAAAAXcGVybWl0LWFnZW50LWZvcndhcmRpbmcAAAAAAAAAFnBlcm1pdC1wb3J0LWZvcndhcmRpbmcAAAAAAAAACnBlcm1pdC1wdHkAAAAAAAAADnBlcm1pdC11c2VyLXJjAAAAAAAAAAAAAACVAAAAB3NzaC1yc2EAAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4cAAACPAAAAB3NzaC1yc2EAAACATFHFsARDgQevc6YLxNnDNjsFtZ08KPMyYVx0w5xm95IVZHVWSOc5w+ccjqN9HRwxV3kP7IvL91qx0Uc3MJdB9g/O6HkAP+rpxTVoTb2EAMekwp5+i8nQJW4CN2BSsbQY1M6r7OBZ5nmF4hOW/5Pu4l22lXe2ydy8kEXOEuRpUeQ= test_rsa.key.pub diff --git a/tests/_util.py b/tests/_util.py new file mode 100644 index 00000000..eaf6aac4 --- /dev/null +++ b/tests/_util.py @@ -0,0 +1,441 @@ +from contextlib import contextmanager +from os.path import dirname, realpath, join +import builtins +import os +from pathlib import Path +import socket +import struct +import sys +import unittest +from time import sleep +import threading + +import pytest + +from paramiko import ( + ServerInterface, + RSAKey, + DSSKey, + AUTH_FAILED, + AUTH_PARTIALLY_SUCCESSFUL, + AUTH_SUCCESSFUL, + OPEN_SUCCEEDED, + OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED, + InteractiveQuery, + Transport, +) +from paramiko.ssh_gss import GSS_AUTH_AVAILABLE + +from cryptography.exceptions import UnsupportedAlgorithm, _Reasons +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import padding, rsa + +tests_dir = dirname(realpath(__file__)) + +from ._loop import LoopSocket + + +def _support(filename): + base = Path(tests_dir) + top = base / filename + deeper = base / "_support" / filename + return str(deeper if deeper.exists() else top) + + +def _config(name): + return join(tests_dir, "configs", name) + + +needs_gssapi = pytest.mark.skipif( + not GSS_AUTH_AVAILABLE, reason="No GSSAPI to test" +) + + +def needs_builtin(name): + """ + Skip decorated test if builtin name does not exist. + """ + reason = "Test requires a builtin '{}'".format(name) + return pytest.mark.skipif(not hasattr(builtins, name), reason=reason) + + +slow = pytest.mark.slow + +# GSSAPI / Kerberos related tests need a working Kerberos environment. +# The class `KerberosTestCase` provides such an environment or skips all tests. +# There are 3 distinct cases: +# +# - A Kerberos environment has already been created and the environment +# contains the required information. +# +# - We can use the package 'k5test' to setup an working kerberos environment on +# the fly. +# +# - We skip all tests. +# +# ToDo: add a Windows specific implementation? + +if ( + os.environ.get("K5TEST_USER_PRINC", None) + and os.environ.get("K5TEST_HOSTNAME", None) + and os.environ.get("KRB5_KTNAME", None) +): # add other vars as needed + + # The environment provides the required information + class DummyK5Realm: + def __init__(self): + for k in os.environ: + if not k.startswith("K5TEST_"): + continue + setattr(self, k[7:].lower(), os.environ[k]) + self.env = {} + + class KerberosTestCase(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.realm = DummyK5Realm() + + @classmethod + def tearDownClass(cls): + del cls.realm + +else: + try: + # Try to setup a kerberos environment + from k5test import KerberosTestCase + except Exception: + # Use a dummy, that skips all tests + class KerberosTestCase(unittest.TestCase): + @classmethod + def setUpClass(cls): + raise unittest.SkipTest( + "Missing extension package k5test. " + 'Please run "pip install k5test" ' + "to install it." + ) + + +def update_env(testcase, mapping, env=os.environ): + """Modify os.environ during a test case and restore during cleanup.""" + saved_env = env.copy() + + def replace(target, source): + target.update(source) + for k in list(target): + if k not in source: + target.pop(k, None) + + testcase.addCleanup(replace, env, saved_env) + env.update(mapping) + return testcase + + +def k5shell(args=None): + """Create a shell with an kerberos environment + + This can be used to debug paramiko or to test the old GSSAPI. + To test a different GSSAPI, simply activate a suitable venv + within the shell. + """ + import k5test + import atexit + import subprocess + + k5 = k5test.K5Realm() + atexit.register(k5.stop) + os.environ.update(k5.env) + for n in ("realm", "user_princ", "hostname"): + os.environ["K5TEST_" + n.upper()] = getattr(k5, n) + + if not args: + args = sys.argv[1:] + if not args: + args = [os.environ.get("SHELL", "bash")] + sys.exit(subprocess.call(args)) + + +def is_low_entropy(): + """ + Attempts to detect whether running interpreter is low-entropy. + + "low-entropy" is defined as being in 32-bit mode and with the hash seed set + to zero. + """ + is_32bit = struct.calcsize("P") == 32 / 8 + # I don't see a way to tell internally if the hash seed was set this + # way, but env should be plenty sufficient, this is only for testing. + return is_32bit and os.environ.get("PYTHONHASHSEED", None) == "0" + + +def sha1_signing_unsupported(): + """ + This is used to skip tests in environments where SHA-1 signing is + not supported by the backend. + """ + private_key = rsa.generate_private_key( + public_exponent=65537, key_size=2048, backend=default_backend() + ) + message = b"Some dummy text" + try: + private_key.sign( + message, + padding.PSS( + mgf=padding.MGF1(hashes.SHA1()), + salt_length=padding.PSS.MAX_LENGTH, + ), + hashes.SHA1(), + ) + return False + except UnsupportedAlgorithm as e: + return e._reason is _Reasons.UNSUPPORTED_HASH + + +requires_sha1_signing = unittest.skipIf( + sha1_signing_unsupported(), "SHA-1 signing not supported" +) + +_disable_sha2 = dict( + disabled_algorithms=dict(keys=["rsa-sha2-256", "rsa-sha2-512"]) +) +_disable_sha1 = dict(disabled_algorithms=dict(keys=["ssh-rsa"])) +_disable_sha2_pubkey = dict( + disabled_algorithms=dict(pubkeys=["rsa-sha2-256", "rsa-sha2-512"]) +) +_disable_sha1_pubkey = dict(disabled_algorithms=dict(pubkeys=["ssh-rsa"])) + + +unicodey = "\u2022" + + +class TestServer(ServerInterface): + paranoid_did_password = False + paranoid_did_public_key = False + # TODO: make this ed25519 or something else modern? (_is_ this used??) + paranoid_key = DSSKey.from_private_key_file(_support("dss.key")) + + def __init__(self, allowed_keys=None): + self.allowed_keys = allowed_keys if allowed_keys is not None else [] + + def check_channel_request(self, kind, chanid): + if kind == "bogus": + return OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED + return OPEN_SUCCEEDED + + def check_channel_exec_request(self, channel, command): + if command != b"yes": + return False + return True + + def check_channel_shell_request(self, channel): + return True + + def check_global_request(self, kind, msg): + self._global_request = kind + # NOTE: for w/e reason, older impl of this returned False always, even + # tho that's only supposed to occur if the request cannot be served. + # For now, leaving that the default unless test supplies specific + # 'acceptable' request kind + return kind == "acceptable" + + def check_channel_x11_request( + self, + channel, + single_connection, + auth_protocol, + auth_cookie, + screen_number, + ): + self._x11_single_connection = single_connection + self._x11_auth_protocol = auth_protocol + self._x11_auth_cookie = auth_cookie + self._x11_screen_number = screen_number + return True + + def check_port_forward_request(self, addr, port): + self._listen = socket.socket() + self._listen.bind(("127.0.0.1", 0)) + self._listen.listen(1) + return self._listen.getsockname()[1] + + def cancel_port_forward_request(self, addr, port): + self._listen.close() + self._listen = None + + def check_channel_direct_tcpip_request(self, chanid, origin, destination): + self._tcpip_dest = destination + return OPEN_SUCCEEDED + + def get_allowed_auths(self, username): + if username == "slowdive": + return "publickey,password" + if username == "paranoid": + if ( + not self.paranoid_did_password + and not self.paranoid_did_public_key + ): + return "publickey,password" + elif self.paranoid_did_password: + return "publickey" + else: + return "password" + if username == "commie": + return "keyboard-interactive" + if username == "utf8": + return "password" + if username == "non-utf8": + return "password" + return "publickey" + + def check_auth_password(self, username, password): + if (username == "slowdive") and (password == "pygmalion"): + return AUTH_SUCCESSFUL + if (username == "paranoid") and (password == "paranoid"): + # 2-part auth (even openssh doesn't support this) + self.paranoid_did_password = True + if self.paranoid_did_public_key: + return AUTH_SUCCESSFUL + return AUTH_PARTIALLY_SUCCESSFUL + if (username == "utf8") and (password == unicodey): + return AUTH_SUCCESSFUL + if (username == "non-utf8") and (password == "\xff"): + return AUTH_SUCCESSFUL + if username == "bad-server": + raise Exception("Ack!") + if username == "unresponsive-server": + sleep(5) + return AUTH_SUCCESSFUL + return AUTH_FAILED + + def check_auth_publickey(self, username, key): + if (username == "paranoid") and (key == self.paranoid_key): + # 2-part auth + self.paranoid_did_public_key = True + if self.paranoid_did_password: + return AUTH_SUCCESSFUL + return AUTH_PARTIALLY_SUCCESSFUL + # TODO: make sure all tests incidentally using this to pass, _without + # sending a username oops_, get updated somehow - probably via server() + # default always injecting a username + elif key in self.allowed_keys: + return AUTH_SUCCESSFUL + return AUTH_FAILED + + def check_auth_interactive(self, username, submethods): + if username == "commie": + self.username = username + return InteractiveQuery( + "password", "Please enter a password.", ("Password", False) + ) + return AUTH_FAILED + + def check_auth_interactive_response(self, responses): + if self.username == "commie": + if (len(responses) == 1) and (responses[0] == "cat"): + return AUTH_SUCCESSFUL + return AUTH_FAILED + + +@contextmanager +def server( + hostkey=None, + init=None, + server_init=None, + client_init=None, + connect=None, + pubkeys=None, + catch_error=False, + transport_factory=None, + defer=False, + skip_verify=False, +): + """ + SSH server contextmanager for testing. + + :param hostkey: + Host key to use for the server; if None, loads + ``rsa.key``. + :param init: + Default `Transport` constructor kwargs to use for both sides. + :param server_init: + Extends and/or overrides ``init`` for server transport only. + :param client_init: + Extends and/or overrides ``init`` for client transport only. + :param connect: + Kwargs to use for ``connect()`` on the client. + :param pubkeys: + List of public keys for auth. + :param catch_error: + Whether to capture connection errors & yield from contextmanager. + Necessary for connection_time exception testing. + :param transport_factory: + Like the same-named param in SSHClient: which Transport class to use. + :param bool defer: + Whether to defer authentication during connecting. + + This is really just shorthand for ``connect={}`` which would do roughly + the same thing. Also: this implies skip_verify=True automatically! + :param bool skip_verify: + Whether NOT to do the default "make sure auth passed" check. + """ + if init is None: + init = {} + if server_init is None: + server_init = {} + if client_init is None: + client_init = {} + if connect is None: + # No auth at all please + if defer: + connect = dict() + # Default username based auth + else: + connect = dict(username="slowdive", password="pygmalion") + socks = LoopSocket() + sockc = LoopSocket() + sockc.link(socks) + if transport_factory is None: + transport_factory = Transport + tc = transport_factory(sockc, **dict(init, **client_init)) + ts = transport_factory(socks, **dict(init, **server_init)) + + if hostkey is None: + hostkey = RSAKey.from_private_key_file(_support("rsa.key")) + ts.add_server_key(hostkey) + event = threading.Event() + server = TestServer(allowed_keys=pubkeys) + assert not event.is_set() + assert not ts.is_active() + assert tc.get_username() is None + assert ts.get_username() is None + assert not tc.is_authenticated() + assert not ts.is_authenticated() + + err = None + # Trap errors and yield instead of raising right away; otherwise callers + # cannot usefully deal with problems at connect time which stem from errors + # in the server side. + try: + ts.start_server(event, server) + tc.connect(**connect) + + event.wait(1.0) + assert event.is_set() + assert ts.is_active() + assert tc.is_active() + + except Exception as e: + if not catch_error: + raise + err = e + + yield (tc, ts, err) if catch_error else (tc, ts) + + if not (catch_error or skip_verify): + assert ts.is_authenticated() + assert tc.is_authenticated() + + tc.close() + ts.close() + socks.close() + sockc.close() diff --git a/tests/agent.py b/tests/agent.py new file mode 100644 index 00000000..bcbfb216 --- /dev/null +++ b/tests/agent.py @@ -0,0 +1,151 @@ +from unittest.mock import Mock + +from pytest import mark, raises + +from paramiko import AgentKey, Message, RSAKey +from paramiko.agent import ( + SSH2_AGENT_SIGN_RESPONSE, + SSH_AGENT_RSA_SHA2_256, + SSH_AGENT_RSA_SHA2_512, + cSSH2_AGENTC_SIGN_REQUEST, +) + +from ._util import _support + + +# AgentKey with no inner_key +class _BareAgentKey(AgentKey): + def __init__(self, name, blob): + self.name = name + self.blob = blob + self.inner_key = None + + +class AgentKey_: + def str_is_repr(self): + # Tests for a missed spot in Python 3 upgrades: AgentKey.__str__ was + # returning bytes, as if under Python 2. When bug present, this + # explodes with "__str__ returned non-string". + key = AgentKey(None, b"secret!!!") + assert str(key) == repr(key) + + class init: + def needs_at_least_two_arguments(self): + with raises(TypeError): + AgentKey() + with raises(TypeError): + AgentKey(None) + + def sets_attributes_and_parses_blob(self): + agent = Mock() + blob = Message() + blob.add_string("bad-type") + key = AgentKey(agent=agent, blob=bytes(blob)) + assert key.agent is agent + assert key.name == "bad-type" + assert key.blob == bytes(blob) + assert key.comment == "" # default + # TODO: logger testing + assert key.inner_key is None # no 'bad-type' algorithm + + def comment_optional(self): + blob = Message() + blob.add_string("bad-type") + key = AgentKey(agent=Mock(), blob=bytes(blob), comment="hi!") + assert key.comment == "hi!" + + def sets_inner_key_when_known_type(self, keys): + key = AgentKey(agent=Mock(), blob=bytes(keys.pkey)) + assert key.inner_key == keys.pkey + + class fields: + def defaults_to_get_name_and_blob(self): + key = _BareAgentKey(name="lol", blob=b"lmao") + assert key._fields == ["lol", b"lmao"] + + # TODO: pytest-relaxed is buggy (now?), this shows up under get_bits? + def defers_to_inner_key_when_present(self, keys): + key = AgentKey(agent=None, blob=keys.pkey.asbytes()) + assert key._fields == keys.pkey._fields + assert key == keys.pkey + + class get_bits: + def defaults_to_superclass_implementation(self): + # TODO 4.0: assert raises NotImplementedError like changed parent? + assert _BareAgentKey(None, None).get_bits() == 0 + + def defers_to_inner_key_when_present(self, keys): + key = AgentKey(agent=None, blob=keys.pkey.asbytes()) + assert key.get_bits() == keys.pkey.get_bits() + + class asbytes: + def defaults_to_owned_blob(self): + blob = Mock() + assert _BareAgentKey(name=None, blob=blob).asbytes() is blob + + def defers_to_inner_key_when_present(self, keys): + key = AgentKey(agent=None, blob=keys.pkey_with_cert.asbytes()) + # Artificially make outer key blob != inner key blob; comment in + # AgentKey.asbytes implies this can sometimes really happen but I + # no longer recall when that could be? + key.blob = b"nope" + assert key.asbytes() == key.inner_key.asbytes() + + @mark.parametrize( + "sign_kwargs,expected_flag", + [ + # No algorithm kwarg: no flags (bitfield -> 0 int) + (dict(), 0), + (dict(algorithm="rsa-sha2-256"), SSH_AGENT_RSA_SHA2_256), + (dict(algorithm="rsa-sha2-512"), SSH_AGENT_RSA_SHA2_512), + # TODO: ideally we only send these when key is a cert, + # but it doesn't actually break when not; meh. Really just wants + # all the parameterization of this test rethought. + ( + dict(algorithm="rsa-sha2-256-cert-v01@openssh.com"), + SSH_AGENT_RSA_SHA2_256, + ), + ( + dict(algorithm="rsa-sha2-512-cert-v01@openssh.com"), + SSH_AGENT_RSA_SHA2_512, + ), + ], + ) + def signing_data(self, sign_kwargs, expected_flag): + class FakeAgent: + def _send_message(self, msg): + # The thing we actually care most about, we're not testing + # ssh-agent itself here + self._sent_message = msg + sig = Message() + sig.add_string("lol") + sig.rewind() + return SSH2_AGENT_SIGN_RESPONSE, sig + + for do_cert in (False, True): + agent = FakeAgent() + # Get key kinda like how a real agent would give it to us - if + # cert, it'd be the entire public blob, not just the pubkey. This + # ensures the code under test sends _just the pubkey part_ back to + # the agent during signature requests (bug was us sending _the + # entire cert blob_, which somehow "worked ok" but always got us + # SHA1) + # NOTE: using lower level loader to avoid auto-cert-load when + # testing regular key (agents expose them separately) + inner_key = RSAKey.from_private_key_file(_support("rsa.key")) + blobby = inner_key.asbytes() + # NOTE: expected key blob always wants to be the real key, even + # when the "key" is a certificate. + expected_request_key_blob = blobby + if do_cert: + inner_key.load_certificate(_support("rsa.key-cert.pub")) + blobby = inner_key.public_blob.key_blob + key = AgentKey(agent, blobby) + result = key.sign_ssh_data(b"data-to-sign", **sign_kwargs) + assert result == b"lol" + msg = agent._sent_message + msg.rewind() + assert msg.get_byte() == cSSH2_AGENTC_SIGN_REQUEST + assert msg.get_string() == expected_request_key_blob + assert msg.get_string() == b"data-to-sign" + assert msg.get_int() == expected_flag diff --git a/tests/auth.py b/tests/auth.py new file mode 100644 index 00000000..c0afe889 --- /dev/null +++ b/tests/auth.py @@ -0,0 +1,580 @@ +""" +Tests focusing primarily on the authentication step. + +Thus, they concern AuthHandler and AuthStrategy, with a side of Transport. +""" + +from logging import Logger +from unittest.mock import Mock + +from pytest import raises + +from paramiko import ( + AgentKey, + AuthenticationException, + AuthFailure, + AuthResult, + AuthSource, + AuthStrategy, + BadAuthenticationType, + DSSKey, + InMemoryPrivateKey, + NoneAuth, + OnDiskPrivateKey, + Password, + PrivateKey, + PKey, + RSAKey, + SSHException, + ServiceRequestingTransport, + SourceResult, +) + +from ._util import ( + _disable_sha1_pubkey, + _disable_sha2, + _disable_sha2_pubkey, + _support, + requires_sha1_signing, + server, + unicodey, +) + + +class AuthHandler_: + """ + Most of these tests are explicit about the auth method they call. + + This is because not too many other tests do so (they rely on the implicit + auth trigger of various connect() kwargs). + """ + + def bad_auth_type(self): + """ + verify that we get the right exception when an unsupported auth + type is requested. + """ + # Server won't allow password auth for this user, so should fail + # and return just publickey allowed types + with server( + connect=dict(username="unknown", password="error"), + catch_error=True, + ) as (_, _, err): + assert isinstance(err, BadAuthenticationType) + assert err.allowed_types == ["publickey"] + + def bad_password(self): + """ + verify that a bad password gets the right exception, and that a retry + with the right password works. + """ + # NOTE: Transport.connect doesn't do any auth upfront if no userauth + # related kwargs given. + with server(defer=True) as (tc, ts): + # Auth once, badly + with raises(AuthenticationException): + tc.auth_password(username="slowdive", password="error") + # And again, correctly + tc.auth_password(username="slowdive", password="pygmalion") + + def multipart_auth(self): + """ + verify that multipart auth works. + """ + with server(defer=True) as (tc, ts): + assert tc.auth_password( + username="paranoid", password="paranoid" + ) == ["publickey"] + key = DSSKey.from_private_key_file(_support("dss.key")) + assert tc.auth_publickey(username="paranoid", key=key) == [] + + def interactive_auth(self): + """ + verify keyboard-interactive auth works. + """ + + def handler(title, instructions, prompts): + self.got_title = title + self.got_instructions = instructions + self.got_prompts = prompts + return ["cat"] + + with server(defer=True) as (tc, ts): + assert tc.auth_interactive("commie", handler) == [] + assert self.got_title == "password" + assert self.got_prompts == [("Password", False)] + + def interactive_fallback(self): + """ + verify that a password auth attempt will fallback to "interactive" + if password auth isn't supported but interactive is. + """ + with server(defer=True) as (tc, ts): + # This username results in an allowed_auth of just kbd-int, + # and has a configured interactive->response on the server. + assert tc.auth_password("commie", "cat") == [] + + def utf8(self): + """ + verify that utf-8 encoding happens in authentication. + """ + with server(defer=True) as (tc, ts): + assert tc.auth_password("utf8", unicodey) == [] + + def non_utf8(self): + """ + verify that non-utf-8 encoded passwords can be used for broken + servers. + """ + with server(defer=True) as (tc, ts): + assert tc.auth_password("non-utf8", "\xff") == [] + + def auth_exception_when_disconnected(self): + """ + verify that we catch a server disconnecting during auth, and report + it as an auth failure. + """ + with server(defer=True, skip_verify=True) as (tc, ts), raises( + AuthenticationException + ): + tc.auth_password("bad-server", "hello") + + def non_responsive_triggers_auth_exception(self): + """ + verify that authentication times out if server takes to long to + respond (or never responds). + """ + with server(defer=True, skip_verify=True) as (tc, ts), raises( + AuthenticationException + ) as info: + tc.auth_timeout = 1 # 1 second, to speed up test + tc.auth_password("unresponsive-server", "hello") + assert "Authentication timeout" in str(info.value) + + +class AuthOnlyHandler_: + def _server(self, *args, **kwargs): + kwargs.setdefault("transport_factory", ServiceRequestingTransport) + return server(*args, **kwargs) + + class fallback_pubkey_algorithm: + @requires_sha1_signing + def key_type_algo_selected_when_no_server_sig_algs(self): + privkey = RSAKey.from_private_key_file(_support("rsa.key")) + # Server pretending to be an apparently common setup: + # - doesn't support (or have enabled) sha2 + # - also doesn't support (or have enabled) server-sig-algs/ext-info + # This is the scenario in which Paramiko has to guess-the-algo, and + # where servers that don't support sha2 or server-sig-algs can give + # us trouble. + server_init = dict(_disable_sha2_pubkey, server_sig_algs=False) + with self._server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + server_init=server_init, + catch_error=True, + ) as (tc, ts, err): + # Auth did work + assert tc.is_authenticated() + # Selected ssh-rsa, instead of first-in-the-list (rsa-sha2-512) + assert tc._agreed_pubkey_algorithm == "ssh-rsa" + + @requires_sha1_signing + def key_type_algo_selection_is_cert_suffix_aware(self): + # This key has a cert next to it, which should trigger cert-aware + # loading within key classes. + privkey = PKey.from_path(_support("rsa.key")) + server_init = dict(_disable_sha2_pubkey, server_sig_algs=False) + with self._server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + server_init=server_init, + catch_error=True, + ) as (tc, ts, err): + assert not err + # Auth did work + assert tc.is_authenticated() + # Selected expected cert type + assert ( + tc._agreed_pubkey_algorithm + == "ssh-rsa-cert-v01@openssh.com" + ) + + @requires_sha1_signing + def uses_first_preferred_algo_if_key_type_not_in_list(self): + # This is functionally the same as legacy AuthHandler, just + # arriving at the same place in a different manner. + privkey = RSAKey.from_private_key_file(_support("rsa.key")) + server_init = dict(_disable_sha2_pubkey, server_sig_algs=False) + with self._server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + server_init=server_init, + client_init=_disable_sha1_pubkey, # no ssh-rsa + catch_error=True, + ) as (tc, ts, err): + assert not tc.is_authenticated() + assert isinstance(err, AuthenticationException) + assert tc._agreed_pubkey_algorithm == "rsa-sha2-512" + + +class SHA2SignaturePubkeys: + def pubkey_auth_honors_disabled_algorithms(self): + privkey = RSAKey.from_private_key_file(_support("rsa.key")) + with server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + init=dict( + disabled_algorithms=dict( + pubkeys=["ssh-rsa", "rsa-sha2-256", "rsa-sha2-512"] + ) + ), + catch_error=True, + ) as (_, _, err): + assert isinstance(err, SSHException) + assert "no RSA pubkey algorithms" in str(err) + + def client_sha2_disabled_server_sha1_disabled_no_match(self): + privkey = RSAKey.from_private_key_file(_support("rsa.key")) + with server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + client_init=_disable_sha2_pubkey, + server_init=_disable_sha1_pubkey, + catch_error=True, + ) as (tc, ts, err): + assert isinstance(err, AuthenticationException) + + def client_sha1_disabled_server_sha2_disabled_no_match(self): + privkey = RSAKey.from_private_key_file(_support("rsa.key")) + with server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + client_init=_disable_sha1_pubkey, + server_init=_disable_sha2_pubkey, + catch_error=True, + ) as (tc, ts, err): + assert isinstance(err, AuthenticationException) + + @requires_sha1_signing + def ssh_rsa_still_used_when_sha2_disabled(self): + privkey = RSAKey.from_private_key_file(_support("rsa.key")) + # NOTE: this works because key obj comparison uses public bytes + # TODO: would be nice for PKey to grow a legit "give me another obj of + # same class but just the public bits" using asbytes() + with server( + pubkeys=[privkey], connect=dict(pkey=privkey), init=_disable_sha2 + ) as (tc, _): + assert tc.is_authenticated() + + @requires_sha1_signing + def first_client_preferred_algo_used_when_no_server_sig_algs(self): + privkey = RSAKey.from_private_key_file(_support("rsa.key")) + # Server pretending to be an apparently common setup: + # - doesn't support (or have enabled) sha2 + # - also doesn't support (or have enabled) server-sig-algs/ext-info + # This is the scenario in which Paramiko has to guess-the-algo, and + # where servers that don't support sha2 or server-sig-algs give us + # trouble. + server_init = dict(_disable_sha2_pubkey, server_sig_algs=False) + with server( + pubkeys=[privkey], + connect=dict(username="slowdive", pkey=privkey), + server_init=server_init, + catch_error=True, + ) as (tc, ts, err): + assert not tc.is_authenticated() + assert isinstance(err, AuthenticationException) + # Oh no! this isn't ssh-rsa, and our server doesn't support sha2! + assert tc._agreed_pubkey_algorithm == "rsa-sha2-512" + + def sha2_512(self): + privkey = RSAKey.from_private_key_file(_support("rsa.key")) + with server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + init=dict( + disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-256"]) + ), + ) as (tc, ts): + assert tc.is_authenticated() + assert tc._agreed_pubkey_algorithm == "rsa-sha2-512" + + def sha2_256(self): + privkey = RSAKey.from_private_key_file(_support("rsa.key")) + with server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + init=dict( + disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-512"]) + ), + ) as (tc, ts): + assert tc.is_authenticated() + assert tc._agreed_pubkey_algorithm == "rsa-sha2-256" + + def sha2_256_when_client_only_enables_256(self): + privkey = RSAKey.from_private_key_file(_support("rsa.key")) + with server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + # Client-side only; server still accepts all 3. + client_init=dict( + disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-512"]) + ), + ) as (tc, ts): + assert tc.is_authenticated() + assert tc._agreed_pubkey_algorithm == "rsa-sha2-256" + + +class AuthSource_: + class base_class: + def init_requires_and_saves_username(self): + with raises(TypeError): + AuthSource() + assert AuthSource(username="foo").username == "foo" + + def dunder_repr_delegates_to_helper(self): + source = AuthSource("foo") + source._repr = Mock(wraps=lambda: "whatever") + repr(source) + source._repr.assert_called_once_with() + + def repr_helper_prints_basic_kv_pairs(self): + assert repr(AuthSource("foo")) == "AuthSource()" + assert ( + AuthSource("foo")._repr(bar="open") == "AuthSource(bar='open')" + ) + + def authenticate_takes_transport_and_is_abstract(self): + # TODO: this test kinda just goes away once we're typed? + with raises(TypeError): + AuthSource("foo").authenticate() + with raises(NotImplementedError): + AuthSource("foo").authenticate(None) + + class NoneAuth_: + def authenticate_auths_none(self): + trans = Mock() + result = NoneAuth("foo").authenticate(trans) + trans.auth_none.assert_called_once_with("foo") + assert result is trans.auth_none.return_value + + def repr_shows_class(self): + assert repr(NoneAuth("foo")) == "NoneAuth()" + + class Password_: + def init_takes_and_stores_password_getter(self): + with raises(TypeError): + Password("foo") + getter = Mock() + pw = Password("foo", password_getter=getter) + assert pw.password_getter is getter + + def repr_adds_username(self): + pw = Password("foo", password_getter=Mock()) + assert repr(pw) == "Password(user='foo')" + + def authenticate_gets_and_supplies_password(self): + getter = Mock(return_value="bar") + trans = Mock() + pw = Password("foo", password_getter=getter) + result = pw.authenticate(trans) + trans.auth_password.assert_called_once_with("foo", "bar") + assert result is trans.auth_password.return_value + + class PrivateKey_: + def authenticate_calls_publickey_with_pkey(self): + source = PrivateKey(username="foo") + source.pkey = Mock() # set by subclasses + trans = Mock() + result = source.authenticate(trans) + trans.auth_publickey.assert_called_once_with("foo", source.pkey) + assert result is trans.auth_publickey.return_value + + class InMemoryPrivateKey_: + def init_takes_pkey_object(self): + with raises(TypeError): + InMemoryPrivateKey("foo") + pkey = Mock() + source = InMemoryPrivateKey(username="foo", pkey=pkey) + assert source.pkey is pkey + + def repr_shows_pkey_repr(self): + pkey = PKey.from_path(_support("ed25519.key")) + source = InMemoryPrivateKey("foo", pkey) + assert ( + repr(source) + == "InMemoryPrivateKey(pkey=PKey(alg=ED25519, bits=256, fp=SHA256:J6VESFdD3xSChn8y9PzWzeF+1tl892mOy2TqkMLO4ow))" # noqa + ) + + def repr_appends_agent_flag_when_AgentKey(self): + real_key = PKey.from_path(_support("ed25519.key")) + pkey = AgentKey(agent=None, blob=bytes(real_key)) + source = InMemoryPrivateKey("foo", pkey) + assert ( + repr(source) + == "InMemoryPrivateKey(pkey=PKey(alg=ED25519, bits=256, fp=SHA256:J6VESFdD3xSChn8y9PzWzeF+1tl892mOy2TqkMLO4ow)) [agent]" # noqa + ) + + class OnDiskPrivateKey_: + def init_takes_source_path_and_pkey(self): + with raises(TypeError): + OnDiskPrivateKey("foo") + with raises(TypeError): + OnDiskPrivateKey("foo", "bar") + with raises(TypeError): + OnDiskPrivateKey("foo", "bar", "biz") + source = OnDiskPrivateKey( + username="foo", + source="ssh-config", + path="of-exile", + pkey="notreally", + ) + assert source.username == "foo" + assert source.source == "ssh-config" + assert source.path == "of-exile" + assert source.pkey == "notreally" + + def init_requires_specific_value_for_source(self): + with raises( + ValueError, + match=r"source argument must be one of: \('ssh-config', 'python-config', 'implicit-home'\)", # noqa + ): + OnDiskPrivateKey("foo", source="what?", path="meh", pkey="no") + + def repr_reflects_source_path_and_pkey(self): + source = OnDiskPrivateKey( + username="foo", + source="ssh-config", + path="of-exile", + pkey="notreally", + ) + assert ( + repr(source) + == "OnDiskPrivateKey(key='notreally', source='ssh-config', path='of-exile')" # noqa + ) + + +class AuthResult_: + def setup_method(self): + self.strat = AuthStrategy(None) + + def acts_like_list_with_strategy_attribute(self): + with raises(TypeError): + AuthResult() + # kwarg works by itself + AuthResult(strategy=self.strat) + # or can be given as posarg w/ regular list() args after + result = AuthResult(self.strat, [1, 2, 3]) + assert result.strategy is self.strat + assert result == [1, 2, 3] + assert isinstance(result, list) + + def repr_is_list_repr_untouched(self): + result = AuthResult(self.strat, [1, 2, 3]) + assert repr(result) == "[1, 2, 3]" + + class dunder_str: + def is_multiline_display_of_sourceresult_tuples(self): + result = AuthResult(self.strat) + result.append(SourceResult("foo", "bar")) + result.append(SourceResult("biz", "baz")) + assert str(result) == "foo -> bar\nbiz -> baz" + + def shows_str_not_repr_of_auth_source_and_result(self): + result = AuthResult(self.strat) + result.append( + SourceResult(NoneAuth("foo"), ["password", "pubkey"]) + ) + assert str(result) == "NoneAuth() -> ['password', 'pubkey']" + + def empty_list_result_values_show_success_string(self): + result = AuthResult(self.strat) + result.append(SourceResult(NoneAuth("foo"), [])) + assert str(result) == "NoneAuth() -> success" + + +class AuthFailure_: + def is_an_AuthenticationException(self): + assert isinstance(AuthFailure(None), AuthenticationException) + + def init_requires_result(self): + with raises(TypeError): + AuthFailure() + result = AuthResult(None) + fail = AuthFailure(result=result) + assert fail.result is result + + def str_is_newline_plus_result_str(self): + result = AuthResult(None) + result.append(SourceResult(NoneAuth("foo"), Exception("onoz"))) + fail = AuthFailure(result) + assert str(fail) == "\nNoneAuth() -> onoz" + + +class AuthStrategy_: + def init_requires_ssh_config_param_and_sets_up_a_logger(self): + with raises(TypeError): + AuthStrategy() + conf = object() + strat = AuthStrategy(ssh_config=conf) + assert strat.ssh_config is conf + assert isinstance(strat.log, Logger) + assert strat.log.name == "paramiko.auth_strategy" + + def get_sources_is_abstract(self): + with raises(NotImplementedError): + AuthStrategy(None).get_sources() + + class authenticate: + def setup_method(self): + self.strat = AuthStrategy(None) # ssh_config not used directly + self.source, self.transport = NoneAuth(None), Mock() + self.source.authenticate = Mock() + self.strat.get_sources = Mock(return_value=[self.source]) + + def requires_and_uses_transport_with_methods_returning_result(self): + with raises(TypeError): + self.strat.authenticate() + result = self.strat.authenticate(self.transport) + self.strat.get_sources.assert_called_once_with() + self.source.authenticate.assert_called_once_with(self.transport) + assert isinstance(result, AuthResult) + assert result.strategy is self.strat + assert len(result) == 1 + source_res = result[0] + assert isinstance(source_res, SourceResult) + assert source_res.source is self.source + assert source_res.result is self.source.authenticate.return_value + + def logs_sources_attempted(self): + self.strat.log = Mock() + self.strat.authenticate(self.transport) + self.strat.log.debug.assert_called_once_with("Trying NoneAuth()") + + def raises_AuthFailure_if_no_successes(self): + self.strat.log = Mock() + oops = Exception("onoz") + self.source.authenticate.side_effect = oops + with raises(AuthFailure) as info: + self.strat.authenticate(self.transport) + result = info.value.result + assert isinstance(result, AuthResult) + assert len(result) == 1 + source_res = result[0] + assert isinstance(source_res, SourceResult) + assert source_res.source is self.source + assert source_res.result is oops + self.strat.log.info.assert_called_once_with( + "Authentication via NoneAuth() failed with Exception" + ) + + def short_circuits_on_successful_auth(self): + kaboom = Mock(authenticate=Mock(side_effect=Exception("onoz"))) + self.strat.get_sources.return_value = [self.source, kaboom] + result = self.strat.authenticate(self.transport) + # No exception, and it's just a regular ol Result + assert isinstance(result, AuthResult) + # And it did not capture any attempt to execute the 2nd source + assert len(result) == 1 + assert result[0].source is self.source diff --git a/tests/conftest.py b/tests/conftest.py index b28d2a17..12b97283 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,17 +2,30 @@ import logging import os import shutil import threading +from pathlib import Path + +from invoke.vendor.lexicon import Lexicon import pytest -from paramiko import RSAKey, SFTPServer, SFTP, Transport +from paramiko import ( + SFTPServer, + SFTP, + Transport, + DSSKey, + RSAKey, + Ed25519Key, + ECDSAKey, + PKey, +) -from .loop import LoopSocket -from .stub_sftp import StubServer, StubSFTPServer -from .util import _support +from ._loop import LoopSocket +from ._stub_sftp import StubServer, StubSFTPServer +from ._util import _support from icecream import ic, install as install_ic +# Better print() for debugging - use ic()! install_ic() ic.configureOutput(includeContext=True) @@ -68,10 +81,11 @@ def sftp_server(): socks = LoopSocket() sockc = LoopSocket() sockc.link(socks) + # TODO: reuse with new server fixture if possible tc = Transport(sockc) ts = Transport(socks) # Auth - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) + host_key = RSAKey.from_private_key_file(_support("rsa.key")) ts.add_server_key(host_key) # Server setup event = threading.Event() @@ -103,3 +117,54 @@ def sftp(sftp_server): yield client # Clean up - as in make_sftp_folder, we assume local-only exec for now. shutil.rmtree(client.FOLDER, ignore_errors=True) + + +key_data = [ + ["ssh-rsa", RSAKey, "SHA256:OhNL391d/beeFnxxg18AwWVYTAHww+D4djEE7Co0Yng"], + ["ssh-dss", DSSKey, "SHA256:uHwwykG099f4M4kfzvFpKCTino0/P03DRbAidpAmPm0"], + [ + "ssh-ed25519", + Ed25519Key, + "SHA256:J6VESFdD3xSChn8y9PzWzeF+1tl892mOy2TqkMLO4ow", + ], + [ + "ecdsa-sha2-nistp256", + ECDSAKey, + "SHA256:BrQG04oNKUETjKCeL4ifkARASg3yxS/pUHl3wWM26Yg", + ], +] +for datum in key_data: + # Add true first member with human-facing short algo name + short = datum[0].replace("ssh-", "").replace("sha2-nistp", "") + datum.insert(0, short) + + +@pytest.fixture(scope="session", params=key_data, ids=lambda x: x[0]) +def keys(request): + """ + Yield an object for each known type of key, with attributes: + + - ``short_type``: short identifier, eg ``rsa`` or ``ecdsa-256`` + - ``full_type``: the "message style" key identifier, eg ``ssh-rsa``, or + ``ecdsa-sha2-nistp256``. + - ``path``: a pathlib Path object to the fixture key file + - ``pkey``: PKey object, which may or may not also have a cert loaded + - ``expected_fp``: the expected fingerprint of said key + """ + short_type, key_type, key_class, fingerprint = request.param + bag = Lexicon() + bag.short_type = short_type + bag.full_type = key_type + bag.path = Path(_support(f"{short_type}.key")) + with bag.path.open() as fd: + bag.pkey = key_class.from_private_key(fd) + # Second copy for things like equality-but-not-identity testing + with bag.path.open() as fd: + bag.pkey2 = key_class.from_private_key(fd) + bag.expected_fp = fingerprint + # Also tack on the cert-bearing variant for some tests + cert = bag.path.with_suffix(".key-cert.pub") + bag.pkey_with_cert = PKey.from_path(cert) if cert.exists() else None + # Safety checks + assert bag.pkey.fingerprint == fingerprint + yield bag diff --git a/tests/pkey.py b/tests/pkey.py new file mode 100644 index 00000000..691fda0f --- /dev/null +++ b/tests/pkey.py @@ -0,0 +1,229 @@ +from pathlib import Path +from unittest.mock import patch, call + +from pytest import raises + +from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey +from paramiko import ( + DSSKey, + ECDSAKey, + Ed25519Key, + Message, + PKey, + PublicBlob, + RSAKey, + UnknownKeyType, +) + +from ._util import _support + + +class PKey_: + # NOTE: this is incidentally tested by a number of other tests, such as the + # agent.py test suite + class from_type_string: + def loads_from_type_and_bytes(self, keys): + obj = PKey.from_type_string(keys.full_type, keys.pkey.asbytes()) + assert obj == keys.pkey + + # TODO: exceptions + # + # TODO: passphrase? OTOH since this is aimed at the agent...irrelephant + + class from_path: + def loads_from_Path(self, keys): + obj = PKey.from_path(keys.path) + assert obj == keys.pkey + + def loads_from_str(self): + key = PKey.from_path(str(_support("rsa.key"))) + assert isinstance(key, RSAKey) + + @patch("paramiko.pkey.Path") + def expands_user(self, mPath): + # real key for guts that want a real key format + mykey = Path(_support("rsa.key")) + pathy = mPath.return_value.expanduser.return_value + # read_bytes for cryptography.io's loaders + pathy.read_bytes.return_value = mykey.read_bytes() + # open() for our own class loader + pathy.open.return_value = mykey.open() + # fake out exists() to avoid attempts to load cert + pathy.exists.return_value = False + PKey.from_path("whatever") # we're not testing expanduser itself + # Both key and cert paths + mPath.return_value.expanduser.assert_has_calls([call(), call()]) + + def raises_UnknownKeyType_for_unknown_types(self): + # I.e. a real, becomes a useful object via cryptography.io, key + # class that we do NOT support. Chose Ed448 randomly as OpenSSH + # doesn't seem to support it either, going by ssh-keygen... + keypath = _support("ed448.key") + with raises(UnknownKeyType) as exc: + PKey.from_path(keypath) + assert issubclass(exc.value.key_type, Ed448PrivateKey) + with open(keypath, "rb") as fd: + assert exc.value.key_bytes == fd.read() + + def leaves_cryptography_exceptions_untouched(self): + # a Python file is not a private key! + with raises(ValueError): + PKey.from_path(__file__) + + # TODO: passphrase support tested + + class automatically_loads_certificates: + def existing_cert_loaded_when_given_key_path(self): + key = PKey.from_path(_support("rsa.key")) + # Public blob exists despite no .load_certificate call + assert key.public_blob is not None + assert ( + key.public_blob.key_type == "ssh-rsa-cert-v01@openssh.com" + ) + # And it's definitely the one we expected + assert key.public_blob == PublicBlob.from_file( + _support("rsa.key-cert.pub") + ) + + def can_be_given_cert_path_instead(self): + key = PKey.from_path(_support("rsa.key-cert.pub")) + # It's still a key, not a PublicBlob + assert isinstance(key, RSAKey) + # Public blob exists despite no .load_certificate call + assert key.public_blob is not None + assert ( + key.public_blob.key_type == "ssh-rsa-cert-v01@openssh.com" + ) + # And it's definitely the one we expected + assert key.public_blob == PublicBlob.from_file( + _support("rsa.key-cert.pub") + ) + + def no_cert_load_if_no_cert(self): + # This key exists (it's a copy of the regular one) but has no + # matching -cert.pub + key = PKey.from_path(_support("rsa-lonely.key")) + assert key.public_blob is None + + def excepts_usefully_if_no_key_only_cert(self): + # TODO: is that truly an error condition? the cert is ~the + # pubkey and we still require the privkey for signing, yea? + # This cert exists (it's a copy of the regular one) but there's + # no rsa-missing.key to load. + with raises(FileNotFoundError) as info: + PKey.from_path(_support("rsa-missing.key-cert.pub")) + assert info.value.filename.endswith("rsa-missing.key") + + class load_certificate: + def rsa_public_cert_blobs(self): + # Data to test signing with (arbitrary) + data = b"ice weasels" + # Load key w/o cert at first (so avoiding .from_path) + key = RSAKey.from_private_key_file(_support("rsa.key")) + assert key.public_blob is None + # Sign regular-style (using, arbitrarily, SHA2) + msg = key.sign_ssh_data(data, "rsa-sha2-256") + msg.rewind() + assert "rsa-sha2-256" == msg.get_text() + signed = msg.get_binary() # for comparison later + + # Load cert and inspect its internals + key.load_certificate(_support("rsa.key-cert.pub")) + assert key.public_blob is not None + assert key.public_blob.key_type == "ssh-rsa-cert-v01@openssh.com" + assert key.public_blob.comment == "test_rsa.key.pub" + msg = Message(key.public_blob.key_blob) + # cert type + assert msg.get_text() == "ssh-rsa-cert-v01@openssh.com" + # nonce + msg.get_string() + # public numbers + assert msg.get_mpint() == key.public_numbers.e + assert msg.get_mpint() == key.public_numbers.n + # serial number + assert msg.get_int64() == 1234 + # TODO: whoever wrote the OG tests didn't care about the remaining + # fields from + # https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.certkeys + # so neither do I, for now... + + # Sign cert-style (still SHA256 - so this actually does almost + # exactly the same thing under the hood as the previous sign) + msg = key.sign_ssh_data(data, "rsa-sha2-256-cert-v01@openssh.com") + msg.rewind() + assert "rsa-sha2-256" == msg.get_text() + assert signed == msg.get_binary() # same signature as above + msg.rewind() + assert key.verify_ssh_sig(b"ice weasels", msg) # our data verified + + def loading_cert_of_different_type_from_key_raises_ValueError(self): + edkey = Ed25519Key.from_private_key_file(_support("ed25519.key")) + err = "PublicBlob type ssh-rsa-cert-v01@openssh.com incompatible with key type ssh-ed25519" # noqa + with raises(ValueError, match=err): + edkey.load_certificate(_support("rsa.key-cert.pub")) + + def fingerprint(self, keys): + # NOTE: Hardcoded fingerprint expectation stored in fixture. + assert keys.pkey.fingerprint == keys.expected_fp + + def algorithm_name(self, keys): + key = keys.pkey + if isinstance(key, RSAKey): + assert key.algorithm_name == "RSA" + elif isinstance(key, DSSKey): + assert key.algorithm_name == "DSS" + elif isinstance(key, ECDSAKey): + assert key.algorithm_name == "ECDSA" + elif isinstance(key, Ed25519Key): + assert key.algorithm_name == "ED25519" + # TODO: corner case: AgentKey, whose .name can be cert-y (due to the + # value of the name field passed via agent protocol) and thus + # algorithm_name is eg "RSA-CERT" - keys loaded directly from disk will + # never look this way, even if they have a .public_blob attached. + + class equality_and_hashing: + def same_key_is_equal_to_itself(self, keys): + assert keys.pkey == keys.pkey2 + + def same_key_same_hash(self, keys): + # NOTE: this isn't a great test due to hashseed randomization under + # Python 3 preventing use of static values, but it does still prove + # that __hash__ is implemented/doesn't explode & works across + # instances + assert hash(keys.pkey) == hash(keys.pkey2) + + def keys_are_not_equal_to_other_types(self, keys): + for value in [None, True, ""]: + assert keys.pkey != value + + class identifiers_classmethods: + def default_is_class_name_attribute(self): + # NOTE: not all classes _have_ this, only the ones that don't + # customize identifiers(). + class MyKey(PKey): + name = "it me" + + assert MyKey.identifiers() == ["it me"] + + def rsa_is_all_combos_of_cert_and_sha_type(self): + assert RSAKey.identifiers() == [ + "ssh-rsa", + "ssh-rsa-cert-v01@openssh.com", + "rsa-sha2-256", + "rsa-sha2-256-cert-v01@openssh.com", + "rsa-sha2-512", + "rsa-sha2-512-cert-v01@openssh.com", + ] + + def dss_is_protocol_name(self): + assert DSSKey.identifiers() == ["ssh-dss"] + + def ed25519_is_protocol_name(self): + assert Ed25519Key.identifiers() == ["ssh-ed25519"] + + def ecdsa_is_all_curve_names(self): + assert ECDSAKey.identifiers() == [ + "ecdsa-sha2-nistp256", + "ecdsa-sha2-nistp384", + "ecdsa-sha2-nistp521", + ] diff --git a/tests/test_agent.py b/tests/test_agent.py deleted file mode 100644 index 01602d0e..00000000 --- a/tests/test_agent.py +++ /dev/null @@ -1,50 +0,0 @@ -import unittest - -from paramiko.message import Message -from paramiko.agent import ( - SSH2_AGENT_SIGN_RESPONSE, - cSSH2_AGENTC_SIGN_REQUEST, - SSH_AGENT_RSA_SHA2_256, - SSH_AGENT_RSA_SHA2_512, - AgentKey, -) -from paramiko.util import b - - -class ChaosAgent: - def _send_message(self, msg): - self._sent_message = msg - sig = Message() - sig.add_string(b("lol")) - sig.rewind() - return SSH2_AGENT_SIGN_RESPONSE, sig - - -class AgentTests(unittest.TestCase): - def _sign_with_agent(self, kwargs, expectation): - agent = ChaosAgent() - key = AgentKey(agent, b("secret!!!")) - result = key.sign_ssh_data(b("token"), **kwargs) - assert result == b("lol") - msg = agent._sent_message - msg.rewind() - assert msg.get_byte() == cSSH2_AGENTC_SIGN_REQUEST - assert msg.get_string() == b("secret!!!") - assert msg.get_string() == b("token") - assert msg.get_int() == expectation - - def test_agent_signing_defaults_to_0_for_flags_field(self): - # No algorithm kwarg at all - self._sign_with_agent(kwargs=dict(), expectation=0) - - def test_agent_signing_is_2_for_SHA256(self): - self._sign_with_agent( - kwargs=dict(algorithm="rsa-sha2-256"), - expectation=SSH_AGENT_RSA_SHA2_256, - ) - - def test_agent_signing_is_2_for_SHA512(self): - self._sign_with_agent( - kwargs=dict(algorithm="rsa-sha2-512"), - expectation=SSH_AGENT_RSA_SHA2_512, - ) diff --git a/tests/test_auth.py b/tests/test_auth.py deleted file mode 100644 index 4a960deb..00000000 --- a/tests/test_auth.py +++ /dev/null @@ -1,272 +0,0 @@ -# Copyright (C) 2008 Robey Pointer <robeypointer@gmail.com> -# -# This file is part of paramiko. -# -# Paramiko is free software; you can redistribute it and/or modify it under the -# terms of the GNU Lesser General Public License as published by the Free -# Software Foundation; either version 2.1 of the License, or (at your option) -# any later version. -# -# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY -# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR -# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License -# along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - -""" -Some unit tests for authenticating over a Transport. -""" - -import sys -import threading -import unittest -from time import sleep - -from paramiko import ( - Transport, - ServerInterface, - RSAKey, - DSSKey, - BadAuthenticationType, - InteractiveQuery, - AuthenticationException, -) -from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL -from paramiko.util import u - -from .loop import LoopSocket -from .util import _support, slow - - -_pwd = u("\u2022") - - -class NullServer(ServerInterface): - paranoid_did_password = False - paranoid_did_public_key = False - paranoid_key = DSSKey.from_private_key_file(_support("test_dss.key")) - - def get_allowed_auths(self, username): - if username == "slowdive": - return "publickey,password" - if username == "paranoid": - if ( - not self.paranoid_did_password - and not self.paranoid_did_public_key - ): - return "publickey,password" - elif self.paranoid_did_password: - return "publickey" - else: - return "password" - if username == "commie": - return "keyboard-interactive" - if username == "utf8": - return "password" - if username == "non-utf8": - return "password" - return "publickey" - - def check_auth_password(self, username, password): - if (username == "slowdive") and (password == "pygmalion"): - return AUTH_SUCCESSFUL - if (username == "paranoid") and (password == "paranoid"): - # 2-part auth (even openssh doesn't support this) - self.paranoid_did_password = True - if self.paranoid_did_public_key: - return AUTH_SUCCESSFUL - return AUTH_PARTIALLY_SUCCESSFUL - if (username == "utf8") and (password == _pwd): - return AUTH_SUCCESSFUL - if (username == "non-utf8") and (password == "\xff"): - return AUTH_SUCCESSFUL - if username == "bad-server": - raise Exception("Ack!") - if username == "unresponsive-server": - sleep(5) - return AUTH_SUCCESSFUL - return AUTH_FAILED - - def check_auth_publickey(self, username, key): - if (username == "paranoid") and (key == self.paranoid_key): - # 2-part auth - self.paranoid_did_public_key = True - if self.paranoid_did_password: - return AUTH_SUCCESSFUL - return AUTH_PARTIALLY_SUCCESSFUL - return AUTH_FAILED - - def check_auth_interactive(self, username, submethods): - if username == "commie": - self.username = username - return InteractiveQuery( - "password", "Please enter a password.", ("Password", False) - ) - return AUTH_FAILED - - def check_auth_interactive_response(self, responses): - if self.username == "commie": - if (len(responses) == 1) and (responses[0] == "cat"): - return AUTH_SUCCESSFUL - return AUTH_FAILED - - -class AuthTest(unittest.TestCase): - def setUp(self): - self.socks = LoopSocket() - self.sockc = LoopSocket() - self.sockc.link(self.socks) - self.tc = Transport(self.sockc) - self.ts = Transport(self.socks) - - def tearDown(self): - self.tc.close() - self.ts.close() - self.socks.close() - self.sockc.close() - - def start_server(self): - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) - self.public_host_key = RSAKey(data=host_key.asbytes()) - self.ts.add_server_key(host_key) - self.event = threading.Event() - self.server = NullServer() - self.assertTrue(not self.event.is_set()) - self.ts.start_server(self.event, self.server) - - def verify_finished(self): - self.event.wait(1.0) - self.assertTrue(self.event.is_set()) - self.assertTrue(self.ts.is_active()) - - def test_bad_auth_type(self): - """ - verify that we get the right exception when an unsupported auth - type is requested. - """ - self.start_server() - try: - self.tc.connect( - hostkey=self.public_host_key, - username="unknown", - password="error", - ) - self.assertTrue(False) - except: - etype, evalue, etb = sys.exc_info() - self.assertEqual(BadAuthenticationType, etype) - self.assertEqual(["publickey"], evalue.allowed_types) - - def test_bad_password(self): - """ - verify that a bad password gets the right exception, and that a retry - with the right password works. - """ - self.start_server() - self.tc.connect(hostkey=self.public_host_key) - try: - self.tc.auth_password(username="slowdive", password="error") - self.assertTrue(False) - except: - etype, evalue, etb = sys.exc_info() - self.assertTrue(issubclass(etype, AuthenticationException)) - self.tc.auth_password(username="slowdive", password="pygmalion") - self.verify_finished() - - def test_multipart_auth(self): - """ - verify that multipart auth works. - """ - self.start_server() - self.tc.connect(hostkey=self.public_host_key) - remain = self.tc.auth_password( - username="paranoid", password="paranoid" - ) - self.assertEqual(["publickey"], remain) - key = DSSKey.from_private_key_file(_support("test_dss.key")) - remain = self.tc.auth_publickey(username="paranoid", key=key) - self.assertEqual([], remain) - self.verify_finished() - - def test_interactive_auth(self): - """ - verify keyboard-interactive auth works. - """ - self.start_server() - self.tc.connect(hostkey=self.public_host_key) - - def handler(title, instructions, prompts): - self.got_title = title - self.got_instructions = instructions - self.got_prompts = prompts - return ["cat"] - - remain = self.tc.auth_interactive("commie", handler) - self.assertEqual(self.got_title, "password") - self.assertEqual(self.got_prompts, [("Password", False)]) - self.assertEqual([], remain) - self.verify_finished() - - def test_interactive_auth_fallback(self): - """ - verify that a password auth attempt will fallback to "interactive" - if password auth isn't supported but interactive is. - """ - self.start_server() - self.tc.connect(hostkey=self.public_host_key) - remain = self.tc.auth_password("commie", "cat") - self.assertEqual([], remain) - self.verify_finished() - - def test_auth_utf8(self): - """ - verify that utf-8 encoding happens in authentication. - """ - self.start_server() - self.tc.connect(hostkey=self.public_host_key) - remain = self.tc.auth_password("utf8", _pwd) - self.assertEqual([], remain) - self.verify_finished() - - def test_auth_non_utf8(self): - """ - verify that non-utf-8 encoded passwords can be used for broken - servers. - """ - self.start_server() - self.tc.connect(hostkey=self.public_host_key) - remain = self.tc.auth_password("non-utf8", "\xff") - self.assertEqual([], remain) - self.verify_finished() - - def test_auth_gets_disconnected(self): - """ - verify that we catch a server disconnecting during auth, and report - it as an auth failure. - """ - self.start_server() - self.tc.connect(hostkey=self.public_host_key) - try: - self.tc.auth_password("bad-server", "hello") - except: - etype, evalue, etb = sys.exc_info() - self.assertTrue(issubclass(etype, AuthenticationException)) - - @slow - def test_auth_non_responsive(self): - """ - verify that authentication times out if server takes to long to - respond (or never responds). - """ - self.tc.auth_timeout = 1 # 1 second, to speed up test - self.start_server() - self.tc.connect() - try: - self.tc.auth_password("unresponsive-server", "hello") - except: - etype, evalue, etb = sys.exc_info() - self.assertTrue(issubclass(etype, AuthenticationException)) - self.assertTrue("Authentication timeout" in str(evalue)) diff --git a/tests/test_client.py b/tests/test_client.py index 21ecd72a..1c0c6c84 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -41,7 +41,7 @@ from paramiko import SSHClient from paramiko.pkey import PublicBlob from paramiko.ssh_exception import SSHException, AuthenticationException -from .util import _support, requires_sha1_signing, slow +from ._util import _support, requires_sha1_signing, slow requires_gss_auth = unittest.skipUnless( @@ -171,10 +171,10 @@ class ClientTest(unittest.TestCase): self.ts = paramiko.Transport(self.socks) if server_name is not None: self.ts.local_version = server_name - keypath = _support("test_rsa.key") + keypath = _support("rsa.key") host_key = paramiko.RSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) - keypath = _support("test_ecdsa_256.key") + keypath = _support("ecdsa-256.key") host_key = paramiko.ECDSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) server = NullServer(allowed_keys=allowed_keys, public_blob=public_blob) @@ -194,9 +194,7 @@ class ClientTest(unittest.TestCase): run_kwargs[key] = kwargs.pop(key, None) # Server setup threading.Thread(target=self._run, kwargs=run_kwargs).start() - host_key = paramiko.RSAKey.from_private_key_file( - _support("test_rsa.key") - ) + host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) # Client setup @@ -256,25 +254,25 @@ class SSHClientTest(ClientTest): """ verify that SSHClient works with a DSA key. """ - self._test_connection(key_filename=_support("test_dss.key")) + self._test_connection(key_filename=_support("dss.key")) @requires_sha1_signing def test_client_rsa(self): """ verify that SSHClient works with an RSA key. """ - self._test_connection(key_filename=_support("test_rsa.key")) + self._test_connection(key_filename=_support("rsa.key")) @requires_sha1_signing def test_client_ecdsa(self): """ verify that SSHClient works with an ECDSA key. """ - self._test_connection(key_filename=_support("test_ecdsa_256.key")) + self._test_connection(key_filename=_support("ecdsa-256.key")) @requires_sha1_signing def test_client_ed25519(self): - self._test_connection(key_filename=_support("test_ed25519.key")) + self._test_connection(key_filename=_support("ed25519.key")) @requires_sha1_signing def test_multiple_key_files(self): @@ -289,16 +287,17 @@ class SSHClientTest(ClientTest): } # Various combos of attempted & valid keys # TODO: try every possible combo using itertools functions + # TODO: use new key(s) fixture(s) for attempt, accept in ( (["rsa", "dss"], ["dss"]), # Original test #3 (["dss", "rsa"], ["dss"]), # Ordering matters sometimes, sadly - (["dss", "rsa", "ecdsa_256"], ["dss"]), # Try ECDSA but fail - (["rsa", "ecdsa_256"], ["ecdsa"]), # ECDSA success + (["dss", "rsa", "ecdsa-256"], ["dss"]), # Try ECDSA but fail + (["rsa", "ecdsa-256"], ["ecdsa"]), # ECDSA success ): try: self._test_connection( key_filename=[ - _support("test_{}.key".format(x)) for x in attempt + _support("{}.key".format(x)) for x in attempt ], allowed_keys=[types_[x] for x in accept], ) @@ -318,7 +317,7 @@ class SSHClientTest(ClientTest): self.assertRaises( SSHException, self._test_connection, - key_filename=[_support("test_rsa.key")], + key_filename=[_support("rsa.key")], allowed_keys=["ecdsa-sha2-nistp256"], ) @@ -328,30 +327,26 @@ class SSHClientTest(ClientTest): # They're similar except for which path is given; the expected auth and # server-side behavior is 100% identical.) # NOTE: only bothered whipping up one cert per overall class/family. - for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"): - cert_name = "test_{}.key-cert.pub".format(type_) - cert_path = _support(os.path.join("cert_support", cert_name)) + for type_ in ("rsa", "dss", "ecdsa-256", "ed25519"): + key_path = _support(f"{type_}.key") self._test_connection( - key_filename=cert_path, - public_blob=PublicBlob.from_file(cert_path), + key_filename=key_path, + public_blob=PublicBlob.from_file(f"{key_path}-cert.pub"), ) @requires_sha1_signing def test_certs_implicitly_loaded_alongside_key_filename_keys(self): - # NOTE: a regular test_connection() w/ test_rsa.key would incidentally + # NOTE: a regular test_connection() w/ rsa.key would incidentally # test this (because test_xxx.key-cert.pub exists) but incidental tests # stink, so NullServer and friends were updated to allow assertions # about the server-side key object's public blob. Thus, we can prove # that a specific cert was found, along with regular authorization # succeeding proving that the overall flow works. - for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"): - key_name = "test_{}.key".format(type_) - key_path = _support(os.path.join("cert_support", key_name)) + for type_ in ("rsa", "dss", "ecdsa-256", "ed25519"): + key_path = _support(f"{type_}.key") self._test_connection( key_filename=key_path, - public_blob=PublicBlob.from_file( - "{}-cert.pub".format(key_path) - ), + public_blob=PublicBlob.from_file(f"{key_path}-cert.pub"), ) def _cert_algo_test(self, ver, alg): @@ -360,9 +355,7 @@ class SSHClientTest(ClientTest): self._test_connection( # NOTE: SSHClient is able to take either the key or the cert & will # set up its internals as needed - key_filename=_support( - os.path.join("cert_support", "test_rsa.key-cert.pub") - ), + key_filename=_support("rsa.key-cert.pub"), server_name="SSH-2.0-OpenSSH_{}".format(ver), ) assert ( @@ -391,7 +384,7 @@ class SSHClientTest(ClientTest): """ threading.Thread(target=self._run).start() hostname = f"[{self.addr}]:{self.port}" - key_file = _support("test_ecdsa_256.key") + key_file = _support("ecdsa-256.key") public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file) self.tc = SSHClient() @@ -414,9 +407,7 @@ class SSHClientTest(ClientTest): """ warnings.filterwarnings("ignore", "tempnam.*") - host_key = paramiko.RSAKey.from_private_key_file( - _support("test_rsa.key") - ) + host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) fd, localname = mkstemp() os.close(fd) @@ -516,9 +507,7 @@ class SSHClientTest(ClientTest): """ # Start the thread with a 1 second wait. threading.Thread(target=self._run, kwargs={"delay": 1}).start() - host_key = paramiko.RSAKey.from_private_key_file( - _support("test_rsa.key") - ) + host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) self.tc = SSHClient() @@ -593,7 +582,7 @@ class SSHClientTest(ClientTest): """ Failed gssapi-keyex doesn't prevent subsequent key from succeeding """ - kwargs = dict(gss_kex=True, key_filename=[_support("test_rsa.key")]) + kwargs = dict(gss_kex=True, key_filename=[_support("rsa.key")]) self._test_connection(**kwargs) @requires_gss_auth @@ -601,7 +590,7 @@ class SSHClientTest(ClientTest): """ Failed gssapi-with-mic doesn't prevent subsequent key from succeeding """ - kwargs = dict(gss_auth=True, key_filename=[_support("test_rsa.key")]) + kwargs = dict(gss_auth=True, key_filename=[_support("rsa.key")]) self._test_connection(**kwargs) def test_reject_policy(self): @@ -683,11 +672,11 @@ class SSHClientTest(ClientTest): self._client_host_key_bad(host_key) def test_host_key_negotiation_3(self): - self._client_host_key_good(paramiko.ECDSAKey, "test_ecdsa_256.key") + self._client_host_key_good(paramiko.ECDSAKey, "ecdsa-256.key") @requires_sha1_signing def test_host_key_negotiation_4(self): - self._client_host_key_good(paramiko.RSAKey, "test_rsa.key") + self._client_host_key_good(paramiko.RSAKey, "rsa.key") def _setup_for_env(self): threading.Thread(target=self._run).start() diff --git a/tests/test_config.py b/tests/test_config.py index a2c60a32..fcb120b6 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -19,7 +19,7 @@ from paramiko import ( ConfigParseError, ) -from .util import _config +from ._util import _config @fixture diff --git a/tests/test_dss.key b/tests/test_dss.key deleted file mode 100644 index e10807f1..00000000 --- a/tests/test_dss.key +++ /dev/null @@ -1,12 +0,0 @@ ------BEGIN DSA PRIVATE KEY----- -MIIBuwIBAAKBgQDngaYDZ30c6/7cJgEEbtl8FgKdwhba1Z7oOrOn4MI/6C42G1bY -wMuqZf4dBCglsdq39SHrcjbE8Vq54gPSOh3g4+uV9Rcg5IOoPLbwp2jQfF6f1FIb -sx7hrDCIqUcQccPSxetPBKmXI9RN8rZLaFuQeTnI65BKM98Ruwvq6SI2LwIVAPDP -hSeawaJI27mKqOfe5PPBSmyHAoGBAJMXxXmPD9sGaQ419DIpmZecJKBUAy9uXD8x -gbgeDpwfDaFJP8owByCKREocPFfi86LjCuQkyUKOfjYMN6iHIf1oEZjB8uJAatUr -FzI0ArXtUqOhwTLwTyFuUojE5own2WYsOAGByvgfyWjsGhvckYNhI4ODpNdPlxQ8 -ZamaPGPsAoGARmR7CCPjodxASvRbIyzaVpZoJ/Z6x7dAumV+ysrV1BVYd0lYukmn -jO1kKBWApqpH1ve9XDQYN8zgxM4b16L21kpoWQnZtXrY3GZ4/it9kUgyB7+NwacI -BlXa8cMDL7Q/69o0d54U0X/NeX5QxuYR6OMJlrkQB7oiW/P/1mwjQgECFGI9QPSc -h9pT9XHqn+1rZ4bK+QGA ------END DSA PRIVATE KEY----- diff --git a/tests/test_ecdsa_256.key b/tests/test_ecdsa_256.key deleted file mode 100644 index 42d44734..00000000 --- a/tests/test_ecdsa_256.key +++ /dev/null @@ -1,5 +0,0 @@ ------BEGIN EC PRIVATE KEY----- -MHcCAQEEIKB6ty3yVyKEnfF/zprx0qwC76MsMlHY4HXCnqho2eKioAoGCCqGSM49 -AwEHoUQDQgAElI9mbdlaS+T9nHxY/59lFnn80EEecZDBHq4gLpccY8Mge5ZTMiMD -ADRvOqQ5R98Sxst765CAqXmRtz8vwoD96g== ------END EC PRIVATE KEY----- diff --git a/tests/test_ed25519.key b/tests/test_ed25519.key deleted file mode 100644 index eb9f94c2..00000000 --- a/tests/test_ed25519.key +++ /dev/null @@ -1,8 +0,0 @@ ------BEGIN OPENSSH PRIVATE KEY----- -b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW -QyNTUxOQAAACB69SvZKJh/9VgSL0G27b5xVYa8nethH3IERbi0YqJDXwAAAKhjwAdrY8AH -awAAAAtzc2gtZWQyNTUxOQAAACB69SvZKJh/9VgSL0G27b5xVYa8nethH3IERbi0YqJDXw -AAAEA9tGQi2IrprbOSbDCF+RmAHd6meNSXBUQ2ekKXm4/8xnr1K9komH/1WBIvQbbtvnFV -hryd62EfcgRFuLRiokNfAAAAI2FsZXhfZ2F5bm9yQEFsZXhzLU1hY0Jvb2stQWlyLmxvY2 -FsAQI= ------END OPENSSH PRIVATE KEY----- diff --git a/tests/test_file.py b/tests/test_file.py index 456c0388..9344495b 100644 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -26,7 +26,7 @@ from io import BytesIO from paramiko.common import linefeed_byte, crlf, cr_byte from paramiko.file import BufferedFile -from .util import needs_builtin +from ._util import needs_builtin class LoopbackFile(BufferedFile): diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py index 671f1ba0..da62fd97 100644 --- a/tests/test_gssapi.py +++ b/tests/test_gssapi.py @@ -24,7 +24,7 @@ Test the used APIs for GSS-API / SSPI authentication import socket -from .util import needs_gssapi, KerberosTestCase, update_env +from ._util import needs_gssapi, KerberosTestCase, update_env # # NOTE: KerberosTestCase skips all tests if it was unable to import k5test diff --git a/tests/test_hostkeys.py b/tests/test_hostkeys.py index bdda295a..a028411d 100644 --- a/tests/test_hostkeys.py +++ b/tests/test_hostkeys.py @@ -36,6 +36,11 @@ broken.example.com ssh-rsa AAAA happy.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA8bP1ZA7DCZDB9J0s50l31M\ BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\ 5ymME3bQ4J/k1IKxCtz/bAlAqFgKoc+EolMziDYqWIATtW0rYTJvzGAzTmMj80/QpsFH+Pc2M= +modern.example.com ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIKHEChAIxsh2hr8Q\ ++Ea1AAHZyfEB2elEc2YgduVzBtp+ +curvy.example.com ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlz\ +dHAyNTYAAABBBAa+pY7djSpbg5viAcZhPt56AO3U3Sd7h7dnlUp0EjfDgyYHYQxl2QZ4JGgfwR5iv9\ +T9iRZjQzvJd5s+kBAZtpk= """ test_hosts_file_tabs = """\ @@ -45,9 +50,11 @@ D+jrpI9cycZHqilK0HmxDeCuxbwyMuaCygU9gS2qoRvNLWZk70OpIKSSpBo0Wl3/XUmz9uhc= happy.example.com\tssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA8bP1ZA7DCZDB9J0s50l31M\ BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\ 5ymME3bQ4J/k1IKxCtz/bAlAqFgKoc+EolMziDYqWIATtW0rYTJvzGAzTmMj80/QpsFH+Pc2M= -doublespace.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA1PD6U2/TVxET6lkp\ -KhOk5r9q/kAYG6sP9f5zuUYP8i7FOFp/6ncCEbbtg/lB+A3iidyxoSWl+9jtoyyDOOVX4UIDV9G11M\ -l8om3D+jrpI9cycZHqilK0HmxDeCuxbwyMuaCygU9gS2qoRvNLWZk70OpIKSSpBo0Wl3/XUmz8BtZ= +modern.example.com\tssh-ed25519\tAAAAC3NzaC1lZDI1NTE5AAAAIKHEChAIxsh2hr8Q\ ++Ea1AAHZyfEB2elEc2YgduVzBtp+ +curvy.example.com\tecdsa-sha2-nistp256\tAAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbml\ +zdHAyNTYAAABBBAa+pY7djSpbg5viAcZhPt56AO3U3Sd7h7dnlUp0EjfDgyYHYQxl2QZ4JGgfwR5iv\ +9T9iRZjQzvJd5s+kBAZtpk= """ keyblob = b"""\ @@ -76,7 +83,7 @@ class HostKeysTest(unittest.TestCase): def test_load(self): hostdict = paramiko.HostKeys("hostfile.temp") - self.assertEqual(2, len(hostdict)) + assert len(hostdict) == 4 self.assertEqual(1, len(list(hostdict.values())[0])) self.assertEqual(1, len(list(hostdict.values())[1])) fp = hexlify( @@ -89,7 +96,7 @@ class HostKeysTest(unittest.TestCase): hh = "|1|BMsIC6cUIP2zBuXR3t2LRcJYjzM=|hpkJMysjTk/+zzUUzxQEa2ieq6c=" key = paramiko.RSAKey(data=decodebytes(keyblob)) hostdict.add(hh, "ssh-rsa", key) - self.assertEqual(3, len(list(hostdict))) + assert len(hostdict) == 5 x = hostdict["foo.example.com"] fp = hexlify(x["ssh-rsa"].get_fingerprint()).upper() self.assertEqual(b"7EC91BB336CB6D810B124B1353C32396", fp) @@ -105,10 +112,8 @@ class HostKeysTest(unittest.TestCase): self.assertTrue(x is not None) fp = hexlify(x["ssh-rsa"].get_fingerprint()).upper() self.assertEqual(b"E6684DB30E109B67B70FF1DC5C7F1363", fp) - i = 0 - for key in hostdict: - i += 1 - self.assertEqual(2, i) + assert list(hostdict) == hostdict.keys() + assert len(list(hostdict)) == len(hostdict.keys()) == 4 def test_dict_set(self): hostdict = paramiko.HostKeys("hostfile.temp") @@ -118,7 +123,7 @@ class HostKeysTest(unittest.TestCase): hostdict["fake.example.com"] = {} hostdict["fake.example.com"]["ssh-rsa"] = key - self.assertEqual(3, len(hostdict)) + assert len(hostdict) == 5 self.assertEqual(2, len(list(hostdict.values())[0])) self.assertEqual(1, len(list(hostdict.values())[1])) self.assertEqual(1, len(list(hostdict.values())[2])) diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py index d4868f4a..a81b1959 100644 --- a/tests/test_kex_gss.py +++ b/tests/test_kex_gss.py @@ -31,7 +31,7 @@ import unittest import paramiko -from .util import needs_gssapi, KerberosTestCase, update_env +from ._util import needs_gssapi, KerberosTestCase, update_env, _support class NullServer(paramiko.ServerInterface): @@ -80,7 +80,7 @@ class GSSKexTest(KerberosTestCase): def _run(self): self.socks, addr = self.sockl.accept() self.ts = paramiko.Transport(self.socks, gss_kex=True) - host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") + host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key")) self.ts.add_server_key(host_key) self.ts.set_gss_host(self.realm.hostname) try: @@ -96,7 +96,7 @@ class GSSKexTest(KerberosTestCase): Diffie-Hellman Key Exchange and user authentication with the GSS-API context created during key exchange. """ - host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") + host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) self.tc = paramiko.SSHClient() diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py index d4dd58ad..aee21c21 100644 --- a/tests/test_packetizer.py +++ b/tests/test_packetizer.py @@ -30,7 +30,7 @@ from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes from paramiko import Message, Packetizer, util from paramiko.common import byte_chr, zero_byte -from .loop import LoopSocket +from ._loop import LoopSocket x55 = byte_chr(0x55) diff --git a/tests/test_pkey.py b/tests/test_pkey.py index 4d74d8aa..d4d193b8 100644 --- a/tests/test_pkey.py +++ b/tests/test_pkey.py @@ -45,7 +45,7 @@ from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateNumbers from unittest.mock import patch, Mock import pytest -from .util import _support, is_low_entropy, requires_sha1_signing +from ._util import _support, is_low_entropy, requires_sha1_signing # from openssh's ssh-keygen @@ -138,12 +138,6 @@ TEST_KEY_BYTESTR = "\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x00ӏV\ class KeyTest(unittest.TestCase): - def setUp(self): - pass - - def tearDown(self): - pass - def assert_keyfile_is_encrypted(self, keyfile): """ A quick check that filename looks like an encrypted key. @@ -161,7 +155,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(exp, key) def test_load_rsa(self): - key = RSAKey.from_private_key_file(_support("test_rsa.key")) + key = RSAKey.from_private_key_file(_support("rsa.key")) self.assertEqual("ssh-rsa", key.get_name()) exp_rsa = b(FINGER_RSA.split()[1].replace(":", "")) my_rsa = hexlify(key.get_fingerprint()) @@ -184,7 +178,7 @@ class KeyTest(unittest.TestCase): ) as loader: loader.side_effect = exception with pytest.raises(SSHException, match=str(exception)): - RSAKey.from_private_key_file(_support("test_rsa.key")) + RSAKey.from_private_key_file(_support("rsa.key")) def test_loading_empty_keys_errors_usefully(self): # #1599 - raise SSHException instead of IndexError @@ -203,7 +197,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(1024, key.get_bits()) def test_load_dss(self): - key = DSSKey.from_private_key_file(_support("test_dss.key")) + key = DSSKey.from_private_key_file(_support("dss.key")) self.assertEqual("ssh-dss", key.get_name()) exp_dss = b(FINGER_DSS.split()[1].replace(":", "")) my_dss = hexlify(key.get_fingerprint()) @@ -231,7 +225,7 @@ class KeyTest(unittest.TestCase): def test_compare_rsa(self): # verify that the private & public keys compare equal - key = RSAKey.from_private_key_file(_support("test_rsa.key")) + key = RSAKey.from_private_key_file(_support("rsa.key")) self.assertEqual(key, key) pub = RSAKey(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -240,7 +234,7 @@ class KeyTest(unittest.TestCase): def test_compare_dss(self): # verify that the private & public keys compare equal - key = DSSKey.from_private_key_file(_support("test_dss.key")) + key = DSSKey.from_private_key_file(_support("dss.key")) self.assertEqual(key, key) pub = DSSKey(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -248,7 +242,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(key, pub) def _sign_and_verify_rsa(self, algorithm, saved_sig): - key = RSAKey.from_private_key_file(_support("test_rsa.key")) + key = RSAKey.from_private_key_file(_support("rsa.key")) msg = key.sign_ssh_data(b"ice weasels", algorithm) assert isinstance(msg, Message) msg.rewind() @@ -273,7 +267,7 @@ class KeyTest(unittest.TestCase): def test_sign_dss(self): # verify that the dss private key can sign and verify - key = DSSKey.from_private_key_file(_support("test_dss.key")) + key = DSSKey.from_private_key_file(_support("dss.key")) msg = key.sign_ssh_data(b"ice weasels") self.assertTrue(type(msg) is Message) msg.rewind() @@ -329,7 +323,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(key.get_name(), "ecdsa-sha2-nistp521") def test_load_ecdsa_256(self): - key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key")) + key = ECDSAKey.from_private_key_file(_support("ecdsa-256.key")) self.assertEqual("ecdsa-sha2-nistp256", key.get_name()) exp_ecdsa = b(FINGER_ECDSA_256.split()[1].replace(":", "")) my_ecdsa = hexlify(key.get_fingerprint()) @@ -357,7 +351,7 @@ class KeyTest(unittest.TestCase): def test_compare_ecdsa_256(self): # verify that the private & public keys compare equal - key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key")) + key = ECDSAKey.from_private_key_file(_support("ecdsa-256.key")) self.assertEqual(key, key) pub = ECDSAKey(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -366,7 +360,7 @@ class KeyTest(unittest.TestCase): def test_sign_ecdsa_256(self): # verify that the rsa private key can sign and verify - key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key")) + key = ECDSAKey.from_private_key_file(_support("ecdsa-256.key")) msg = key.sign_ssh_data(b"ice weasels") self.assertTrue(type(msg) is Message) msg.rewind() @@ -408,7 +402,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(384, key.get_bits()) def test_load_ecdsa_transmutes_crypto_exceptions(self): - path = _support("test_ecdsa_256.key") + path = _support("ecdsa-256.key") # TODO: nix unittest for pytest for exception in (TypeError("onoz"), UnsupportedAlgorithm("oops")): with patch( @@ -569,12 +563,12 @@ class KeyTest(unittest.TestCase): RSAKey.from_private_key_file(_support("test_rsa_openssh_nopad.key")) def test_stringification(self): - key = RSAKey.from_private_key_file(_support("test_rsa.key")) + key = RSAKey.from_private_key_file(_support("rsa.key")) comparable = TEST_KEY_BYTESTR self.assertEqual(str(key), comparable) def test_ed25519(self): - key1 = Ed25519Key.from_private_key_file(_support("test_ed25519.key")) + key1 = Ed25519Key.from_private_key_file(_support("ed25519.key")) key2 = Ed25519Key.from_private_key_file( _support("test_ed25519_password.key"), b"abc123" ) @@ -594,7 +588,7 @@ class KeyTest(unittest.TestCase): def test_ed25519_compare(self): # verify that the private & public keys compare equal - key = Ed25519Key.from_private_key_file(_support("test_ed25519.key")) + key = Ed25519Key.from_private_key_file(_support("ed25519.key")) self.assertEqual(key, key) pub = Ed25519Key(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -616,33 +610,6 @@ class KeyTest(unittest.TestCase): ) assert original != generated - def keys(self): - for key_class, filename in [ - (RSAKey, "test_rsa.key"), - (DSSKey, "test_dss.key"), - (ECDSAKey, "test_ecdsa_256.key"), - (Ed25519Key, "test_ed25519.key"), - ]: - key1 = key_class.from_private_key_file(_support(filename)) - key2 = key_class.from_private_key_file(_support(filename)) - yield key1, key2 - - def test_keys_are_comparable(self): - for key1, key2 in self.keys(): - assert key1 == key2 - - def test_keys_are_not_equal_to_other(self): - for value in [None, True, ""]: - for key1, _ in self.keys(): - assert key1 != value - - def test_keys_are_hashable(self): - # NOTE: this isn't a great test due to hashseed randomization under - # Python 3 preventing use of static values, but it does still prove - # that __hash__ is implemented/doesn't explode & works across instances - for key1, key2 in self.keys(): - assert hash(key1) == hash(key2) - def test_ed25519_nonbytes_password(self): # https://github.com/paramiko/paramiko/issues/1039 Ed25519Key.from_private_key_file( @@ -654,7 +621,7 @@ class KeyTest(unittest.TestCase): # No exception -> it's good. Meh. def test_ed25519_load_from_file_obj(self): - with open(_support("test_ed25519.key")) as pkey_fileobj: + with open(_support("ed25519.key")) as pkey_fileobj: key = Ed25519Key.from_private_key(pkey_fileobj) self.assertEqual(key, key) self.assertTrue(key.can_sign()) @@ -674,43 +641,6 @@ class KeyTest(unittest.TestCase): finally: os.remove(newfile) - def test_certificates(self): - # NOTE: we also test 'live' use of cert auth for all key types in - # test_client.py; this and nearby cert tests are more about the gritty - # details. - # PKey.load_certificate - key_path = _support(os.path.join("cert_support", "test_rsa.key")) - key = RSAKey.from_private_key_file(key_path) - self.assertTrue(key.public_blob is None) - cert_path = _support( - os.path.join("cert_support", "test_rsa.key-cert.pub") - ) - key.load_certificate(cert_path) - self.assertTrue(key.public_blob is not None) - self.assertEqual( - key.public_blob.key_type, "ssh-rsa-cert-v01@openssh.com" - ) - self.assertEqual(key.public_blob.comment, "test_rsa.key.pub") - # Delve into blob contents, for test purposes - msg = Message(key.public_blob.key_blob) - self.assertEqual(msg.get_text(), "ssh-rsa-cert-v01@openssh.com") - msg.get_string() - e = msg.get_mpint() - n = msg.get_mpint() - self.assertEqual(e, key.public_numbers.e) - self.assertEqual(n, key.public_numbers.n) - # Serial number - self.assertEqual(msg.get_int64(), 1234) - - # Prevented from loading certificate that doesn't match - key_path = _support(os.path.join("cert_support", "test_ed25519.key")) - key1 = Ed25519Key.from_private_key_file(key_path) - self.assertRaises( - ValueError, - key1.load_certificate, - _support("test_rsa.key-cert.pub"), - ) - @patch("paramiko.pkey.os") def _test_keyfile_race(self, os_, exists): # Re: CVE-2022-24302 @@ -764,22 +694,3 @@ class KeyTest(unittest.TestCase): finally: if os.path.exists(new): os.unlink(new) - - def test_sign_rsa_with_certificate(self): - data = b"ice weasels" - key_path = _support(os.path.join("cert_support", "test_rsa.key")) - key = RSAKey.from_private_key_file(key_path) - msg = key.sign_ssh_data(data, "rsa-sha2-256") - msg.rewind() - assert "rsa-sha2-256" == msg.get_text() - sign = msg.get_binary() - cert_path = _support( - os.path.join("cert_support", "test_rsa.key-cert.pub") - ) - key.load_certificate(cert_path) - msg = key.sign_ssh_data(data, "rsa-sha2-256-cert-v01@openssh.com") - msg.rewind() - assert "rsa-sha2-256" == msg.get_text() - assert sign == msg.get_binary() - msg.rewind() - assert key.verify_ssh_sig(b"ice weasels", msg) diff --git a/tests/test_sftp.py b/tests/test_sftp.py index be123de4..7fd274bc 100644 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -38,8 +38,8 @@ from paramiko.sftp_attr import SFTPAttributes from paramiko.util import b, u from tests import requireNonAsciiLocale -from .util import needs_builtin -from .util import slow +from ._util import needs_builtin +from ._util import slow ARTICLE = """ diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py index 5192f657..acfe71e3 100644 --- a/tests/test_sftp_big.py +++ b/tests/test_sftp_big.py @@ -30,7 +30,7 @@ import time from paramiko.common import o660 -from .util import slow +from ._util import slow @slow diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py index a8175ccb..b441a225 100644 --- a/tests/test_ssh_gss.py +++ b/tests/test_ssh_gss.py @@ -28,7 +28,7 @@ import threading import paramiko -from .util import _support, needs_gssapi, KerberosTestCase, update_env +from ._util import _support, needs_gssapi, KerberosTestCase, update_env from .test_client import FINGERPRINTS @@ -89,7 +89,7 @@ class GSSAuthTest(KerberosTestCase): def _run(self): self.socks, addr = self.sockl.accept() self.ts = paramiko.Transport(self.socks) - host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") + host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key")) self.ts.add_server_key(host_key) server = NullServer() self.ts.start_server(self.event, server) @@ -100,7 +100,7 @@ class GSSAuthTest(KerberosTestCase): The exception is ... no exception yet """ - host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") + host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) self.tc = paramiko.SSHClient() @@ -154,7 +154,7 @@ class GSSAuthTest(KerberosTestCase): "this_host_does_not_exists_and_causes_a_GSSAPI-exception" ) self._test_connection( - key_filename=[_support("test_rsa.key")], + key_filename=[_support("rsa.key")], allow_agent=False, look_for_keys=False, ) diff --git a/tests/test_transport.py b/tests/test_transport.py index 4062d767..b2efd637 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -22,7 +22,6 @@ Some unit tests for the ssh2 protocol in Transport. from binascii import hexlify -from contextlib import contextmanager import select import socket import time @@ -34,18 +33,16 @@ from unittest.mock import Mock from paramiko import ( AuthHandler, ChannelException, - DSSKey, Packetizer, RSAKey, SSHException, - AuthenticationException, IncompatiblePeer, SecurityOptions, - ServerInterface, + ServiceRequestingTransport, Transport, ) -from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL -from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED +from paramiko.auth_handler import AuthOnlyHandler +from paramiko import OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED from paramiko.common import ( DEFAULT_MAX_PACKET_SIZE, DEFAULT_WINDOW_SIZE, @@ -60,8 +57,17 @@ from paramiko.common import ( ) from paramiko.message import Message -from .util import needs_builtin, _support, requires_sha1_signing, slow -from .loop import LoopSocket +from ._util import ( + needs_builtin, + _support, + requires_sha1_signing, + slow, + server, + _disable_sha2, + _disable_sha1, + TestServer as NullServer, +) +from ._loop import LoopSocket LONG_BANNER = """\ @@ -77,80 +83,11 @@ Maybe. """ -class NullServer(ServerInterface): - paranoid_did_password = False - paranoid_did_public_key = False - paranoid_key = DSSKey.from_private_key_file(_support("test_dss.key")) - - def __init__(self, allowed_keys=None): - self.allowed_keys = allowed_keys if allowed_keys is not None else [] - - def get_allowed_auths(self, username): - if username == "slowdive": - return "publickey,password" - return "publickey" - - def check_auth_password(self, username, password): - if (username == "slowdive") and (password == "pygmalion"): - return AUTH_SUCCESSFUL - return AUTH_FAILED - - def check_auth_publickey(self, username, key): - if key in self.allowed_keys: - return AUTH_SUCCESSFUL - return AUTH_FAILED - - def check_channel_request(self, kind, chanid): - if kind == "bogus": - return OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED - return OPEN_SUCCEEDED - - def check_channel_exec_request(self, channel, command): - if command != b"yes": - return False - return True - - def check_channel_shell_request(self, channel): - return True - - def check_global_request(self, kind, msg): - self._global_request = kind - # NOTE: for w/e reason, older impl of this returned False always, even - # tho that's only supposed to occur if the request cannot be served. - # For now, leaving that the default unless test supplies specific - # 'acceptable' request kind - return kind == "acceptable" - - def check_channel_x11_request( - self, - channel, - single_connection, - auth_protocol, - auth_cookie, - screen_number, - ): - self._x11_single_connection = single_connection - self._x11_auth_protocol = auth_protocol - self._x11_auth_cookie = auth_cookie - self._x11_screen_number = screen_number - return True - - def check_port_forward_request(self, addr, port): - self._listen = socket.socket() - self._listen.bind(("127.0.0.1", 0)) - self._listen.listen(1) - return self._listen.getsockname()[1] - - def cancel_port_forward_request(self, addr, port): - self._listen.close() - self._listen = None - - def check_channel_direct_tcpip_request(self, chanid, origin, destination): - self._tcpip_dest = destination - return OPEN_SUCCEEDED - - class TransportTest(unittest.TestCase): + # TODO: this can get nuked once ServiceRequestingTransport becomes the + # only Transport, as it has this baked in. + _auth_handler_class = AuthHandler + def setUp(self): self.socks = LoopSocket() self.sockc = LoopSocket() @@ -168,7 +105,7 @@ class TransportTest(unittest.TestCase): def setup_test_server( self, client_options=None, server_options=None, connect_kwargs=None ): - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) + host_key = RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) @@ -234,7 +171,7 @@ class TransportTest(unittest.TestCase): loopback sockets. this is hardly "simple" but it's simpler than the later tests. :) """ - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) + host_key = RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) event = threading.Event() @@ -260,7 +197,7 @@ class TransportTest(unittest.TestCase): """ verify that a long banner doesn't mess up the handshake. """ - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) + host_key = RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) event = threading.Event() @@ -910,7 +847,7 @@ class TransportTest(unittest.TestCase): # be fine. Even tho it's a bit squicky. self.tc.packetizer = SlowPacketizer(self.tc.sock) # Continue with regular test red tape. - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) + host_key = RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) event = threading.Event() @@ -1099,7 +1036,8 @@ class TransportTest(unittest.TestCase): def test_server_transports_reject_client_message_types(self): # TODO: handle Transport's own tables too, not just its inner auth # handler's table. See TODOs in auth_handler.py - for message_type in AuthHandler._client_handler_table: + some_handler = self._auth_handler_class(self.tc) + for message_type in some_handler._client_handler_table: self._send_client_message(message_type) self._expect_unimplemented() # Reset for rest of loop @@ -1115,6 +1053,21 @@ class TransportTest(unittest.TestCase): self._expect_unimplemented() +# TODO: for now this is purely a regression test. It needs actual tests of the +# intentional new behavior too! +class ServiceRequestingTransportTest(TransportTest): + _auth_handler_class = AuthOnlyHandler + + def setUp(self): + # Copypasta (Transport init is load-bearing) + self.socks = LoopSocket() + self.sockc = LoopSocket() + self.sockc.link(self.socks) + # New class who dis + self.tc = ServiceRequestingTransport(self.sockc) + self.ts = ServiceRequestingTransport(self.socks) + + class AlgorithmDisablingTests(unittest.TestCase): def test_preferred_lists_default_to_private_attribute_contents(self): t = Transport(sock=Mock()) @@ -1188,98 +1141,6 @@ class AlgorithmDisablingTests(unittest.TestCase): assert "zlib" not in compressions -@contextmanager -def server( - hostkey=None, - init=None, - server_init=None, - client_init=None, - connect=None, - pubkeys=None, - catch_error=False, -): - """ - SSH server contextmanager for testing. - - :param hostkey: - Host key to use for the server; if None, loads - ``test_rsa.key``. - :param init: - Default `Transport` constructor kwargs to use for both sides. - :param server_init: - Extends and/or overrides ``init`` for server transport only. - :param client_init: - Extends and/or overrides ``init`` for client transport only. - :param connect: - Kwargs to use for ``connect()`` on the client. - :param pubkeys: - List of public keys for auth. - :param catch_error: - Whether to capture connection errors & yield from contextmanager. - Necessary for connection_time exception testing. - """ - if init is None: - init = {} - if server_init is None: - server_init = {} - if client_init is None: - client_init = {} - if connect is None: - connect = dict(username="slowdive", password="pygmalion") - socks = LoopSocket() - sockc = LoopSocket() - sockc.link(socks) - tc = Transport(sockc, **dict(init, **client_init)) - ts = Transport(socks, **dict(init, **server_init)) - - if hostkey is None: - hostkey = RSAKey.from_private_key_file(_support("test_rsa.key")) - ts.add_server_key(hostkey) - event = threading.Event() - server = NullServer(allowed_keys=pubkeys) - assert not event.is_set() - assert not ts.is_active() - assert tc.get_username() is None - assert ts.get_username() is None - assert not tc.is_authenticated() - assert not ts.is_authenticated() - - err = None - # Trap errors and yield instead of raising right away; otherwise callers - # cannot usefully deal with problems at connect time which stem from errors - # in the server side. - try: - ts.start_server(event, server) - tc.connect(**connect) - - event.wait(1.0) - assert event.is_set() - assert ts.is_active() - assert tc.is_active() - - except Exception as e: - if not catch_error: - raise - err = e - - yield (tc, ts, err) if catch_error else (tc, ts) - - tc.close() - ts.close() - socks.close() - sockc.close() - - -_disable_sha2 = dict( - disabled_algorithms=dict(keys=["rsa-sha2-256", "rsa-sha2-512"]) -) -_disable_sha1 = dict(disabled_algorithms=dict(keys=["ssh-rsa"])) -_disable_sha2_pubkey = dict( - disabled_algorithms=dict(pubkeys=["rsa-sha2-256", "rsa-sha2-512"]) -) -_disable_sha1_pubkey = dict(disabled_algorithms=dict(pubkeys=["ssh-rsa"])) - - class TestSHA2SignatureKeyExchange(unittest.TestCase): # NOTE: these all rely on the default server() hostkey being RSA # NOTE: these rely on both sides being properly implemented re: agreed-upon @@ -1343,8 +1204,11 @@ class TestSHA2SignatureKeyExchange(unittest.TestCase): # (This is a regression test vs previous implementation which overwrote # the entire preferred-hostkeys structure when given an explicit key as # a client.) - hostkey = RSAKey.from_private_key_file(_support("test_rsa.key")) - with server(hostkey=hostkey, connect=dict(hostkey=hostkey)) as (tc, _): + hostkey = RSAKey.from_private_key_file(_support("rsa.key")) + connect = dict( + hostkey=hostkey, username="slowdive", password="pygmalion" + ) + with server(hostkey=hostkey, connect=connect) as (tc, _): assert tc.host_key_type == "rsa-sha2-512" @@ -1358,7 +1222,7 @@ class TestExtInfo(unittest.TestCase): } def test_client_uses_server_sig_algs_for_pubkey_auth(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + privkey = RSAKey.from_private_key_file(_support("rsa.key")) with server( pubkeys=[privkey], connect=dict(pkey=privkey), @@ -1367,94 +1231,6 @@ class TestExtInfo(unittest.TestCase): ), ) as (tc, _): assert tc.is_authenticated() - # Client settled on 256 despite itself not having 512 disabled - assert tc._agreed_pubkey_algorithm == "rsa-sha2-256" - - -# TODO: these could move into test_auth.py but that badly needs refactoring -# with this module anyways... -class TestSHA2SignaturePubkeys(unittest.TestCase): - def test_pubkey_auth_honors_disabled_algorithms(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) - with server( - pubkeys=[privkey], - connect=dict(pkey=privkey), - init=dict( - disabled_algorithms=dict( - pubkeys=["ssh-rsa", "rsa-sha2-256", "rsa-sha2-512"] - ) - ), - catch_error=True, - ) as (_, _, err): - assert isinstance(err, SSHException) - assert "no RSA pubkey algorithms" in str(err) - - def test_client_sha2_disabled_server_sha1_disabled_no_match(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) - with server( - pubkeys=[privkey], - connect=dict(pkey=privkey), - client_init=_disable_sha2_pubkey, - server_init=_disable_sha1_pubkey, - catch_error=True, - ) as (tc, ts, err): - assert isinstance(err, AuthenticationException) - - def test_client_sha1_disabled_server_sha2_disabled_no_match(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) - with server( - pubkeys=[privkey], - connect=dict(pkey=privkey), - client_init=_disable_sha1_pubkey, - server_init=_disable_sha2_pubkey, - catch_error=True, - ) as (tc, ts, err): - assert isinstance(err, AuthenticationException) - - @requires_sha1_signing - def test_ssh_rsa_still_used_when_sha2_disabled(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) - # NOTE: this works because key obj comparison uses public bytes - # TODO: would be nice for PKey to grow a legit "give me another obj of - # same class but just the public bits" using asbytes() - with server( - pubkeys=[privkey], connect=dict(pkey=privkey), init=_disable_sha2 - ) as (tc, _): - assert tc.is_authenticated() - - def test_sha2_512(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) - with server( - pubkeys=[privkey], - connect=dict(pkey=privkey), - init=dict( - disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-256"]) - ), - ) as (tc, ts): - assert tc.is_authenticated() - assert tc._agreed_pubkey_algorithm == "rsa-sha2-512" - - def test_sha2_256(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) - with server( - pubkeys=[privkey], - connect=dict(pkey=privkey), - init=dict( - disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-512"]) - ), - ) as (tc, ts): - assert tc.is_authenticated() - assert tc._agreed_pubkey_algorithm == "rsa-sha2-256" - - def test_sha2_256_when_client_only_enables_256(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) - with server( - pubkeys=[privkey], - connect=dict(pkey=privkey), - # Client-side only; server still accepts all 3. - client_init=dict( - disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-512"]) - ), - ) as (tc, ts): - assert tc.is_authenticated() + # Client settled on 256 despite itself not having 512 disabled (and + # otherwise, 512 would have been earlier in the preferred list) assert tc._agreed_pubkey_algorithm == "rsa-sha2-256" diff --git a/tests/test_util.py b/tests/test_util.py index ec03846b..a2a8224e 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -49,6 +49,11 @@ class UtilTest(unittest.TestCase): "Agent", "AgentKey", "AuthenticationException", + "AuthFailure", + "AuthHandler", + "AuthResult", + "AuthSource", + "AuthStrategy", "AutoAddPolicy", "BadAuthenticationType", "BufferedFile", @@ -58,9 +63,14 @@ class UtilTest(unittest.TestCase): "CouldNotCanonicalize", "DSSKey", "HostKeys", + "InMemoryPrivateKey", "Message", "MissingHostKeyPolicy", + "NoneAuth", + "OnDiskPrivateKey", + "Password", "PasswordRequiredException", + "PrivateKey", "RSAKey", "RejectPolicy", "SFTP", @@ -77,12 +87,13 @@ class UtilTest(unittest.TestCase): "SSHException", "SecurityOptions", "ServerInterface", + "SourceResult", "SubsystemHandler", "Transport", "WarningPolicy", "util", ): - assert name in paramiko.__all__ + assert name in dir(paramiko) def test_generate_key_bytes(self): key_bytes = paramiko.util.generate_key_bytes( diff --git a/tests/util.py b/tests/util.py deleted file mode 100644 index 0639f8ae..00000000 --- a/tests/util.py +++ /dev/null @@ -1,174 +0,0 @@ -from os.path import dirname, realpath, join -import builtins -import os -import struct -import sys -import unittest - -import pytest - -from paramiko.ssh_gss import GSS_AUTH_AVAILABLE - -from cryptography.exceptions import UnsupportedAlgorithm, _Reasons -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.asymmetric import padding, rsa - -tests_dir = dirname(realpath(__file__)) - - -def _support(filename): - return join(tests_dir, filename) - - -def _config(name): - return join(tests_dir, "configs", name) - - -needs_gssapi = pytest.mark.skipif( - not GSS_AUTH_AVAILABLE, reason="No GSSAPI to test" -) - - -def needs_builtin(name): - """ - Skip decorated test if builtin name does not exist. - """ - reason = "Test requires a builtin '{}'".format(name) - return pytest.mark.skipif(not hasattr(builtins, name), reason=reason) - - -slow = pytest.mark.slow - -# GSSAPI / Kerberos related tests need a working Kerberos environment. -# The class `KerberosTestCase` provides such an environment or skips all tests. -# There are 3 distinct cases: -# -# - A Kerberos environment has already been created and the environment -# contains the required information. -# -# - We can use the package 'k5test' to setup an working kerberos environment on -# the fly. -# -# - We skip all tests. -# -# ToDo: add a Windows specific implementation? - -if ( - os.environ.get("K5TEST_USER_PRINC", None) - and os.environ.get("K5TEST_HOSTNAME", None) - and os.environ.get("KRB5_KTNAME", None) -): # add other vars as needed - - # The environment provides the required information - class DummyK5Realm: - def __init__(self): - for k in os.environ: - if not k.startswith("K5TEST_"): - continue - setattr(self, k[7:].lower(), os.environ[k]) - self.env = {} - - class KerberosTestCase(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.realm = DummyK5Realm() - - @classmethod - def tearDownClass(cls): - del cls.realm - -else: - try: - # Try to setup a kerberos environment - from k5test import KerberosTestCase - except Exception: - # Use a dummy, that skips all tests - class KerberosTestCase(unittest.TestCase): - @classmethod - def setUpClass(cls): - raise unittest.SkipTest( - "Missing extension package k5test. " - 'Please run "pip install k5test" ' - "to install it." - ) - - -def update_env(testcase, mapping, env=os.environ): - """Modify os.environ during a test case and restore during cleanup.""" - saved_env = env.copy() - - def replace(target, source): - target.update(source) - for k in list(target): - if k not in source: - target.pop(k, None) - - testcase.addCleanup(replace, env, saved_env) - env.update(mapping) - return testcase - - -def k5shell(args=None): - """Create a shell with an kerberos environment - - This can be used to debug paramiko or to test the old GSSAPI. - To test a different GSSAPI, simply activate a suitable venv - within the shell. - """ - import k5test - import atexit - import subprocess - - k5 = k5test.K5Realm() - atexit.register(k5.stop) - os.environ.update(k5.env) - for n in ("realm", "user_princ", "hostname"): - os.environ["K5TEST_" + n.upper()] = getattr(k5, n) - - if not args: - args = sys.argv[1:] - if not args: - args = [os.environ.get("SHELL", "bash")] - sys.exit(subprocess.call(args)) - - -def is_low_entropy(): - """ - Attempts to detect whether running interpreter is low-entropy. - - "low-entropy" is defined as being in 32-bit mode and with the hash seed set - to zero. - """ - is_32bit = struct.calcsize("P") == 32 / 8 - # I don't see a way to tell internally if the hash seed was set this - # way, but env should be plenty sufficient, this is only for testing. - return is_32bit and os.environ.get("PYTHONHASHSEED", None) == "0" - - -def sha1_signing_unsupported(): - """ - This is used to skip tests in environments where SHA-1 signing is - not supported by the backend. - """ - private_key = rsa.generate_private_key( - public_exponent=65537, key_size=2048, backend=default_backend() - ) - message = b"Some dummy text" - try: - private_key.sign( - message, - padding.PSS( - mgf=padding.MGF1(hashes.SHA1()), - salt_length=padding.PSS.MAX_LENGTH, - ), - hashes.SHA1(), - ) - return False - except UnsupportedAlgorithm as e: - return e._reason is _Reasons.UNSUPPORTED_HASH - - -requires_sha1_signing = unittest.skipIf( - sha1_signing_unsupported(), "SHA-1 signing not supported" -) |