diff options
-rw-r--r-- | .codespellrc | 2 | ||||
-rw-r--r-- | .git-blame-ignore-revs | 2 | ||||
-rw-r--r-- | MANIFEST.in | 2 | ||||
-rw-r--r-- | README.rst | 13 | ||||
-rw-r--r-- | dev-requirements.txt | 6 | ||||
-rw-r--r-- | paramiko/__init__.py | 25 | ||||
-rw-r--r-- | paramiko/_version.py | 2 | ||||
-rw-r--r-- | paramiko/agent.py | 73 | ||||
-rw-r--r-- | paramiko/auth_handler.py | 193 | ||||
-rw-r--r-- | paramiko/auth_strategy.py | 306 | ||||
-rw-r--r-- | paramiko/client.py | 30 | ||||
-rw-r--r-- | paramiko/dsskey.py | 15 | ||||
-rw-r--r-- | paramiko/ecdsakey.py | 8 | ||||
-rw-r--r-- | paramiko/ed25519key.py | 17 | ||||
-rw-r--r-- | paramiko/hostkeys.py | 47 | ||||
-rw-r--r-- | paramiko/pkey.py | 200 | ||||
-rw-r--r-- | paramiko/rsakey.py | 18 | ||||
-rw-r--r-- | paramiko/server.py | 4 | ||||
-rw-r--r-- | paramiko/ssh_exception.py | 5 | ||||
-rw-r--r-- | paramiko/transport.py | 200 | ||||
-rw-r--r-- | pytest.ini | 7 | ||||
-rw-r--r-- | sites/docs/api/auth.rst | 8 | ||||
-rw-r--r-- | sites/docs/index.rst | 1 | ||||
-rw-r--r-- | sites/www/changelog.rst | 126 | ||||
-rw-r--r-- | tasks.py | 22 | ||||
-rw-r--r-- | tests/_loop.py (renamed from tests/loop.py) | 0 | ||||
-rw-r--r-- | tests/_stub_sftp.py (renamed from tests/stub_sftp.py) | 0 | ||||
-rw-r--r-- | tests/_support/dss.key (renamed from tests/cert_support/test_dss.key) | 0 | ||||
-rw-r--r-- | tests/_support/dss.key-cert.pub (renamed from tests/cert_support/test_dss.key-cert.pub) | 0 | ||||
-rw-r--r-- | tests/_support/ecdsa-256.key (renamed from tests/cert_support/test_ecdsa_256.key) | 0 | ||||
-rw-r--r-- | tests/_support/ecdsa-256.key-cert.pub (renamed from tests/cert_support/test_ecdsa_256.key-cert.pub) | 0 | ||||
-rw-r--r-- | tests/_support/ed25519.key (renamed from tests/cert_support/test_ed25519.key) | 0 | ||||
-rw-r--r-- | tests/_support/ed25519.key-cert.pub (renamed from tests/cert_support/test_ed25519.key-cert.pub) | 0 | ||||
-rw-r--r-- | tests/_support/ed448.key | 4 | ||||
-rw-r--r-- | tests/_support/rsa-lonely.key (renamed from tests/cert_support/test_rsa.key) | 0 | ||||
-rw-r--r-- | tests/_support/rsa-missing.key-cert.pub (renamed from tests/cert_support/test_rsa.key-cert.pub) | 0 | ||||
-rw-r--r-- | tests/_support/rsa.key (renamed from tests/test_rsa.key) | 0 | ||||
-rw-r--r-- | tests/_support/rsa.key-cert.pub | 1 | ||||
-rw-r--r-- | tests/_util.py | 441 | ||||
-rw-r--r-- | tests/agent.py | 151 | ||||
-rw-r--r-- | tests/auth.py | 580 | ||||
-rw-r--r-- | tests/conftest.py | 75 | ||||
-rw-r--r-- | tests/pkey.py | 229 | ||||
-rw-r--r-- | tests/test_agent.py | 50 | ||||
-rw-r--r-- | tests/test_auth.py | 272 | ||||
-rw-r--r-- | tests/test_client.py | 69 | ||||
-rw-r--r-- | tests/test_config.py | 2 | ||||
-rw-r--r-- | tests/test_dss.key | 12 | ||||
-rw-r--r-- | tests/test_ecdsa_256.key | 5 | ||||
-rw-r--r-- | tests/test_ed25519.key | 8 | ||||
-rw-r--r-- | tests/test_file.py | 2 | ||||
-rw-r--r-- | tests/test_gssapi.py | 2 | ||||
-rw-r--r-- | tests/test_hostkeys.py | 25 | ||||
-rw-r--r-- | tests/test_kex_gss.py | 6 | ||||
-rw-r--r-- | tests/test_packetizer.py | 2 | ||||
-rw-r--r-- | tests/test_pkey.py | 121 | ||||
-rw-r--r-- | tests/test_sftp.py | 4 | ||||
-rw-r--r-- | tests/test_sftp_big.py | 2 | ||||
-rw-r--r-- | tests/test_ssh_gss.py | 8 | ||||
-rw-r--r-- | tests/test_transport.py | 318 | ||||
-rw-r--r-- | tests/test_util.py | 13 | ||||
-rw-r--r-- | tests/util.py | 174 |
62 files changed, 2812 insertions, 1096 deletions
diff --git a/.codespellrc b/.codespellrc index f6935aa7..a8d619e5 100644 --- a/.codespellrc +++ b/.codespellrc @@ -4,4 +4,4 @@ skip = venvs,.venv,.git,build,*.egg-info,*.lock,*.js,*.css,docs # Certain words AUTHOR feels strongly about, plus various proper names that are # close enough to real words that they anger codespell. (NOTE: for some reason # codespell wants the latter listed in all-lowercase...!) -ignore-words-list = keypair,flage +ignore-words-list = keypair,flage,lew,welp,strat diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs new file mode 100644 index 00000000..f8f0117c --- /dev/null +++ b/.git-blame-ignore-revs @@ -0,0 +1,2 @@ +# blackening +7f2c35052183b400827d9949a68b41c90f90a32d diff --git a/MANIFEST.in b/MANIFEST.in index c7de9097..8c6fc5d9 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,4 @@ -include LICENSE setup_helper.py +include LICENSE setup_helper.py pytest.ini recursive-include docs * recursive-include tests *.py *.key *.pub recursive-include tests/configs * @@ -41,8 +41,11 @@ personal site. <https://www.paramiko.org/installing.html>`_ for details. .. [#] - SSH is defined in :rfc-reference:`4251`, :rfc-reference:`4252`, - :rfc-reference:`4253` and :rfc-reference:`4254`. The primary working - implementation of the protocol is the `OpenSSH project - <http://openssh.org>`_. Paramiko implements a large portion of the SSH - feature set, but there are occasional gaps. + OpenSSH's RFC specification page is a fantastic resource and collection of + links that we won't bother replicating here: + https://www.openssh.com/specs.html + + OpenSSH itself also happens to be our primary reference implementation: + when in doubt, we consult how they do things, unless there are good reasons + not to. There are always some gaps, but we do our best to reconcile them + when possible. diff --git a/dev-requirements.txt b/dev-requirements.txt index 26d0efa4..43d01e0d 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,6 +1,6 @@ # Invocations for common project tasks invoke>=2.0 -invocations>=3.0 +invocations>=3.2 # Testing! pytest-relaxed>=2 # pytest-xdist for test dir watching and the inv guard task @@ -10,12 +10,14 @@ flake8>=4,<5 # Formatting! black>=22.8,<22.9 # Spelling! -codespell>=2.2,<2.3 +# TODO Python 3.7: newer codespell has upgraded lists +codespell>=2.2.1,<2.3 # Coverage! coverage>=6.2,<7 # Documentation tools alabaster==0.7.13 releases>=2.1 +watchdog<2 # Debuggery icecream>=2.1 # Self (sans GSS which is a pain to bother with most of the time) diff --git a/paramiko/__init__.py b/paramiko/__init__.py index cbc240a6..476062ef 100644 --- a/paramiko/__init__.py +++ b/paramiko/__init__.py @@ -19,7 +19,11 @@ # flake8: noqa import sys from paramiko._version import __version__, __version_info__ -from paramiko.transport import SecurityOptions, Transport +from paramiko.transport import ( + SecurityOptions, + Transport, + ServiceRequestingTransport, +) from paramiko.client import ( SSHClient, MissingHostKeyPolicy, @@ -28,6 +32,18 @@ from paramiko.client import ( WarningPolicy, ) from paramiko.auth_handler import AuthHandler +from paramiko.auth_strategy import ( + AuthFailure, + AuthStrategy, + AuthResult, + AuthSource, + InMemoryPrivateKey, + NoneAuth, + OnDiskPrivateKey, + Password, + PrivateKey, + SourceResult, +) from paramiko.ssh_gss import GSSAuth, GSS_AUTH_AVAILABLE, GSS_EXCEPTIONS from paramiko.channel import ( Channel, @@ -63,7 +79,7 @@ from paramiko.message import Message from paramiko.packet import Packetizer from paramiko.file import BufferedFile from paramiko.agent import Agent, AgentKey -from paramiko.pkey import PKey, PublicBlob +from paramiko.pkey import PKey, PublicBlob, UnknownKeyType from paramiko.hostkeys import HostKeys from paramiko.config import SSHConfig, SSHConfigDict from paramiko.proxy import ProxyCommand @@ -94,9 +110,14 @@ from paramiko.sftp import ( from paramiko.common import io_sleep +# TODO: I guess a real plugin system might be nice for future expansion... +key_classes = [DSSKey, RSAKey, Ed25519Key, ECDSAKey] + + __author__ = "Jeff Forcier <jeff@bitprophet.org>" __license__ = "GNU Lesser General Public License (LGPL)" +# TODO 4.0: remove this, jeez __all__ = [ "Agent", "AgentKey", diff --git a/paramiko/_version.py b/paramiko/_version.py index ba3f1b3c..a008f109 100644 --- a/paramiko/_version.py +++ b/paramiko/_version.py @@ -1,2 +1,2 @@ -__version_info__ = (3, 1, 0) +__version_info__ = (3, 2, 0) __version__ = ".".join(map(str, __version_info__)) diff --git a/paramiko/agent.py b/paramiko/agent.py index 30ec1590..b29a0d14 100644 --- a/paramiko/agent.py +++ b/paramiko/agent.py @@ -28,13 +28,14 @@ import threading import time import tempfile import stat +from logging import DEBUG from select import select from paramiko.common import io_sleep, byte_chr from paramiko.ssh_exception import SSHException, AuthenticationException from paramiko.message import Message -from paramiko.pkey import PKey -from paramiko.util import asbytes +from paramiko.pkey import PKey, UnknownKeyType +from paramiko.util import asbytes, get_logger cSSH2_AGENTC_REQUEST_IDENTITIES = byte_chr(11) SSH2_AGENT_IDENTITIES_ANSWER = 12 @@ -52,8 +53,11 @@ ALGORITHM_FLAG_MAP = { "rsa-sha2-256": SSH_AGENT_RSA_SHA2_256, "rsa-sha2-512": SSH_AGENT_RSA_SHA2_512, } +for key, value in list(ALGORITHM_FLAG_MAP.items()): + ALGORITHM_FLAG_MAP[f"{key}-cert-v01@openssh.com"] = value +# TODO 4.0: rename all these - including making some of their methods public? class AgentSSH: def __init__(self): self._conn = None @@ -81,8 +85,13 @@ class AgentSSH: raise SSHException("could not get keys from ssh-agent") keys = [] for i in range(result.get_int()): - keys.append(AgentKey(self, result.get_binary())) - result.get_string() + keys.append( + AgentKey( + agent=self, + blob=result.get_binary(), + comment=result.get_text(), + ) + ) self._keys = tuple(keys) def _close(self): @@ -417,31 +426,69 @@ class AgentKey(PKey): Private key held in a local SSH agent. This type of key can be used for authenticating to a remote server (signing). Most other key operations work as expected. + + .. versionchanged:: 3.2 + Added the ``comment`` kwarg and attribute. + + .. versionchanged:: 3.2 + Added the ``.inner_key`` attribute holding a reference to the 'real' + key instance this key is a proxy for, if one was obtainable, else None. """ - def __init__(self, agent, blob): + def __init__(self, agent, blob, comment=""): self.agent = agent self.blob = blob - self.public_blob = None - self.name = Message(blob).get_text() + self.comment = comment + msg = Message(blob) + self.name = msg.get_text() + self._logger = get_logger(__file__) + self.inner_key = None + try: + self.inner_key = PKey.from_type_string( + key_type=self.name, key_bytes=blob + ) + except UnknownKeyType: + # Log, but don't explode, since inner_key is a best-effort thing. + err = "Unable to derive inner_key for agent key of type {!r}" + self.log(DEBUG, err.format(self.name)) - def asbytes(self): - return self.blob + def log(self, *args, **kwargs): + return self._logger.log(*args, **kwargs) - def __str__(self): - return self.asbytes() + def asbytes(self): + # Prefer inner_key.asbytes, since that will differ for eg RSA-CERT + return self.inner_key.asbytes() if self.inner_key else self.blob def get_name(self): return self.name + def get_bits(self): + # Have to work around PKey's default get_bits being crap + if self.inner_key is not None: + return self.inner_key.get_bits() + return super().get_bits() + + def __getattr__(self, name): + """ + Proxy any un-implemented methods/properties to the inner_key. + """ + if self.inner_key is None: # nothing to proxy to + raise AttributeError(name) + return getattr(self.inner_key, name) + @property def _fields(self): - raise NotImplementedError + fallback = [self.get_name(), self.blob] + return self.inner_key._fields if self.inner_key else fallback def sign_ssh_data(self, data, algorithm=None): msg = Message() msg.add_byte(cSSH2_AGENTC_SIGN_REQUEST) - msg.add_string(self.blob) + # NOTE: this used to be just self.blob, which is not entirely right for + # RSA-CERT 'keys' - those end up always degrading to ssh-rsa type + # signatures, for reasons probably internal to OpenSSH's agent code, + # even if everything else wants SHA2 (including our flag map). + msg.add_string(self.asbytes()) msg.add_string(data) msg.add_int(ALGORITHM_FLAG_MAP.get(algorithm, 0)) ptype, result = self.agent._send_message(msg) diff --git a/paramiko/auth_handler.py b/paramiko/auth_handler.py index 22f506c1..bc7f298f 100644 --- a/paramiko/auth_handler.py +++ b/paramiko/auth_handler.py @@ -21,6 +21,7 @@ """ import weakref +import threading import time import re @@ -241,7 +242,9 @@ class AuthHandler: if not self.transport.is_active(): e = self.transport.get_exception() if (e is None) or issubclass(e.__class__, EOFError): - e = AuthenticationException("Authentication failed.") + e = AuthenticationException( + "Authentication failed: transport shut down or saw EOF" + ) raise e if event.is_set(): break @@ -254,6 +257,7 @@ class AuthHandler: e = AuthenticationException("Authentication failed.") # this is horrible. Python Exception isn't yet descended from # object, so type(e) won't work. :( + # TODO 4.0: lol. just lmao. if issubclass(e.__class__, PartialAuthentication): return e.allowed_types raise e @@ -289,6 +293,17 @@ class AuthHandler: return None return self.transport._key_info[algorithm](Message(keyblob)) + def _choose_fallback_pubkey_algorithm(self, key_type, my_algos): + # Fallback: first one in our (possibly tweaked by caller) list + pubkey_algo = my_algos[0] + msg = "Server did not send a server-sig-algs list; defaulting to our first preferred algo ({!r})" # noqa + self._log(DEBUG, msg.format(pubkey_algo)) + self._log( + DEBUG, + "NOTE: you may use the 'disabled_algorithms' SSHClient/Transport init kwarg to disable that or other algorithms if your server does not support them!", # noqa + ) + return pubkey_algo + def _finalize_pubkey_algorithm(self, key_type): # Short-circuit for non-RSA keys if "rsa" not in key_type: @@ -329,6 +344,7 @@ class AuthHandler: self.transport.server_extensions.get("server-sig-algs", b("")) ) pubkey_algo = None + # Prefer to match against server-sig-algs if server_algo_str: server_algos = server_algo_str.split(",") self._log( @@ -350,14 +366,10 @@ class AuthHandler: # technically for initial key exchange, not pubkey auth. err = "Unable to agree on a pubkey algorithm for signing a {!r} key!" # noqa raise AuthenticationException(err.format(key_type)) + # Fallback to something based purely on the key & our configuration else: - # Fallback: first one in our (possibly tweaked by caller) list - pubkey_algo = my_algos[0] - msg = "Server did not send a server-sig-algs list; defaulting to our first preferred algo ({!r})" # noqa - self._log(DEBUG, msg.format(pubkey_algo)) - self._log( - DEBUG, - "NOTE: you may use the 'disabled_algorithms' SSHClient/Transport init kwarg to disable that or other algorithms if your server does not support them!", # noqa + pubkey_algo = self._choose_fallback_pubkey_algorithm( + key_type, my_algos ) if key_type.endswith("-cert-v01@openssh.com"): pubkey_algo += "-cert-v01@openssh.com" @@ -367,9 +379,6 @@ class AuthHandler: def _parse_service_accept(self, m): service = m.get_text() if service == "ssh-userauth": - # TODO 4.0: this message sucks ass. change it to something more - # obvious. it always appears to mean "we already authed" but no! it - # just means "we are allowed to TRY authing!" self._log(DEBUG, "userauth is OK") m = Message() m.add_byte(cMSG_USERAUTH_REQUEST) @@ -725,6 +734,9 @@ Error Message: {} def _parse_userauth_failure(self, m): authlist = m.get_list() + # TODO 4.0: we aren't giving callers access to authlist _unless_ it's + # partial authentication, so eg authtype=none can't work unless we + # tweak this. partial = m.get_boolean() if partial: self._log(INFO, "Authentication continues...") @@ -805,7 +817,6 @@ Error Message: {} self.auth_event.set() return - # TODO: do the same to the other tables, in Transport. # TODO 4.0: MAY make sense to make these tables into actual # classes/instances that can be fed a mode bool or whatever. Or, # alternately (both?) make the message types small classes or enums that @@ -814,20 +825,27 @@ Error Message: {} # TODO: if we do that, also expose 'em publicly. # Messages which should be handled _by_ servers (sent by clients) - _server_handler_table = { - MSG_SERVICE_REQUEST: _parse_service_request, - MSG_USERAUTH_REQUEST: _parse_userauth_request, - MSG_USERAUTH_INFO_RESPONSE: _parse_userauth_info_response, - } + @property + def _server_handler_table(self): + return { + # TODO 4.0: MSG_SERVICE_REQUEST ought to eventually move into + # Transport's server mode like the client side did, just for + # consistency. + MSG_SERVICE_REQUEST: self._parse_service_request, + MSG_USERAUTH_REQUEST: self._parse_userauth_request, + MSG_USERAUTH_INFO_RESPONSE: self._parse_userauth_info_response, + } # Messages which should be handled _by_ clients (sent by servers) - _client_handler_table = { - MSG_SERVICE_ACCEPT: _parse_service_accept, - MSG_USERAUTH_SUCCESS: _parse_userauth_success, - MSG_USERAUTH_FAILURE: _parse_userauth_failure, - MSG_USERAUTH_BANNER: _parse_userauth_banner, - MSG_USERAUTH_INFO_REQUEST: _parse_userauth_info_request, - } + @property + def _client_handler_table(self): + return { + MSG_SERVICE_ACCEPT: self._parse_service_accept, + MSG_USERAUTH_SUCCESS: self._parse_userauth_success, + MSG_USERAUTH_FAILURE: self._parse_userauth_failure, + MSG_USERAUTH_BANNER: self._parse_userauth_banner, + MSG_USERAUTH_INFO_REQUEST: self._parse_userauth_info_request, + } # NOTE: prior to the fix for #1283, this was a static dict instead of a # property. Should be backwards compatible in most/all cases. @@ -945,3 +963,130 @@ class GssapiWithMicAuthHandler: # TODO: determine if we can cut this up like we did for the primary # AuthHandler class. return self.__handler_table + + +class AuthOnlyHandler(AuthHandler): + """ + AuthHandler, and just auth, no service requests! + + .. versionadded:: 3.2 + """ + + # NOTE: this purposefully duplicates some of the parent class in order to + # modernize, refactor, etc. The intent is that eventually we will collapse + # this one onto the parent in a backwards incompatible release. + + @property + def _client_handler_table(self): + my_table = super()._client_handler_table.copy() + del my_table[MSG_SERVICE_ACCEPT] + return my_table + + def send_auth_request(self, username, method, finish_message=None): + """ + Submit a userauth request message & wait for response. + + Performs the transport message send call, sets self.auth_event, and + will lock-n-block as necessary to both send, and wait for response to, + the USERAUTH_REQUEST. + + Most callers will want to supply a callback to ``finish_message``, + which accepts a Message ``m`` and may call mutator methods on it to add + more fields. + """ + # Store a few things for reference in handlers, including auth failure + # handler (which needs to know if we were using a bad method, etc) + self.auth_method = method + self.username = username + # Generic userauth request fields + m = Message() + m.add_byte(cMSG_USERAUTH_REQUEST) + m.add_string(username) + m.add_string("ssh-connection") + m.add_string(method) + # Caller usually has more to say, such as injecting password, key etc + finish_message(m) + # TODO 4.0: seems odd to have the client handle the lock and not + # Transport; that _may_ have been an artifact of allowing user + # threading event injection? Regardless, we don't want to move _this_ + # locking into Transport._send_message now, because lots of other + # untouched code also uses that method and we might end up + # double-locking (?) but 4.0 would be a good time to revisit. + with self.transport.lock: + self.transport._send_message(m) + # We have cut out the higher level event args, but self.auth_event is + # still required for self.wait_for_response to function correctly (it's + # the mechanism used by the auth success/failure handlers, the abort + # handler, and a few other spots like in gssapi. + # TODO: interestingly, wait_for_response itself doesn't actually + # enforce that its event argument and self.auth_event are the same... + self.auth_event = threading.Event() + return self.wait_for_response(self.auth_event) + + def auth_none(self, username): + return self.send_auth_request(username, "none") + + def auth_publickey(self, username, key): + key_type, bits = self._get_key_type_and_bits(key) + algorithm = self._finalize_pubkey_algorithm(key_type) + blob = self._get_session_blob( + key, + "ssh-connection", + username, + algorithm, + ) + + def finish(m): + # This field doesn't appear to be named, but is False when querying + # for permission (ie knowing whether to even prompt a user for + # passphrase, etc) or True when just going for it. Paramiko has + # never bothered with the former type of message, apparently. + m.add_boolean(True) + m.add_string(algorithm) + m.add_string(bits) + m.add_string(key.sign_ssh_data(blob, algorithm)) + + return self.send_auth_request(username, "publickey", finish) + + def auth_password(self, username, password): + def finish(m): + # Unnamed field that equates to "I am changing my password", which + # Paramiko clientside never supported and serverside only sort of + # supported. + m.add_boolean(False) + m.add_string(b(password)) + + return self.send_auth_request(username, "password", finish) + + def auth_interactive(self, username, handler, submethods=""): + """ + response_list = handler(title, instructions, prompt_list) + """ + # Unlike most siblings, this auth method _does_ require other + # superclass handlers (eg userauth info request) to understand + # what's going on, so we still set some self attributes. + self.auth_method = "keyboard_interactive" + self.interactive_handler = handler + + def finish(m): + # Empty string for deprecated language tag field, per RFC 4256: + # https://www.rfc-editor.org/rfc/rfc4256#section-3.1 + m.add_string("") + m.add_string(submethods) + + return self.send_auth_request(username, "keyboard-interactive", finish) + + # NOTE: not strictly 'auth only' related, but allows users to opt-in. + def _choose_fallback_pubkey_algorithm(self, key_type, my_algos): + msg = "Server did not send a server-sig-algs list; defaulting to something in our preferred algorithms list" # noqa + self._log(DEBUG, msg) + noncert_key_type = key_type.replace("-cert-v01@openssh.com", "") + if key_type in my_algos or noncert_key_type in my_algos: + actual = key_type if key_type in my_algos else noncert_key_type + msg = f"Current key type, {actual!r}, is in our preferred list; using that" # noqa + algo = actual + else: + algo = my_algos[0] + msg = f"{key_type!r} not in our list - trying first list item instead, {algo!r}" # noqa + self._log(DEBUG, msg) + return algo diff --git a/paramiko/auth_strategy.py b/paramiko/auth_strategy.py new file mode 100644 index 00000000..03c1d877 --- /dev/null +++ b/paramiko/auth_strategy.py @@ -0,0 +1,306 @@ +""" +Modern, adaptable authentication machinery. + +Replaces certain parts of `.SSHClient`. For a concrete implementation, see the +``OpenSSHAuthStrategy`` class in `Fabric <https://fabfile.org>`_. +""" + +from collections import namedtuple + +from .agent import AgentKey +from .util import get_logger +from .ssh_exception import AuthenticationException + + +class AuthSource: + """ + Some SSH authentication source, such as a password, private key, or agent. + + See subclasses in this module for concrete implementations. + + All implementations must accept at least a ``username`` (``str``) kwarg. + """ + + def __init__(self, username): + self.username = username + + def _repr(self, **kwargs): + # TODO: are there any good libs for this? maybe some helper from + # structlog? + pairs = [f"{k}={v!r}" for k, v in kwargs.items()] + joined = ", ".join(pairs) + return f"{self.__class__.__name__}({joined})" + + def __repr__(self): + return self._repr() + + def authenticate(self, transport): + """ + Perform authentication. + """ + raise NotImplementedError + + +class NoneAuth(AuthSource): + """ + Auth type "none", ie https://www.rfc-editor.org/rfc/rfc4252#section-5.2 . + """ + + def authenticate(self, transport): + return transport.auth_none(self.username) + + +class Password(AuthSource): + """ + Password authentication. + + :param callable password_getter: + A lazy callable that should return a `str` password value at + authentication time, such as a `functools.partial` wrapping + `getpass.getpass`, an API call to a secrets store, or similar. + + If you already know the password at instantiation time, you should + simply use something like ``lambda: "my literal"`` (for a literal, but + also, shame on you!) or ``lambda: variable_name`` (for something stored + in a variable). + """ + + def __init__(self, username, password_getter): + super().__init__(username=username) + self.password_getter = password_getter + + def __repr__(self): + # Password auth is marginally more 'username-caring' than pkeys, so may + # as well log that info here. + return super()._repr(user=self.username) + + def authenticate(self, transport): + # Lazily get the password, in case it's prompting a user + # TODO: be nice to log source _of_ the password? + password = self.password_getter() + return transport.auth_password(self.username, password) + + +# TODO 4.0: twiddle this, or PKey, or both, so they're more obviously distinct. +# TODO 4.0: the obvious is to make this more wordy (PrivateKeyAuth), the +# minimalist approach might be to rename PKey to just Key (esp given all the +# subclasses are WhateverKey and not WhateverPKey) +class PrivateKey(AuthSource): + """ + Essentially a mixin for private keys. + + Knows how to auth, but leaves key material discovery/loading/decryption to + subclasses. + + Subclasses **must** ensure that they've set ``self.pkey`` to a decrypted + `.PKey` instance before calling ``super().authenticate``; typically + either in their ``__init__``, or in an overridden ``authenticate`` prior to + its `super` call. + """ + + def authenticate(self, transport): + return transport.auth_publickey(self.username, self.pkey) + + +class InMemoryPrivateKey(PrivateKey): + """ + An in-memory, decrypted `.PKey` object. + """ + + def __init__(self, username, pkey): + super().__init__(username=username) + # No decryption (presumably) necessary! + self.pkey = pkey + + def __repr__(self): + # NOTE: most of interesting repr-bits for private keys is in PKey. + # TODO: tacking on agent-ness like this is a bit awkward, but, eh? + rep = super()._repr(pkey=self.pkey) + if isinstance(self.pkey, AgentKey): + rep += " [agent]" + return rep + + +class OnDiskPrivateKey(PrivateKey): + """ + Some on-disk private key that needs opening and possibly decrypting. + + :param str source: + String tracking where this key's path was specified; should be one of + ``"ssh-config"``, ``"python-config"``, or ``"implicit-home"``. + :param Path path: + The filesystem path this key was loaded from. + :param PKey pkey: + The `PKey` object this auth source uses/represents. + """ + + def __init__(self, username, source, path, pkey): + super().__init__(username=username) + self.source = source + allowed = ("ssh-config", "python-config", "implicit-home") + if source not in allowed: + raise ValueError(f"source argument must be one of: {allowed!r}") + self.path = path + # Superclass wants .pkey, other two are mostly for display/debugging. + self.pkey = pkey + + def __repr__(self): + return self._repr( + key=self.pkey, source=self.source, path=str(self.path) + ) + + +# TODO re sources: is there anything in an OpenSSH config file that doesn't fit +# into what Paramiko already had kwargs for? + + +SourceResult = namedtuple("SourceResult", ["source", "result"]) + +# TODO: tempting to make this an OrderedDict, except the keys essentially want +# to be rich objects (AuthSources) which do not make for useful user indexing? +# TODO: members being vanilla tuples is pretty old-school/expedient; they +# "really" want to be something that's type friendlier (unless the tuple's 2nd +# member being a Union of two types is "fine"?), which I assume means yet more +# classes, eg an abstract SourceResult with concrete AuthSuccess and +# AuthFailure children? +# TODO: arguably we want __init__ typechecking of the members (or to leverage +# mypy by classifying this literally as list-of-AuthSource?) +class AuthResult(list): + """ + Represents a partial or complete SSH authentication attempt. + + This class conceptually extends `AuthStrategy` by pairing the former's + authentication **sources** with the **results** of trying to authenticate + with them. + + `AuthResult` is a (subclass of) `list` of `namedtuple`, which are of the + form ``namedtuple('SourceResult', 'source', 'result')`` (where the + ``source`` member is an `AuthSource` and the ``result`` member is either a + return value from the relevant `.Transport` method, or an exception + object). + + .. note:: + Transport auth method results are always themselves a ``list`` of "next + allowable authentication methods". + + In the simple case of "you just authenticated successfully", it's an + empty list; if your auth was rejected but you're allowed to try again, + it will be a list of string method names like ``pubkey`` or + ``password``. + + The ``__str__`` of this class represents the empty-list scenario as the + word ``success``, which should make reading the result of an + authentication session more obvious to humans. + + Instances also have a `strategy` attribute referencing the `AuthStrategy` + which was attempted. + """ + + def __init__(self, strategy, *args, **kwargs): + self.strategy = strategy + super().__init__(*args, **kwargs) + + def __str__(self): + # NOTE: meaningfully distinct from __repr__, which still wants to use + # superclass' implementation. + # TODO: go hog wild, use rich.Table? how is that on degraded term's? + # TODO: test this lol + return "\n".join( + f"{x.source} -> {x.result or 'success'}" for x in self + ) + + +# TODO 4.0: descend from SSHException or even just Exception +class AuthFailure(AuthenticationException): + """ + Basic exception wrapping an `AuthResult` indicating overall auth failure. + + Note that `AuthFailure` descends from `AuthenticationException` but is + generally "higher level"; the latter is now only raised by individual + `AuthSource` attempts and should typically only be seen by users when + encapsulated in this class. It subclasses `AuthenticationException` + primarily for backwards compatibility reasons. + """ + + def __init__(self, result): + self.result = result + + def __str__(self): + return "\n" + str(self.result) + + +class AuthStrategy: + """ + This class represents one or more attempts to auth with an SSH server. + + By default, subclasses must at least accept an ``ssh_config`` + (`.SSHConfig`) keyword argument, but may opt to accept more as needed for + their particular strategy. + """ + + def __init__( + self, + ssh_config, + ): + self.ssh_config = ssh_config + self.log = get_logger(__name__) + + def get_sources(self): + """ + Generator yielding `AuthSource` instances, in the order to try. + + This is the primary override point for subclasses: you figure out what + sources you need, and ``yield`` them. + + Subclasses _of_ subclasses may find themselves wanting to do things + like filtering or discarding around a call to `super`. + """ + raise NotImplementedError + + def authenticate(self, transport): + """ + Handles attempting `AuthSource` instances yielded from `get_sources`. + + You *normally* won't need to override this, but it's an option for + advanced users. + """ + succeeded = False + overall_result = AuthResult(strategy=self) + # TODO: arguably we could fit in a "send none auth, record allowed auth + # types sent back" thing here as OpenSSH-client does, but that likely + # wants to live in fabric.OpenSSHAuthStrategy as not all target servers + # will implement it! + # TODO: needs better "server told us too many attempts" checking! + for source in self.get_sources(): + self.log.debug(f"Trying {source}") + try: # NOTE: this really wants to _only_ wrap the authenticate()! + result = source.authenticate(transport) + succeeded = True + # TODO: 'except PartialAuthentication' is needed for 2FA and + # similar, as per old SSHClient.connect - it is the only way + # AuthHandler supplies access to the 'name-list' field from + # MSG_USERAUTH_FAILURE, at present. + except Exception as e: + result = e + # TODO: look at what this could possibly raise, we don't really + # want Exception here, right? just SSHException subclasses? or + # do we truly want to capture anything at all with assumption + # it's easy enough for users to look afterwards? + # NOTE: showing type, not message, for tersity & also most of + # the time it's basically just "Authentication failed." + source_class = e.__class__.__name__ + self.log.info( + f"Authentication via {source} failed with {source_class}" + ) + overall_result.append(SourceResult(source, result)) + if succeeded: + break + # Gotta die here if nothing worked, otherwise Transport's main loop + # just kinda hangs out until something times out! + if not succeeded: + raise AuthFailure(result=overall_result) + # Success: give back what was done, in case they care. + return overall_result + + # TODO: is there anything OpenSSH client does which _can't_ cleanly map to + # iterating a generator? diff --git a/paramiko/client.py b/paramiko/client.py index 1fe14b07..d8be9108 100644 --- a/paramiko/client.py +++ b/paramiko/client.py @@ -238,6 +238,7 @@ class SSHClient(ClosingContextManager): passphrase=None, disabled_algorithms=None, transport_factory=None, + auth_strategy=None, ): """ Connect to an SSH server and authenticate to it. The server's host key @@ -323,10 +324,28 @@ class SSHClient(ClosingContextManager): functionality, and algorithm selection) and generates a `.Transport` instance to be used by this client. Defaults to `.Transport.__init__`. + :param auth_strategy: + an optional instance of `.AuthStrategy`, triggering use of this + newer authentication mechanism instead of SSHClient's legacy auth + method. + + .. warning:: + This parameter is **incompatible** with all other + authentication-related parameters (such as, but not limited to, + ``password``, ``key_filename`` and ``allow_agent``) and will + trigger an exception if given alongside them. + + :returns: + `.AuthResult` if ``auth_strategy`` is non-``None``; otherwise, + returns ``None``. :raises BadHostKeyException: if the server's host key could not be verified. - :raises AuthenticationException: if authentication failed. + :raises AuthenticationException: + if authentication failed. + :raises UnableToAuthenticate: + if authentication failed (when ``auth_strategy`` is non-``None``; + and note that this is a subclass of ``AuthenticationException``). :raises socket.error: if a socket error (other than connection-refused or host-unreachable) occurred while connecting. @@ -349,6 +368,8 @@ class SSHClient(ClosingContextManager): Added the ``disabled_algorithms`` argument. .. versionchanged:: 2.12 Added the ``transport_factory`` argument. + .. versionchanged:: 3.2 + Added the ``auth_strategy`` argument. """ if not sock: errors = {} @@ -449,6 +470,11 @@ class SSHClient(ClosingContextManager): if username is None: username = getpass.getuser() + # New auth flow! + if auth_strategy is not None: + return auth_strategy.authenticate(transport=t) + + # Old auth flow! if key_filename is None: key_filenames = [] elif isinstance(key_filename, str): @@ -697,6 +723,8 @@ class SSHClient(ClosingContextManager): if not two_factor: for key_filename in key_filenames: + # TODO 4.0: leverage PKey.from_path() if we don't end up just + # killing SSHClient entirely for pkey_class in (RSAKey, DSSKey, ECDSAKey, Ed25519Key): try: key = self._key_from_filepath( diff --git a/paramiko/dsskey.py b/paramiko/dsskey.py index 5a0f85eb..5215d282 100644 --- a/paramiko/dsskey.py +++ b/paramiko/dsskey.py @@ -43,6 +43,8 @@ class DSSKey(PKey): data. """ + name = "ssh-dss" + def __init__( self, msg=None, @@ -71,8 +73,8 @@ class DSSKey(PKey): else: self._check_type_and_load_cert( msg=msg, - key_type="ssh-dss", - cert_type="ssh-dss-cert-v01@openssh.com", + key_type=self.name, + cert_type=f"{self.name}-cert-v01@openssh.com", ) self.p = msg.get_mpint() self.q = msg.get_mpint() @@ -82,7 +84,7 @@ class DSSKey(PKey): def asbytes(self): m = Message() - m.add_string("ssh-dss") + m.add_string(self.name) m.add_mpint(self.p) m.add_mpint(self.q) m.add_mpint(self.g) @@ -96,8 +98,9 @@ class DSSKey(PKey): def _fields(self): return (self.get_name(), self.p, self.q, self.g, self.y) + # TODO 4.0: remove def get_name(self): - return "ssh-dss" + return self.name def get_bits(self): return self.size @@ -119,7 +122,7 @@ class DSSKey(PKey): r, s = decode_dss_signature(sig) m = Message() - m.add_string("ssh-dss") + m.add_string(self.name) # apparently, in rare cases, r or s may be shorter than 20 bytes! rstr = util.deflate_long(r, 0) sstr = util.deflate_long(s, 0) @@ -136,7 +139,7 @@ class DSSKey(PKey): sig = msg.asbytes() else: kind = msg.get_text() - if kind != "ssh-dss": + if kind != self.name: return 0 sig = msg.get_binary() diff --git a/paramiko/ecdsakey.py b/paramiko/ecdsakey.py index e2279754..6fd95fab 100644 --- a/paramiko/ecdsakey.py +++ b/paramiko/ecdsakey.py @@ -114,6 +114,7 @@ class ECDSAKey(PKey): password=None, vals=None, file_obj=None, + # TODO 4.0: remove; it does nothing since porting to cryptography.io validate_point=True, ): self.verifying_key = None @@ -168,9 +169,14 @@ class ECDSAKey(PKey): raise SSHException("Invalid public key") @classmethod - def supported_key_format_identifiers(cls): + def identifiers(cls): return cls._ECDSA_CURVES.get_key_format_identifier_list() + # TODO 4.0: deprecate/remove + @classmethod + def supported_key_format_identifiers(cls): + return cls.identifiers() + def asbytes(self): key = self.verifying_key m = Message() diff --git a/paramiko/ed25519key.py b/paramiko/ed25519key.py index c8c4da6c..e5e81ac5 100644 --- a/paramiko/ed25519key.py +++ b/paramiko/ed25519key.py @@ -39,6 +39,8 @@ class Ed25519Key(PKey): Added a ``file_obj`` parameter to match other key classes. """ + name = "ssh-ed25519" + def __init__( self, msg=None, data=None, filename=None, password=None, file_obj=None ): @@ -49,7 +51,7 @@ class Ed25519Key(PKey): if msg is not None: self._check_type_and_load_cert( msg=msg, - key_type="ssh-ed25519", + key_type=self.name, cert_type="ssh-ed25519-cert-v01@openssh.com", ) verifying_key = nacl.signing.VerifyKey(msg.get_binary()) @@ -108,7 +110,7 @@ class Ed25519Key(PKey): public_keys = [] for _ in range(num_keys): pubkey = Message(message.get_binary()) - if pubkey.get_text() != "ssh-ed25519": + if pubkey.get_text() != self.name: raise SSHException("Invalid key") public_keys.append(pubkey.get_binary()) @@ -141,7 +143,7 @@ class Ed25519Key(PKey): signing_keys = [] for i in range(num_keys): - if message.get_text() != "ssh-ed25519": + if message.get_text() != self.name: raise SSHException("Invalid key") # A copy of the public key, again, ignore. public = message.get_binary() @@ -170,7 +172,7 @@ class Ed25519Key(PKey): else: v = self._verifying_key m = Message() - m.add_string("ssh-ed25519") + m.add_string(self.name) m.add_string(v.encode()) return m.asbytes() @@ -182,8 +184,9 @@ class Ed25519Key(PKey): v = self._verifying_key return (self.get_name(), v) + # TODO 4.0: remove def get_name(self): - return "ssh-ed25519" + return self.name def get_bits(self): return 256 @@ -193,12 +196,12 @@ class Ed25519Key(PKey): def sign_ssh_data(self, data, algorithm=None): m = Message() - m.add_string("ssh-ed25519") + m.add_string(self.name) m.add_string(self._signing_key.sign(data).signature) return m def verify_ssh_sig(self, data, msg): - if msg.get_text() != "ssh-ed25519": + if msg.get_text() != self.name: return False try: diff --git a/paramiko/hostkeys.py b/paramiko/hostkeys.py index bbfa5755..4d47e950 100644 --- a/paramiko/hostkeys.py +++ b/paramiko/hostkeys.py @@ -27,11 +27,8 @@ from hashlib import sha1 from hmac import HMAC -from paramiko.dsskey import DSSKey -from paramiko.rsakey import RSAKey +from paramiko.pkey import PKey, UnknownKeyType from paramiko.util import get_logger, constant_time_bytes_eq, b, u -from paramiko.ecdsakey import ECDSAKey -from paramiko.ed25519key import Ed25519Key from paramiko.ssh_exception import SSHException @@ -95,16 +92,16 @@ class HostKeys(MutableMapping): if (len(line) == 0) or (line[0] == "#"): continue try: - e = HostKeyEntry.from_line(line, lineno) + entry = HostKeyEntry.from_line(line, lineno) except SSHException: continue - if e is not None: - _hostnames = e.hostnames + if entry is not None: + _hostnames = entry.hostnames for h in _hostnames: - if self.check(h, e.key): - e.hostnames.remove(h) - if len(e.hostnames): - self._entries.append(e) + if self.check(h, entry.key): + entry.hostnames.remove(h) + if len(entry.hostnames): + self._entries.append(entry) def save(self, filename): """ @@ -347,29 +344,27 @@ class HostKeyEntry: return None fields = fields[:3] - names, keytype, key = fields + names, key_type, key = fields names = names.split(",") # Decide what kind of key we're looking at and create an object # to hold it accordingly. try: - key = b(key) - if keytype == "ssh-rsa": - key = RSAKey(data=decodebytes(key)) - elif keytype == "ssh-dss": - key = DSSKey(data=decodebytes(key)) - elif keytype in ECDSAKey.supported_key_format_identifiers(): - key = ECDSAKey(data=decodebytes(key), validate_point=False) - elif keytype == "ssh-ed25519": - key = Ed25519Key(data=decodebytes(key)) - else: - log.info("Unable to handle key of type {}".format(keytype)) - return None - + # TODO: this grew organically and doesn't seem /wrong/ per se (file + # read -> unicode str -> bytes for base64 decode -> decoded bytes); + # but in Python 3 forever land, can we simply use + # `base64.b64decode(str-from-file)` here? + key_bytes = decodebytes(b(key)) except binascii.Error as e: raise InvalidHostKey(line, e) - return cls(names, key) + try: + return cls(names, PKey.from_type_string(key_type, key_bytes)) + except UnknownKeyType: + # TODO 4.0: consider changing HostKeys API so this just raises + # naturally and the exception is muted higher up in the stack? + log.info("Unable to handle key of type {}".format(key_type)) + return None def to_line(self): """ diff --git a/paramiko/pkey.py b/paramiko/pkey.py index 32d8cad5..ef371002 100644 --- a/paramiko/pkey.py +++ b/paramiko/pkey.py @@ -24,7 +24,8 @@ import base64 from base64 import encodebytes, decodebytes from binascii import unhexlify import os -from hashlib import md5 +from pathlib import Path +from hashlib import md5, sha256 import re import struct @@ -33,6 +34,7 @@ import bcrypt from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.ciphers import algorithms, modes, Cipher +from cryptography.hazmat.primitives import asymmetric from paramiko import util from paramiko.util import u, b @@ -59,9 +61,25 @@ def _unpad_openssh(data): return data[:-padding_length] +class UnknownKeyType(Exception): + """ + An unknown public/private key algorithm was attempted to be read. + """ + + def __init__(self, key_type=None, key_bytes=None): + self.key_type = key_type + self.key_bytes = key_bytes + + def __str__(self): + return f"UnknownKeyType(type={self.key_type!r}, bytes=<{len(self.key_bytes)}>)" # noqa + + class PKey: """ Base class for public keys. + + Also includes some "meta" level convenience constructors such as + `.from_type_string`. """ # known encryption types for private key files: @@ -92,6 +110,127 @@ class PKey: ) END_TAG = re.compile(r"^-{5}END (RSA|DSA|EC|OPENSSH) PRIVATE KEY-{5}\s*$") + @staticmethod + def from_path(path, passphrase=None): + """ + Attempt to instantiate appropriate key subclass from given file path. + + :param Path path: The path to load (may also be a `str`). + + :returns: + A `PKey` subclass instance. + + :raises: + `UnknownKeyType`, if our crypto backend doesn't know this key type. + + .. versionadded:: 3.2 + """ + # TODO: make sure sphinx is reading Path right in param list... + + # Lazy import to avoid circular import issues + from paramiko import DSSKey, RSAKey, Ed25519Key, ECDSAKey + + # Normalize to string, as cert suffix isn't quite an extension, so + # pathlib isn't useful for this. + path = str(path) + + # Sort out cert vs key, i.e. it is 'legal' to hand this kind of API + # /either/ the key /or/ the cert, when there is a key/cert pair. + cert_suffix = "-cert.pub" + if str(path).endswith(cert_suffix): + key_path = path[: -len(cert_suffix)] + cert_path = path + else: + key_path = path + cert_path = path + cert_suffix + + key_path = Path(key_path).expanduser() + cert_path = Path(cert_path).expanduser() + + data = key_path.read_bytes() + # Like OpenSSH, try modern/OpenSSH-specific key load first + try: + loaded = serialization.load_ssh_private_key( + data=data, password=passphrase + ) + # Then fall back to assuming legacy PEM type + except ValueError: + loaded = serialization.load_pem_private_key( + data=data, password=passphrase + ) + # TODO Python 3.10: match statement? (NOTE: we cannot use a dict + # because the results from the loader are literal backend, eg openssl, + # private classes, so isinstance tests work but exact 'x class is y' + # tests will not work) + # TODO: leverage already-parsed/math'd obj to avoid duplicate cpu + # cycles? seemingly requires most of our key subclasses to be rewritten + # to be cryptography-object-forward. this is still likely faster than + # the old SSHClient code that just tried instantiating every class! + key_class = None + if isinstance(loaded, asymmetric.dsa.DSAPrivateKey): + key_class = DSSKey + elif isinstance(loaded, asymmetric.rsa.RSAPrivateKey): + key_class = RSAKey + elif isinstance(loaded, asymmetric.ed25519.Ed25519PrivateKey): + key_class = Ed25519Key + elif isinstance(loaded, asymmetric.ec.EllipticCurvePrivateKey): + key_class = ECDSAKey + else: + raise UnknownKeyType(key_bytes=data, key_type=loaded.__class__) + with key_path.open() as fd: + key = key_class.from_private_key(fd, password=passphrase) + if cert_path.exists(): + # load_certificate can take Message, path-str, or value-str + key.load_certificate(str(cert_path)) + return key + + @staticmethod + def from_type_string(key_type, key_bytes): + """ + Given type `str` & raw `bytes`, return a `PKey` subclass instance. + + For example, ``PKey.from_type_string("ssh-ed25519", <public bytes>)`` + will (if successful) return a new `.Ed25519Key`. + + :param str key_type: + The key type, eg ``"ssh-ed25519"``. + :param bytes key_bytes: + The raw byte data forming the key material, as expected by + subclasses' ``data`` parameter. + + :returns: + A `PKey` subclass instance. + + :raises: + `UnknownKeyType`, if no registered classes knew about this type. + + .. versionadded:: 3.2 + """ + from paramiko import key_classes + + for key_class in key_classes: + if key_type in key_class.identifiers(): + # TODO: needs to passthru things like passphrase + return key_class(data=key_bytes) + raise UnknownKeyType(key_type=key_type, key_bytes=key_bytes) + + @classmethod + def identifiers(cls): + """ + returns an iterable of key format/name strings this class can handle. + + Most classes only have a single identifier, and thus this default + implementation suffices; see `.ECDSAKey` for one example of an + override. + """ + return [cls.name] + + # TODO 4.0: make this and subclasses consistent, some of our own + # classmethods even assume kwargs we don't define! + # TODO 4.0: prob also raise NotImplementedError instead of pass'ing; the + # contract is pretty obviously that you need to handle msg/data/filename + # appropriately. (If 'pass' is a concession to testing, see about doing the + # work to fix the tests instead) def __init__(self, msg=None, data=None): """ Create a new instance of this public key type. If ``msg`` is given, @@ -101,8 +240,8 @@ class PKey: :param .Message msg: an optional SSH `.Message` containing a public key of this type. - :param str data: an optional string containing a public key - of this type + :param bytes data: + optional, the bytes of a public key of this type :raises: `.SSHException` -- if a key cannot be created from the ``data`` or ``msg`` given, or @@ -110,6 +249,19 @@ class PKey: """ pass + # TODO: arguably this might want to be __str__ instead? ehh + # TODO: ditto the interplay between showing class name (currently we just + # say PKey writ large) and algorithm (usually == class name, but not + # always, also sometimes shows certificate-ness) + # TODO: if we do change it, we also want to tweak eg AgentKey, as it + # currently displays agent-ness with a suffix + def __repr__(self): + comment = "" + # Works for AgentKey, may work for others? + if hasattr(self, "comment") and self.comment: + comment = f", comment={self.comment!r}" + return f"PKey(alg={self.algorithm_name}, bits={self.get_bits()}, fp={self.fingerprint}{comment})" # noqa + # TODO 4.0: just merge into __bytes__ (everywhere) def asbytes(self): """ @@ -142,6 +294,26 @@ class PKey: """ return "" + @property + def algorithm_name(self): + """ + Return the key algorithm identifier for this key. + + Similar to `get_name`, but aimed at pure algorithm name instead of SSH + protocol field value. + """ + # Nuke the leading 'ssh-' + # TODO in Python 3.9: use .removeprefix() + name = self.get_name().replace("ssh-", "") + # Trim any cert suffix (but leave the -cert, as OpenSSH does) + cert_tail = "-cert-v01@openssh.com" + if cert_tail in name: + name = name.replace(cert_tail, "-cert") + # Nuke any eg ECDSA suffix, OpenSSH does basically this too. + else: + name = name.split("-")[0] + return name.upper() + def get_bits(self): """ Return the number of significant bits in this key. This is useful @@ -149,6 +321,8 @@ class PKey: :return: bits in the key (as an `int`) """ + # TODO 4.0: raise NotImplementedError, 0 is unlikely to ever be + # _correct_ and nothing in the critical path seems to use this. return 0 def can_sign(self): @@ -169,6 +343,21 @@ class PKey: """ return md5(self.asbytes()).digest() + @property + def fingerprint(self): + """ + Modern fingerprint property designed to be comparable to OpenSSH. + + Currently only does SHA256 (the OpenSSH default). + + .. versionadded:: 3.2 + """ + hashy = sha256(bytes(self)) + hash_name = hashy.name.upper() + b64ed = encodebytes(hashy.digest()) + cleaned = u(b64ed).strip().rstrip("=") # yes, OpenSSH does this too! + return f"{hash_name}:{cleaned}" + def get_base64(self): """ Return a base64 string containing the public part of this key. Nothing @@ -278,6 +467,7 @@ class PKey: :raises: ``IOError`` -- if there was an error writing to the file :raises: `.SSHException` -- if the key is invalid """ + # TODO 4.0: NotImplementedError (plus everywhere else in here) raise Exception("Not implemented in PKey") def _read_private_key_file(self, tag, filename, password=None): @@ -613,7 +803,9 @@ class PKey: # message; they're *IO objects at heart and their .getvalue() # always returns the full value regardless of pointer position. self.load_certificate(Message(msg.asbytes())) - # Read out nonce as it comes before the public numbers. + # Read out nonce as it comes before the public numbers - our caller + # is likely going to use the (only borrowed by us, not owned) + # 'msg' object for loading those numbers right after this. # TODO: usefully interpret it & other non-public-number fields # (requires going back into per-type subclasses.) msg.get_string() diff --git a/paramiko/rsakey.py b/paramiko/rsakey.py index 9ea00e95..b7ad3ce2 100644 --- a/paramiko/rsakey.py +++ b/paramiko/rsakey.py @@ -36,6 +36,7 @@ class RSAKey(PKey): data. """ + name = "ssh-rsa" HASHES = { "ssh-rsa": hashes.SHA1, "ssh-rsa-cert-v01@openssh.com": hashes.SHA1, @@ -71,13 +72,17 @@ class RSAKey(PKey): msg=msg, # NOTE: this does NOT change when using rsa2 signatures; it's # purely about key loading, not exchange or verification - key_type="ssh-rsa", + key_type=self.name, cert_type="ssh-rsa-cert-v01@openssh.com", ) self.key = rsa.RSAPublicNumbers( e=msg.get_mpint(), n=msg.get_mpint() ).public_key(default_backend()) + @classmethod + def identifiers(cls): + return list(cls.HASHES.keys()) + @property def size(self): return self.key.key_size @@ -91,7 +96,7 @@ class RSAKey(PKey): def asbytes(self): m = Message() - m.add_string("ssh-rsa") + m.add_string(self.name) m.add_mpint(self.public_numbers.e) m.add_mpint(self.public_numbers.n) return m.asbytes() @@ -106,7 +111,7 @@ class RSAKey(PKey): return (self.get_name(), self.public_numbers.e, self.public_numbers.n) def get_name(self): - return "ssh-rsa" + return self.name def get_bits(self): return self.size @@ -114,13 +119,18 @@ class RSAKey(PKey): def can_sign(self): return isinstance(self.key, rsa.RSAPrivateKey) - def sign_ssh_data(self, data, algorithm="ssh-rsa"): + def sign_ssh_data(self, data, algorithm=None): + if algorithm is None: + algorithm = self.name sig = self.key.sign( data, padding=padding.PKCS1v15(), + # HASHES being just a map from long identifier to either SHA1 or + # SHA256 - cert'ness is not truly relevant. algorithm=self.HASHES[algorithm](), ) m = Message() + # And here again, cert'ness is irrelevant, so it is stripped out. m.add_string(algorithm.replace("-cert-v01@openssh.com", "")) m.add_string(sig) return m diff --git a/paramiko/server.py b/paramiko/server.py index 6b0bb0f6..6923bdf5 100644 --- a/paramiko/server.py +++ b/paramiko/server.py @@ -254,7 +254,7 @@ class ServerInterface: a valid krb5 principal! We don't check if the krb5 principal is allowed to log in on the server, because there is no way to do that in python. So - if you develop your own SSH server with paramiko for a cetain + if you develop your own SSH server with paramiko for a certain platform like Linux, you should call C{krb5_kuserok()} in your local kerberos library to make sure that the krb5_principal has an account on the server and is allowed to @@ -286,7 +286,7 @@ class ServerInterface: a valid krb5 principal! We don't check if the krb5 principal is allowed to log in on the server, because there is no way to do that in python. So - if you develop your own SSH server with paramiko for a cetain + if you develop your own SSH server with paramiko for a certain platform like Linux, you should call C{krb5_kuserok()} in your local kerberos library to make sure that the krb5_principal has an account on the server and is allowed diff --git a/paramiko/ssh_exception.py b/paramiko/ssh_exception.py index 9b1b44c3..a1e1cfc3 100644 --- a/paramiko/ssh_exception.py +++ b/paramiko/ssh_exception.py @@ -89,6 +89,11 @@ class PartialAuthentication(AuthenticationException): ) +# TODO 4.0: stop inheriting from SSHException, move to auth.py +class UnableToAuthenticate(AuthenticationException): + pass + + class ChannelException(SSHException): """ Exception raised when an attempt to open a new `.Channel` fails. diff --git a/paramiko/transport.py b/paramiko/transport.py index 98cdae03..8785d6bb 100644 --- a/paramiko/transport.py +++ b/paramiko/transport.py @@ -34,7 +34,7 @@ from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes import paramiko from paramiko import util -from paramiko.auth_handler import AuthHandler +from paramiko.auth_handler import AuthHandler, AuthOnlyHandler from paramiko.ssh_gss import GSSAuth from paramiko.channel import Channel from paramiko.common import ( @@ -64,6 +64,8 @@ from paramiko.common import ( MSG_GLOBAL_REQUEST, MSG_REQUEST_SUCCESS, MSG_REQUEST_FAILURE, + cMSG_SERVICE_REQUEST, + MSG_SERVICE_ACCEPT, MSG_CHANNEL_OPEN_SUCCESS, MSG_CHANNEL_OPEN_FAILURE, MSG_CHANNEL_OPEN, @@ -413,6 +415,8 @@ class Transport(threading.Thread, ClosingContextManager): self.hostname = None self.server_extensions = {} + # TODO: these two overrides on sock's type should go away sometime, too + # many ways to do it! if isinstance(sock, str): # convert "host:port" into (host, port) hl = sock.split(":", 1) @@ -529,6 +533,20 @@ class Transport(threading.Thread, ClosingContextManager): self.server_accept_cv = threading.Condition(self.lock) self.subsystem_table = {} + # Handler table, now set at init time for easier per-instance + # manipulation and subclass twiddling. + self._handler_table = { + MSG_EXT_INFO: self._parse_ext_info, + MSG_NEWKEYS: self._parse_newkeys, + MSG_GLOBAL_REQUEST: self._parse_global_request, + MSG_REQUEST_SUCCESS: self._parse_request_success, + MSG_REQUEST_FAILURE: self._parse_request_failure, + MSG_CHANNEL_OPEN_SUCCESS: self._parse_channel_open_success, + MSG_CHANNEL_OPEN_FAILURE: self._parse_channel_open_failure, + MSG_CHANNEL_OPEN: self._parse_channel_open, + MSG_KEXINIT: self._negotiate_keys, + } + def _filter_algorithm(self, type_): default = getattr(self, "_preferred_{}".format(type_)) return tuple( @@ -1646,7 +1664,7 @@ class Transport(threading.Thread, ClosingContextManager): dumb wrapper around PAM. This method will block until the authentication succeeds or fails, - peroidically calling the handler asynchronously to get answers to + periodically calling the handler asynchronously to get answers to authentication questions. The handler may be called more than once if the server continues to ask questions. @@ -1865,6 +1883,8 @@ class Transport(threading.Thread, ClosingContextManager): # internals... + # TODO 4.0: make a public alias for this because multiple other classes + # already explicitly rely on it...or just rewrite logging :D def _log(self, level, msg, *args): if issubclass(type(msg), list): for m in msg: @@ -2125,6 +2145,8 @@ class Transport(threading.Thread, ClosingContextManager): ) ) # noqa self._expected_packet = tuple() + # These message IDs indicate key exchange & will differ + # depending on exact exchange algorithm if (ptype >= 30) and (ptype <= 41): self.kex_engine.parse_next(ptype, m) continue @@ -2134,7 +2156,7 @@ class Transport(threading.Thread, ClosingContextManager): if error_msg: self._send_message(error_msg) else: - self._handler_table[ptype](self, m) + self._handler_table[ptype](m) elif ptype in self._channel_handler_table: chanid = m.get_int() chan = self._channels.get(chanid) @@ -2160,7 +2182,7 @@ class Transport(threading.Thread, ClosingContextManager): and ptype in self.auth_handler._handler_table ): handler = self.auth_handler._handler_table[ptype] - handler(self.auth_handler, m) + handler(m) if len(self._expected_packet) > 0: continue else: @@ -2981,18 +3003,6 @@ class Transport(threading.Thread, ClosingContextManager): finally: self.lock.release() - _handler_table = { - MSG_EXT_INFO: _parse_ext_info, - MSG_NEWKEYS: _parse_newkeys, - MSG_GLOBAL_REQUEST: _parse_global_request, - MSG_REQUEST_SUCCESS: _parse_request_success, - MSG_REQUEST_FAILURE: _parse_request_failure, - MSG_CHANNEL_OPEN_SUCCESS: _parse_channel_open_success, - MSG_CHANNEL_OPEN_FAILURE: _parse_channel_open_failure, - MSG_CHANNEL_OPEN: _parse_channel_open, - MSG_KEXINIT: _negotiate_keys, - } - _channel_handler_table = { MSG_CHANNEL_SUCCESS: Channel._request_success, MSG_CHANNEL_FAILURE: Channel._request_failed, @@ -3132,3 +3142,161 @@ class ChannelMap: return len(self._map) finally: self._lock.release() + + +class ServiceRequestingTransport(Transport): + """ + Transport, but also handling service requests, like it oughtta! + + .. versionadded:: 3.2 + """ + + # NOTE: this purposefully duplicates some of the parent class in order to + # modernize, refactor, etc. The intent is that eventually we will collapse + # this one onto the parent in a backwards incompatible release. + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._service_userauth_accepted = False + self._handler_table[MSG_SERVICE_ACCEPT] = self._parse_service_accept + + def _parse_service_accept(self, m): + service = m.get_text() + # Short-circuit for any service name not ssh-userauth. + # NOTE: it's technically possible for 'service name' in + # SERVICE_REQUEST/ACCEPT messages to be "ssh-connection" -- + # but I don't see evidence of Paramiko ever initiating or expecting to + # receive one of these. We /do/ see the 'service name' field in + # MSG_USERAUTH_REQUEST/ACCEPT/FAILURE set to this string, but that is a + # different set of handlers, so...! + if service != "ssh-userauth": + # TODO 4.0: consider erroring here (with an ability to opt out?) + # instead as it probably means something went Very Wrong. + self._log( + DEBUG, 'Service request "{}" accepted (?)'.format(service) + ) + return + # Record that we saw a service-userauth acceptance, meaning we are free + # to submit auth requests. + self._service_userauth_accepted = True + self._log(DEBUG, "MSG_SERVICE_ACCEPT received; auth may begin") + + def ensure_session(self): + # Make sure we're not trying to auth on a not-yet-open or + # already-closed transport session; that's our responsibility, not that + # of AuthHandler. + if (not self.active) or (not self.initial_kex_done): + # TODO: better error message? this can happen in many places, eg + # user error (authing before connecting) or developer error (some + # improperly handled pre/mid auth shutdown didn't become fatal + # enough). The latter is much more common & should ideally be fixed + # by terminating things harder? + raise SSHException("No existing session") + # Also make sure we've actually been told we are allowed to auth. + if self._service_userauth_accepted: + return + # Or request to do so, otherwise. + m = Message() + m.add_byte(cMSG_SERVICE_REQUEST) + m.add_string("ssh-userauth") + self._log(DEBUG, "Sending MSG_SERVICE_REQUEST: ssh-userauth") + self._send_message(m) + # Now we wait to hear back; the user is expecting a blocking-style auth + # request so there's no point giving control back anywhere. + while not self._service_userauth_accepted: + # TODO: feels like we're missing an AuthHandler Event like + # 'self.auth_event' which is set when AuthHandler shuts down in + # ways good AND bad. Transport only seems to have completion_event + # which is unclear re: intent, eg it's set by newkeys which always + # happens on connection, so it'll always be set by the time we get + # here. + # NOTE: this copies the timing of event.wait() in + # AuthHandler.wait_for_response, re: 1/10 of a second. Could + # presumably be smaller, but seems unlikely this period is going to + # be "too long" for any code doing ssh networking... + time.sleep(0.1) + self.auth_handler = self.get_auth_handler() + + def get_auth_handler(self): + # NOTE: using new sibling subclass instead of classic AuthHandler + return AuthOnlyHandler(self) + + def auth_none(self, username): + # TODO 4.0: merge to parent, preserving (most of) docstring + self.ensure_session() + return self.auth_handler.auth_none(username) + + def auth_password(self, username, password, fallback=True): + # TODO 4.0: merge to parent, preserving (most of) docstring + self.ensure_session() + try: + return self.auth_handler.auth_password(username, password) + except BadAuthenticationType as e: + # if password auth isn't allowed, but keyboard-interactive *is*, + # try to fudge it + if not fallback or ("keyboard-interactive" not in e.allowed_types): + raise + try: + + def handler(title, instructions, fields): + if len(fields) > 1: + raise SSHException("Fallback authentication failed.") + if len(fields) == 0: + # for some reason, at least on os x, a 2nd request will + # be made with zero fields requested. maybe it's just + # to try to fake out automated scripting of the exact + # type we're doing here. *shrug* :) + return [] + return [password] + + return self.auth_interactive(username, handler) + except SSHException: + # attempt to fudge failed; just raise the original exception + raise e + + def auth_publickey(self, username, key): + # TODO 4.0: merge to parent, preserving (most of) docstring + self.ensure_session() + return self.auth_handler.auth_publickey(username, key) + + def auth_interactive(self, username, handler, submethods=""): + # TODO 4.0: merge to parent, preserving (most of) docstring + self.ensure_session() + return self.auth_handler.auth_interactive( + username, handler, submethods + ) + + def auth_interactive_dumb(self, username, handler=None, submethods=""): + # TODO 4.0: merge to parent, preserving (most of) docstring + # NOTE: legacy impl omitted equiv of ensure_session since it just wraps + # another call to an auth method. however we reinstate it for + # consistency reasons. + self.ensure_session() + if not handler: + + def handler(title, instructions, prompt_list): + answers = [] + if title: + print(title.strip()) + if instructions: + print(instructions.strip()) + for prompt, show_input in prompt_list: + print(prompt.strip(), end=" ") + answers.append(input()) + return answers + + return self.auth_interactive(username, handler, submethods) + + def auth_gssapi_with_mic(self, username, gss_host, gss_deleg_creds): + # TODO 4.0: merge to parent, preserving (most of) docstring + self.ensure_session() + self.auth_handler = self.get_auth_handler() + return self.auth_handler.auth_gssapi_with_mic( + username, gss_host, gss_deleg_creds + ) + + def auth_gssapi_keyex(self, username): + # TODO 4.0: merge to parent, preserving (most of) docstring + self.ensure_session() + self.auth_handler = self.get_auth_handler() + return self.auth_handler.auth_gssapi_keyex(username) @@ -1,6 +1,3 @@ [pytest] -# We use pytest-relaxed just for its utils at the moment, so disable it at the -# plugin level until we adapt test organization to really use it. -addopts = -p no:relaxed -# Loop on failure -looponfailroots = tests paramiko +testpaths = tests +python_files = * diff --git a/sites/docs/api/auth.rst b/sites/docs/api/auth.rst new file mode 100644 index 00000000..b6bce36c --- /dev/null +++ b/sites/docs/api/auth.rst @@ -0,0 +1,8 @@ +Authentication modules +====================== + +.. automodule:: paramiko.auth_strategy + :member-order: bysource + +.. automodule:: paramiko.auth_handler + :member-order: bysource diff --git a/sites/docs/index.rst b/sites/docs/index.rst index 87265d95..675fe596 100644 --- a/sites/docs/index.rst +++ b/sites/docs/index.rst @@ -47,6 +47,7 @@ Authentication & keys --------------------- .. toctree:: + api/auth api/agent api/hostkeys api/keys diff --git a/sites/www/changelog.rst b/sites/www/changelog.rst index 0b2022ca..c18890eb 100644 --- a/sites/www/changelog.rst +++ b/sites/www/changelog.rst @@ -2,6 +2,132 @@ Changelog ========= +- :release:`3.2.0 <2023-05-25>` +- :bug:`- major` Fixed a very sneaky bug found at the apparently + rarely-traveled intersection of ``RSA-SHA2`` keys, certificates, SSH agents, + and stricter-than-OpenSSH server targets. This manifested as yet another + "well, if we turn off SHA2 at one end or another, everything works again" + problem, for example with version 12 of the Teleport server endpoint. + + This has been fixed; Paramiko tweaked multiple aspects of how it requests + agent signatures, and the agent appears to do the right thing now. + + Thanks to Ryan Stoner for the bug report and testing. +- :bug:`2012 major` (also :issue:`1961` and countless others) The + ``server-sig-algs`` and ``RSA-SHA2`` features added around Paramiko 2.9 or + so, had the annoying side effect of not working with servers that don't + support *either* of those feature sets, requiring use of + ``disabled_algorithms`` to forcibly disable the SHA2 algorithms on Paramiko's + end. + + The **experimental** `~paramiko.transport.ServiceRequestingTransport` (noted + in its own entry in this changelog) includes a fix for this issue, + specifically by falling back to the same algorithm as the in-use pubkey if + it's in the algorithm list (leaving the "first algorithm in said list" as an + absolute final fallback). +- :feature:`-` Implement ``_fields()`` on `~paramiko.agent.AgentKey` so that it + may be compared (via ``==``) with other `~paramiko.pkey.PKey` instances. +- :bug:`23 major` Since its inception, Paramiko has (for reasons lost to time) + implemented authentication as a side effect of handling affirmative replies + to ``MSG_SERVICE_REQUEST`` protocol messages. What this means is Paramiko + makes one such request before every ``MSG_USERAUTH_REQUEST``, i.e. every auth + attempt. + + OpenSSH doesn't care if clients send multiple service requests, but other + server implementations are often stricter in what they accept after an + initial service request (due to the RFCs not being clear). This can result in + odd behavior when a user doesn't authenticate successfully on the very first + try (for example, when the right key for a target host is the third in one's + ssh-agent). + + This version of Paramiko now contains an opt-in + `~paramiko.transport.Transport` subclass, + `~paramiko.transport.ServiceRequestingTransport`, which more-correctly + implements service request handling in the Transport, and uses an + auth-handler subclass internally which has been similarly adapted. Users + wanting to try this new experimental code path may hand this class to + `SSHClient.connect <paramiko.client.SSHClient.connect>` as its + ``transport_factory`` kwarg. + + .. warning:: + This feature is **EXPERIMENTAL** and its code may be subject to change. + + In addition: + - minor backwards incompatible changes exist in the new code paths, + most notably the removal of the (inconsistently applied and rarely + used) ``event`` arguments to the ``auth_xxx`` methods. + - GSSAPI support has only been partially implemented, and is untested. + + .. note:: + Some minor backwards-*compatible* changes were made to the **existing** + Transport and AuthHandler classes to facilitate the new code. For + example, ``Transport._handler_table`` and + ``AuthHandler._client_handler_table`` are now properties instead of raw + attributes. + +- :feature:`387` Users of `~paramiko.client.SSHClient` can now configure the + authentication logic Paramiko uses when connecting to servers; this + functionality is intended for advanced users and higher-level libraries such + as `Fabric <https://fabfile.org>`_. See `~paramiko.auth_strategy` for + details. + + Fabric's co-temporal release includes a proof-of-concept use of this feature, + implementing an auth flow much closer to that of the OpenSSH client (versus + Paramiko's legacy behavior). It is **strongly recommended** that if this + interests you, investigate replacing any direct use of ``SSHClient`` with + Fabric's ``Connection``. + + .. warning:: + This feature is **EXPERIMENTAL**; please see its docs for details. + +- :feature:`-` Enhanced `~paramiko.agent.AgentKey` with new attributes, such + as: + + - Added a ``comment`` attribute (and constructor argument); + `Agent.get_keys() <paramiko.agent.Agent.get_keys>` now uses this kwarg to + store any comment field sent over by the agent. The original version of + the agent feature inexplicably did not store the comment anywhere. + - Agent-derived keys now attempt to instantiate a copy of the appropriate + key class for access to other algorithm-specific members (eg key size). + This is available as the ``.inner_key`` attribute. + + .. note:: + This functionality is now in use in Fabric's new ``--list-agent-keys`` + feature, as well as in Paramiko's debug logging. + +- :feature:`-` `~paramiko.pkey.PKey` now offers convenience + "meta-constructors", static methods that simplify the process of + instantiating the correct subclass for a given key input. + + For example, `PKey.from_path <paramiko.pkey.PKey.from_path>` can load a file + path without knowing *a priori* what type of key it is (thanks to some handy + methods within our cryptography dependency). Going forwards, we expect this + to be the primary method of loading keys by user code that runs on "human + time" (i.e. where some minor efficiencies are worth the convenience). + + In addition, `PKey.from_type_string <paramiko.pkey.PKey.from_type_string>` + now exists, and is being used in some internals to load ssh-agent keys. + + As part of these changes, `~paramiko.pkey.PKey` and friends grew an + `~paramiko.pkey.PKey.identifiers` classmethod; this is inspired by the + `~paramiko.ecdsakey.ECDSAKey.supported_key_format_identifiers` classmethod + (which now refers to the new method.) This also includes adding a ``.name`` + attribute to most key classes (which will eventually replace ``.get_name()``. + +- :feature:`-` `~paramiko.pkey.PKey` grew a new ``.algorithm_name`` property + which displays the key algorithm; this is typically derived from the value of + `~paramiko.pkey.PKey.get_name`. For example, ED25519 keys have a ``get_name`` + of ``ssh-ed25519`` (the SSH protocol key type field value), and now have a + ``algorithm_name`` of ``ED25519``. +- :feature:`-` `~paramiko.pkey.PKey` grew a new ``.fingerprint`` property which + emits a fingerprint string matching the SHA256+Base64 values printed by + various OpenSSH tooling (eg ``ssh-add -l``, ``ssh -v``). This is intended to + help troubleshoot Paramiko-vs-OpenSSH behavior and will eventually replace + the venerable ``get_fingerprint`` method. +- :bug:`- major` `~paramiko.agent.AgentKey` had a dangling Python 3 + incompatible ``__str__`` method returning bytes. This method has been + removed, allowing the superclass' (`~paramiko.pkey.PKey`) method to run + instead. - :release:`3.1.0 <2023-03-10>` - :feature:`2013` (solving :issue:`2009`, plus others) Add an explicit ``channel_timeout`` keyword argument to `paramiko.client.SSHClient.connect`, @@ -1,10 +1,11 @@ import os +from pathlib import Path from os.path import join from shutil import rmtree, copytree from invoke import Collection, task -from invocations.checks import blacken -from invocations.docs import docs, www, sites +from invocations import checks +from invocations.docs import docs, www, sites, watch_docs from invocations.packaging.release import ns as release_coll, publish from invocations.testing import count_errors @@ -50,8 +51,10 @@ def test( opts += " -f" modstr = "" if module is not None: - # NOTE: implicit test_ prefix as we're not on pytest-relaxed yet - modstr = " tests/test_{}.py".format(module) + base = f"{module}.py" + tests = Path("tests") + legacy = tests / f"test_{base}" + modstr = str(legacy if legacy.exists() else tests / base) # Switch runner depending on coverage or no coverage. # TODO: get pytest's coverage plugin working, IIRC it has issues? runner = "pytest" @@ -98,7 +101,7 @@ def guard(ctx, opts=""): # projects do it. @task def publish_( - ctx, sdist=True, wheel=True, sign=True, dry_run=False, index=None + ctx, sdist=True, wheel=True, sign=False, dry_run=False, index=None ): """ Wraps invocations.packaging.publish to add baked-in docs folder. @@ -137,9 +140,11 @@ ns = Collection( release_coll, docs, www, + watch_docs, sites, count_errors, - blacken, + checks.blacken, + checks, ) ns.configure( { @@ -147,11 +152,12 @@ ns.configure( # NOTE: many of these are also set in kwarg defaults above; but # having them here too means once we get rid of our custom # release(), the behavior stays. - "sign": True, + "sign": False, "wheel": True, "changelog_file": join( www.configuration()["sphinx"]["source"], "changelog.rst" ), - } + }, + "docs": {"browse": "remote"}, } ) diff --git a/tests/loop.py b/tests/_loop.py index a3740013..a3740013 100644 --- a/tests/loop.py +++ b/tests/_loop.py diff --git a/tests/stub_sftp.py b/tests/_stub_sftp.py index 0c0372e9..0c0372e9 100644 --- a/tests/stub_sftp.py +++ b/tests/_stub_sftp.py diff --git a/tests/cert_support/test_dss.key b/tests/_support/dss.key index e10807f1..e10807f1 100644 --- a/tests/cert_support/test_dss.key +++ b/tests/_support/dss.key diff --git a/tests/cert_support/test_dss.key-cert.pub b/tests/_support/dss.key-cert.pub index 07fd5578..07fd5578 100644 --- a/tests/cert_support/test_dss.key-cert.pub +++ b/tests/_support/dss.key-cert.pub diff --git a/tests/cert_support/test_ecdsa_256.key b/tests/_support/ecdsa-256.key index 42d44734..42d44734 100644 --- a/tests/cert_support/test_ecdsa_256.key +++ b/tests/_support/ecdsa-256.key diff --git a/tests/cert_support/test_ecdsa_256.key-cert.pub b/tests/_support/ecdsa-256.key-cert.pub index f2c93ccf..f2c93ccf 100644 --- a/tests/cert_support/test_ecdsa_256.key-cert.pub +++ b/tests/_support/ecdsa-256.key-cert.pub diff --git a/tests/cert_support/test_ed25519.key b/tests/_support/ed25519.key index eb9f94c2..eb9f94c2 100644 --- a/tests/cert_support/test_ed25519.key +++ b/tests/_support/ed25519.key diff --git a/tests/cert_support/test_ed25519.key-cert.pub b/tests/_support/ed25519.key-cert.pub index 4e01415a..4e01415a 100644 --- a/tests/cert_support/test_ed25519.key-cert.pub +++ b/tests/_support/ed25519.key-cert.pub diff --git a/tests/_support/ed448.key b/tests/_support/ed448.key new file mode 100644 index 00000000..887b51c6 --- /dev/null +++ b/tests/_support/ed448.key @@ -0,0 +1,4 @@ +-----BEGIN PRIVATE KEY----- +MEcCAQAwBQYDK2VxBDsEOcvcl9IoD0ktR5RWtW84NM7O2e4LmD2cWfRg7Wht/OA9 +POkmRW12VNvlP6BsXKir5yygumIjD91SQQ== +-----END PRIVATE KEY----- diff --git a/tests/cert_support/test_rsa.key b/tests/_support/rsa-lonely.key index f50e9c53..f50e9c53 100644 --- a/tests/cert_support/test_rsa.key +++ b/tests/_support/rsa-lonely.key diff --git a/tests/cert_support/test_rsa.key-cert.pub b/tests/_support/rsa-missing.key-cert.pub index 7487ab66..7487ab66 100644 --- a/tests/cert_support/test_rsa.key-cert.pub +++ b/tests/_support/rsa-missing.key-cert.pub diff --git a/tests/test_rsa.key b/tests/_support/rsa.key index f50e9c53..f50e9c53 100644 --- a/tests/test_rsa.key +++ b/tests/_support/rsa.key diff --git a/tests/_support/rsa.key-cert.pub b/tests/_support/rsa.key-cert.pub new file mode 100644 index 00000000..7487ab66 --- /dev/null +++ b/tests/_support/rsa.key-cert.pub @@ -0,0 +1 @@ +ssh-rsa-cert-v01@openssh.com AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgsZlXTd5NE4uzGAn6TyAqQj+IPbsTEFGap2x5pTRwQR8AAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4cAAAAAAAAE0gAAAAEAAAAmU2FtcGxlIHNlbGYtc2lnbmVkIE9wZW5TU0ggY2VydGlmaWNhdGUAAAASAAAABXVzZXIxAAAABXVzZXIyAAAAAAAAAAD//////////wAAAAAAAACCAAAAFXBlcm1pdC1YMTEtZm9yd2FyZGluZwAAAAAAAAAXcGVybWl0LWFnZW50LWZvcndhcmRpbmcAAAAAAAAAFnBlcm1pdC1wb3J0LWZvcndhcmRpbmcAAAAAAAAACnBlcm1pdC1wdHkAAAAAAAAADnBlcm1pdC11c2VyLXJjAAAAAAAAAAAAAACVAAAAB3NzaC1yc2EAAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4cAAACPAAAAB3NzaC1yc2EAAACATFHFsARDgQevc6YLxNnDNjsFtZ08KPMyYVx0w5xm95IVZHVWSOc5w+ccjqN9HRwxV3kP7IvL91qx0Uc3MJdB9g/O6HkAP+rpxTVoTb2EAMekwp5+i8nQJW4CN2BSsbQY1M6r7OBZ5nmF4hOW/5Pu4l22lXe2ydy8kEXOEuRpUeQ= test_rsa.key.pub diff --git a/tests/_util.py b/tests/_util.py new file mode 100644 index 00000000..eaf6aac4 --- /dev/null +++ b/tests/_util.py @@ -0,0 +1,441 @@ +from contextlib import contextmanager +from os.path import dirname, realpath, join +import builtins +import os +from pathlib import Path +import socket +import struct +import sys +import unittest +from time import sleep +import threading + +import pytest + +from paramiko import ( + ServerInterface, + RSAKey, + DSSKey, + AUTH_FAILED, + AUTH_PARTIALLY_SUCCESSFUL, + AUTH_SUCCESSFUL, + OPEN_SUCCEEDED, + OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED, + InteractiveQuery, + Transport, +) +from paramiko.ssh_gss import GSS_AUTH_AVAILABLE + +from cryptography.exceptions import UnsupportedAlgorithm, _Reasons +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import padding, rsa + +tests_dir = dirname(realpath(__file__)) + +from ._loop import LoopSocket + + +def _support(filename): + base = Path(tests_dir) + top = base / filename + deeper = base / "_support" / filename + return str(deeper if deeper.exists() else top) + + +def _config(name): + return join(tests_dir, "configs", name) + + +needs_gssapi = pytest.mark.skipif( + not GSS_AUTH_AVAILABLE, reason="No GSSAPI to test" +) + + +def needs_builtin(name): + """ + Skip decorated test if builtin name does not exist. + """ + reason = "Test requires a builtin '{}'".format(name) + return pytest.mark.skipif(not hasattr(builtins, name), reason=reason) + + +slow = pytest.mark.slow + +# GSSAPI / Kerberos related tests need a working Kerberos environment. +# The class `KerberosTestCase` provides such an environment or skips all tests. +# There are 3 distinct cases: +# +# - A Kerberos environment has already been created and the environment +# contains the required information. +# +# - We can use the package 'k5test' to setup an working kerberos environment on +# the fly. +# +# - We skip all tests. +# +# ToDo: add a Windows specific implementation? + +if ( + os.environ.get("K5TEST_USER_PRINC", None) + and os.environ.get("K5TEST_HOSTNAME", None) + and os.environ.get("KRB5_KTNAME", None) +): # add other vars as needed + + # The environment provides the required information + class DummyK5Realm: + def __init__(self): + for k in os.environ: + if not k.startswith("K5TEST_"): + continue + setattr(self, k[7:].lower(), os.environ[k]) + self.env = {} + + class KerberosTestCase(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.realm = DummyK5Realm() + + @classmethod + def tearDownClass(cls): + del cls.realm + +else: + try: + # Try to setup a kerberos environment + from k5test import KerberosTestCase + except Exception: + # Use a dummy, that skips all tests + class KerberosTestCase(unittest.TestCase): + @classmethod + def setUpClass(cls): + raise unittest.SkipTest( + "Missing extension package k5test. " + 'Please run "pip install k5test" ' + "to install it." + ) + + +def update_env(testcase, mapping, env=os.environ): + """Modify os.environ during a test case and restore during cleanup.""" + saved_env = env.copy() + + def replace(target, source): + target.update(source) + for k in list(target): + if k not in source: + target.pop(k, None) + + testcase.addCleanup(replace, env, saved_env) + env.update(mapping) + return testcase + + +def k5shell(args=None): + """Create a shell with an kerberos environment + + This can be used to debug paramiko or to test the old GSSAPI. + To test a different GSSAPI, simply activate a suitable venv + within the shell. + """ + import k5test + import atexit + import subprocess + + k5 = k5test.K5Realm() + atexit.register(k5.stop) + os.environ.update(k5.env) + for n in ("realm", "user_princ", "hostname"): + os.environ["K5TEST_" + n.upper()] = getattr(k5, n) + + if not args: + args = sys.argv[1:] + if not args: + args = [os.environ.get("SHELL", "bash")] + sys.exit(subprocess.call(args)) + + +def is_low_entropy(): + """ + Attempts to detect whether running interpreter is low-entropy. + + "low-entropy" is defined as being in 32-bit mode and with the hash seed set + to zero. + """ + is_32bit = struct.calcsize("P") == 32 / 8 + # I don't see a way to tell internally if the hash seed was set this + # way, but env should be plenty sufficient, this is only for testing. + return is_32bit and os.environ.get("PYTHONHASHSEED", None) == "0" + + +def sha1_signing_unsupported(): + """ + This is used to skip tests in environments where SHA-1 signing is + not supported by the backend. + """ + private_key = rsa.generate_private_key( + public_exponent=65537, key_size=2048, backend=default_backend() + ) + message = b"Some dummy text" + try: + private_key.sign( + message, + padding.PSS( + mgf=padding.MGF1(hashes.SHA1()), + salt_length=padding.PSS.MAX_LENGTH, + ), + hashes.SHA1(), + ) + return False + except UnsupportedAlgorithm as e: + return e._reason is _Reasons.UNSUPPORTED_HASH + + +requires_sha1_signing = unittest.skipIf( + sha1_signing_unsupported(), "SHA-1 signing not supported" +) + +_disable_sha2 = dict( + disabled_algorithms=dict(keys=["rsa-sha2-256", "rsa-sha2-512"]) +) +_disable_sha1 = dict(disabled_algorithms=dict(keys=["ssh-rsa"])) +_disable_sha2_pubkey = dict( + disabled_algorithms=dict(pubkeys=["rsa-sha2-256", "rsa-sha2-512"]) +) +_disable_sha1_pubkey = dict(disabled_algorithms=dict(pubkeys=["ssh-rsa"])) + + +unicodey = "\u2022" + + +class TestServer(ServerInterface): + paranoid_did_password = False + paranoid_did_public_key = False + # TODO: make this ed25519 or something else modern? (_is_ this used??) + paranoid_key = DSSKey.from_private_key_file(_support("dss.key")) + + def __init__(self, allowed_keys=None): + self.allowed_keys = allowed_keys if allowed_keys is not None else [] + + def check_channel_request(self, kind, chanid): + if kind == "bogus": + return OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED + return OPEN_SUCCEEDED + + def check_channel_exec_request(self, channel, command): + if command != b"yes": + return False + return True + + def check_channel_shell_request(self, channel): + return True + + def check_global_request(self, kind, msg): + self._global_request = kind + # NOTE: for w/e reason, older impl of this returned False always, even + # tho that's only supposed to occur if the request cannot be served. + # For now, leaving that the default unless test supplies specific + # 'acceptable' request kind + return kind == "acceptable" + + def check_channel_x11_request( + self, + channel, + single_connection, + auth_protocol, + auth_cookie, + screen_number, + ): + self._x11_single_connection = single_connection + self._x11_auth_protocol = auth_protocol + self._x11_auth_cookie = auth_cookie + self._x11_screen_number = screen_number + return True + + def check_port_forward_request(self, addr, port): + self._listen = socket.socket() + self._listen.bind(("127.0.0.1", 0)) + self._listen.listen(1) + return self._listen.getsockname()[1] + + def cancel_port_forward_request(self, addr, port): + self._listen.close() + self._listen = None + + def check_channel_direct_tcpip_request(self, chanid, origin, destination): + self._tcpip_dest = destination + return OPEN_SUCCEEDED + + def get_allowed_auths(self, username): + if username == "slowdive": + return "publickey,password" + if username == "paranoid": + if ( + not self.paranoid_did_password + and not self.paranoid_did_public_key + ): + return "publickey,password" + elif self.paranoid_did_password: + return "publickey" + else: + return "password" + if username == "commie": + return "keyboard-interactive" + if username == "utf8": + return "password" + if username == "non-utf8": + return "password" + return "publickey" + + def check_auth_password(self, username, password): + if (username == "slowdive") and (password == "pygmalion"): + return AUTH_SUCCESSFUL + if (username == "paranoid") and (password == "paranoid"): + # 2-part auth (even openssh doesn't support this) + self.paranoid_did_password = True + if self.paranoid_did_public_key: + return AUTH_SUCCESSFUL + return AUTH_PARTIALLY_SUCCESSFUL + if (username == "utf8") and (password == unicodey): + return AUTH_SUCCESSFUL + if (username == "non-utf8") and (password == "\xff"): + return AUTH_SUCCESSFUL + if username == "bad-server": + raise Exception("Ack!") + if username == "unresponsive-server": + sleep(5) + return AUTH_SUCCESSFUL + return AUTH_FAILED + + def check_auth_publickey(self, username, key): + if (username == "paranoid") and (key == self.paranoid_key): + # 2-part auth + self.paranoid_did_public_key = True + if self.paranoid_did_password: + return AUTH_SUCCESSFUL + return AUTH_PARTIALLY_SUCCESSFUL + # TODO: make sure all tests incidentally using this to pass, _without + # sending a username oops_, get updated somehow - probably via server() + # default always injecting a username + elif key in self.allowed_keys: + return AUTH_SUCCESSFUL + return AUTH_FAILED + + def check_auth_interactive(self, username, submethods): + if username == "commie": + self.username = username + return InteractiveQuery( + "password", "Please enter a password.", ("Password", False) + ) + return AUTH_FAILED + + def check_auth_interactive_response(self, responses): + if self.username == "commie": + if (len(responses) == 1) and (responses[0] == "cat"): + return AUTH_SUCCESSFUL + return AUTH_FAILED + + +@contextmanager +def server( + hostkey=None, + init=None, + server_init=None, + client_init=None, + connect=None, + pubkeys=None, + catch_error=False, + transport_factory=None, + defer=False, + skip_verify=False, +): + """ + SSH server contextmanager for testing. + + :param hostkey: + Host key to use for the server; if None, loads + ``rsa.key``. + :param init: + Default `Transport` constructor kwargs to use for both sides. + :param server_init: + Extends and/or overrides ``init`` for server transport only. + :param client_init: + Extends and/or overrides ``init`` for client transport only. + :param connect: + Kwargs to use for ``connect()`` on the client. + :param pubkeys: + List of public keys for auth. + :param catch_error: + Whether to capture connection errors & yield from contextmanager. + Necessary for connection_time exception testing. + :param transport_factory: + Like the same-named param in SSHClient: which Transport class to use. + :param bool defer: + Whether to defer authentication during connecting. + + This is really just shorthand for ``connect={}`` which would do roughly + the same thing. Also: this implies skip_verify=True automatically! + :param bool skip_verify: + Whether NOT to do the default "make sure auth passed" check. + """ + if init is None: + init = {} + if server_init is None: + server_init = {} + if client_init is None: + client_init = {} + if connect is None: + # No auth at all please + if defer: + connect = dict() + # Default username based auth + else: + connect = dict(username="slowdive", password="pygmalion") + socks = LoopSocket() + sockc = LoopSocket() + sockc.link(socks) + if transport_factory is None: + transport_factory = Transport + tc = transport_factory(sockc, **dict(init, **client_init)) + ts = transport_factory(socks, **dict(init, **server_init)) + + if hostkey is None: + hostkey = RSAKey.from_private_key_file(_support("rsa.key")) + ts.add_server_key(hostkey) + event = threading.Event() + server = TestServer(allowed_keys=pubkeys) + assert not event.is_set() + assert not ts.is_active() + assert tc.get_username() is None + assert ts.get_username() is None + assert not tc.is_authenticated() + assert not ts.is_authenticated() + + err = None + # Trap errors and yield instead of raising right away; otherwise callers + # cannot usefully deal with problems at connect time which stem from errors + # in the server side. + try: + ts.start_server(event, server) + tc.connect(**connect) + + event.wait(1.0) + assert event.is_set() + assert ts.is_active() + assert tc.is_active() + + except Exception as e: + if not catch_error: + raise + err = e + + yield (tc, ts, err) if catch_error else (tc, ts) + + if not (catch_error or skip_verify): + assert ts.is_authenticated() + assert tc.is_authenticated() + + tc.close() + ts.close() + socks.close() + sockc.close() diff --git a/tests/agent.py b/tests/agent.py new file mode 100644 index 00000000..bcbfb216 --- /dev/null +++ b/tests/agent.py @@ -0,0 +1,151 @@ +from unittest.mock import Mock + +from pytest import mark, raises + +from paramiko import AgentKey, Message, RSAKey +from paramiko.agent import ( + SSH2_AGENT_SIGN_RESPONSE, + SSH_AGENT_RSA_SHA2_256, + SSH_AGENT_RSA_SHA2_512, + cSSH2_AGENTC_SIGN_REQUEST, +) + +from ._util import _support + + +# AgentKey with no inner_key +class _BareAgentKey(AgentKey): + def __init__(self, name, blob): + self.name = name + self.blob = blob + self.inner_key = None + + +class AgentKey_: + def str_is_repr(self): + # Tests for a missed spot in Python 3 upgrades: AgentKey.__str__ was + # returning bytes, as if under Python 2. When bug present, this + # explodes with "__str__ returned non-string". + key = AgentKey(None, b"secret!!!") + assert str(key) == repr(key) + + class init: + def needs_at_least_two_arguments(self): + with raises(TypeError): + AgentKey() + with raises(TypeError): + AgentKey(None) + + def sets_attributes_and_parses_blob(self): + agent = Mock() + blob = Message() + blob.add_string("bad-type") + key = AgentKey(agent=agent, blob=bytes(blob)) + assert key.agent is agent + assert key.name == "bad-type" + assert key.blob == bytes(blob) + assert key.comment == "" # default + # TODO: logger testing + assert key.inner_key is None # no 'bad-type' algorithm + + def comment_optional(self): + blob = Message() + blob.add_string("bad-type") + key = AgentKey(agent=Mock(), blob=bytes(blob), comment="hi!") + assert key.comment == "hi!" + + def sets_inner_key_when_known_type(self, keys): + key = AgentKey(agent=Mock(), blob=bytes(keys.pkey)) + assert key.inner_key == keys.pkey + + class fields: + def defaults_to_get_name_and_blob(self): + key = _BareAgentKey(name="lol", blob=b"lmao") + assert key._fields == ["lol", b"lmao"] + + # TODO: pytest-relaxed is buggy (now?), this shows up under get_bits? + def defers_to_inner_key_when_present(self, keys): + key = AgentKey(agent=None, blob=keys.pkey.asbytes()) + assert key._fields == keys.pkey._fields + assert key == keys.pkey + + class get_bits: + def defaults_to_superclass_implementation(self): + # TODO 4.0: assert raises NotImplementedError like changed parent? + assert _BareAgentKey(None, None).get_bits() == 0 + + def defers_to_inner_key_when_present(self, keys): + key = AgentKey(agent=None, blob=keys.pkey.asbytes()) + assert key.get_bits() == keys.pkey.get_bits() + + class asbytes: + def defaults_to_owned_blob(self): + blob = Mock() + assert _BareAgentKey(name=None, blob=blob).asbytes() is blob + + def defers_to_inner_key_when_present(self, keys): + key = AgentKey(agent=None, blob=keys.pkey_with_cert.asbytes()) + # Artificially make outer key blob != inner key blob; comment in + # AgentKey.asbytes implies this can sometimes really happen but I + # no longer recall when that could be? + key.blob = b"nope" + assert key.asbytes() == key.inner_key.asbytes() + + @mark.parametrize( + "sign_kwargs,expected_flag", + [ + # No algorithm kwarg: no flags (bitfield -> 0 int) + (dict(), 0), + (dict(algorithm="rsa-sha2-256"), SSH_AGENT_RSA_SHA2_256), + (dict(algorithm="rsa-sha2-512"), SSH_AGENT_RSA_SHA2_512), + # TODO: ideally we only send these when key is a cert, + # but it doesn't actually break when not; meh. Really just wants + # all the parameterization of this test rethought. + ( + dict(algorithm="rsa-sha2-256-cert-v01@openssh.com"), + SSH_AGENT_RSA_SHA2_256, + ), + ( + dict(algorithm="rsa-sha2-512-cert-v01@openssh.com"), + SSH_AGENT_RSA_SHA2_512, + ), + ], + ) + def signing_data(self, sign_kwargs, expected_flag): + class FakeAgent: + def _send_message(self, msg): + # The thing we actually care most about, we're not testing + # ssh-agent itself here + self._sent_message = msg + sig = Message() + sig.add_string("lol") + sig.rewind() + return SSH2_AGENT_SIGN_RESPONSE, sig + + for do_cert in (False, True): + agent = FakeAgent() + # Get key kinda like how a real agent would give it to us - if + # cert, it'd be the entire public blob, not just the pubkey. This + # ensures the code under test sends _just the pubkey part_ back to + # the agent during signature requests (bug was us sending _the + # entire cert blob_, which somehow "worked ok" but always got us + # SHA1) + # NOTE: using lower level loader to avoid auto-cert-load when + # testing regular key (agents expose them separately) + inner_key = RSAKey.from_private_key_file(_support("rsa.key")) + blobby = inner_key.asbytes() + # NOTE: expected key blob always wants to be the real key, even + # when the "key" is a certificate. + expected_request_key_blob = blobby + if do_cert: + inner_key.load_certificate(_support("rsa.key-cert.pub")) + blobby = inner_key.public_blob.key_blob + key = AgentKey(agent, blobby) + result = key.sign_ssh_data(b"data-to-sign", **sign_kwargs) + assert result == b"lol" + msg = agent._sent_message + msg.rewind() + assert msg.get_byte() == cSSH2_AGENTC_SIGN_REQUEST + assert msg.get_string() == expected_request_key_blob + assert msg.get_string() == b"data-to-sign" + assert msg.get_int() == expected_flag diff --git a/tests/auth.py b/tests/auth.py new file mode 100644 index 00000000..c0afe889 --- /dev/null +++ b/tests/auth.py @@ -0,0 +1,580 @@ +""" +Tests focusing primarily on the authentication step. + +Thus, they concern AuthHandler and AuthStrategy, with a side of Transport. +""" + +from logging import Logger +from unittest.mock import Mock + +from pytest import raises + +from paramiko import ( + AgentKey, + AuthenticationException, + AuthFailure, + AuthResult, + AuthSource, + AuthStrategy, + BadAuthenticationType, + DSSKey, + InMemoryPrivateKey, + NoneAuth, + OnDiskPrivateKey, + Password, + PrivateKey, + PKey, + RSAKey, + SSHException, + ServiceRequestingTransport, + SourceResult, +) + +from ._util import ( + _disable_sha1_pubkey, + _disable_sha2, + _disable_sha2_pubkey, + _support, + requires_sha1_signing, + server, + unicodey, +) + + +class AuthHandler_: + """ + Most of these tests are explicit about the auth method they call. + + This is because not too many other tests do so (they rely on the implicit + auth trigger of various connect() kwargs). + """ + + def bad_auth_type(self): + """ + verify that we get the right exception when an unsupported auth + type is requested. + """ + # Server won't allow password auth for this user, so should fail + # and return just publickey allowed types + with server( + connect=dict(username="unknown", password="error"), + catch_error=True, + ) as (_, _, err): + assert isinstance(err, BadAuthenticationType) + assert err.allowed_types == ["publickey"] + + def bad_password(self): + """ + verify that a bad password gets the right exception, and that a retry + with the right password works. + """ + # NOTE: Transport.connect doesn't do any auth upfront if no userauth + # related kwargs given. + with server(defer=True) as (tc, ts): + # Auth once, badly + with raises(AuthenticationException): + tc.auth_password(username="slowdive", password="error") + # And again, correctly + tc.auth_password(username="slowdive", password="pygmalion") + + def multipart_auth(self): + """ + verify that multipart auth works. + """ + with server(defer=True) as (tc, ts): + assert tc.auth_password( + username="paranoid", password="paranoid" + ) == ["publickey"] + key = DSSKey.from_private_key_file(_support("dss.key")) + assert tc.auth_publickey(username="paranoid", key=key) == [] + + def interactive_auth(self): + """ + verify keyboard-interactive auth works. + """ + + def handler(title, instructions, prompts): + self.got_title = title + self.got_instructions = instructions + self.got_prompts = prompts + return ["cat"] + + with server(defer=True) as (tc, ts): + assert tc.auth_interactive("commie", handler) == [] + assert self.got_title == "password" + assert self.got_prompts == [("Password", False)] + + def interactive_fallback(self): + """ + verify that a password auth attempt will fallback to "interactive" + if password auth isn't supported but interactive is. + """ + with server(defer=True) as (tc, ts): + # This username results in an allowed_auth of just kbd-int, + # and has a configured interactive->response on the server. + assert tc.auth_password("commie", "cat") == [] + + def utf8(self): + """ + verify that utf-8 encoding happens in authentication. + """ + with server(defer=True) as (tc, ts): + assert tc.auth_password("utf8", unicodey) == [] + + def non_utf8(self): + """ + verify that non-utf-8 encoded passwords can be used for broken + servers. + """ + with server(defer=True) as (tc, ts): + assert tc.auth_password("non-utf8", "\xff") == [] + + def auth_exception_when_disconnected(self): + """ + verify that we catch a server disconnecting during auth, and report + it as an auth failure. + """ + with server(defer=True, skip_verify=True) as (tc, ts), raises( + AuthenticationException + ): + tc.auth_password("bad-server", "hello") + + def non_responsive_triggers_auth_exception(self): + """ + verify that authentication times out if server takes to long to + respond (or never responds). + """ + with server(defer=True, skip_verify=True) as (tc, ts), raises( + AuthenticationException + ) as info: + tc.auth_timeout = 1 # 1 second, to speed up test + tc.auth_password("unresponsive-server", "hello") + assert "Authentication timeout" in str(info.value) + + +class AuthOnlyHandler_: + def _server(self, *args, **kwargs): + kwargs.setdefault("transport_factory", ServiceRequestingTransport) + return server(*args, **kwargs) + + class fallback_pubkey_algorithm: + @requires_sha1_signing + def key_type_algo_selected_when_no_server_sig_algs(self): + privkey = RSAKey.from_private_key_file(_support("rsa.key")) + # Server pretending to be an apparently common setup: + # - doesn't support (or have enabled) sha2 + # - also doesn't support (or have enabled) server-sig-algs/ext-info + # This is the scenario in which Paramiko has to guess-the-algo, and + # where servers that don't support sha2 or server-sig-algs can give + # us trouble. + server_init = dict(_disable_sha2_pubkey, server_sig_algs=False) + with self._server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + server_init=server_init, + catch_error=True, + ) as (tc, ts, err): + # Auth did work + assert tc.is_authenticated() + # Selected ssh-rsa, instead of first-in-the-list (rsa-sha2-512) + assert tc._agreed_pubkey_algorithm == "ssh-rsa" + + @requires_sha1_signing + def key_type_algo_selection_is_cert_suffix_aware(self): + # This key has a cert next to it, which should trigger cert-aware + # loading within key classes. + privkey = PKey.from_path(_support("rsa.key")) + server_init = dict(_disable_sha2_pubkey, server_sig_algs=False) + with self._server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + server_init=server_init, + catch_error=True, + ) as (tc, ts, err): + assert not err + # Auth did work + assert tc.is_authenticated() + # Selected expected cert type + assert ( + tc._agreed_pubkey_algorithm + == "ssh-rsa-cert-v01@openssh.com" + ) + + @requires_sha1_signing + def uses_first_preferred_algo_if_key_type_not_in_list(self): + # This is functionally the same as legacy AuthHandler, just + # arriving at the same place in a different manner. + privkey = RSAKey.from_private_key_file(_support("rsa.key")) + server_init = dict(_disable_sha2_pubkey, server_sig_algs=False) + with self._server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + server_init=server_init, + client_init=_disable_sha1_pubkey, # no ssh-rsa + catch_error=True, + ) as (tc, ts, err): + assert not tc.is_authenticated() + assert isinstance(err, AuthenticationException) + assert tc._agreed_pubkey_algorithm == "rsa-sha2-512" + + +class SHA2SignaturePubkeys: + def pubkey_auth_honors_disabled_algorithms(self): + privkey = RSAKey.from_private_key_file(_support("rsa.key")) + with server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + init=dict( + disabled_algorithms=dict( + pubkeys=["ssh-rsa", "rsa-sha2-256", "rsa-sha2-512"] + ) + ), + catch_error=True, + ) as (_, _, err): + assert isinstance(err, SSHException) + assert "no RSA pubkey algorithms" in str(err) + + def client_sha2_disabled_server_sha1_disabled_no_match(self): + privkey = RSAKey.from_private_key_file(_support("rsa.key")) + with server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + client_init=_disable_sha2_pubkey, + server_init=_disable_sha1_pubkey, + catch_error=True, + ) as (tc, ts, err): + assert isinstance(err, AuthenticationException) + + def client_sha1_disabled_server_sha2_disabled_no_match(self): + privkey = RSAKey.from_private_key_file(_support("rsa.key")) + with server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + client_init=_disable_sha1_pubkey, + server_init=_disable_sha2_pubkey, + catch_error=True, + ) as (tc, ts, err): + assert isinstance(err, AuthenticationException) + + @requires_sha1_signing + def ssh_rsa_still_used_when_sha2_disabled(self): + privkey = RSAKey.from_private_key_file(_support("rsa.key")) + # NOTE: this works because key obj comparison uses public bytes + # TODO: would be nice for PKey to grow a legit "give me another obj of + # same class but just the public bits" using asbytes() + with server( + pubkeys=[privkey], connect=dict(pkey=privkey), init=_disable_sha2 + ) as (tc, _): + assert tc.is_authenticated() + + @requires_sha1_signing + def first_client_preferred_algo_used_when_no_server_sig_algs(self): + privkey = RSAKey.from_private_key_file(_support("rsa.key")) + # Server pretending to be an apparently common setup: + # - doesn't support (or have enabled) sha2 + # - also doesn't support (or have enabled) server-sig-algs/ext-info + # This is the scenario in which Paramiko has to guess-the-algo, and + # where servers that don't support sha2 or server-sig-algs give us + # trouble. + server_init = dict(_disable_sha2_pubkey, server_sig_algs=False) + with server( + pubkeys=[privkey], + connect=dict(username="slowdive", pkey=privkey), + server_init=server_init, + catch_error=True, + ) as (tc, ts, err): + assert not tc.is_authenticated() + assert isinstance(err, AuthenticationException) + # Oh no! this isn't ssh-rsa, and our server doesn't support sha2! + assert tc._agreed_pubkey_algorithm == "rsa-sha2-512" + + def sha2_512(self): + privkey = RSAKey.from_private_key_file(_support("rsa.key")) + with server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + init=dict( + disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-256"]) + ), + ) as (tc, ts): + assert tc.is_authenticated() + assert tc._agreed_pubkey_algorithm == "rsa-sha2-512" + + def sha2_256(self): + privkey = RSAKey.from_private_key_file(_support("rsa.key")) + with server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + init=dict( + disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-512"]) + ), + ) as (tc, ts): + assert tc.is_authenticated() + assert tc._agreed_pubkey_algorithm == "rsa-sha2-256" + + def sha2_256_when_client_only_enables_256(self): + privkey = RSAKey.from_private_key_file(_support("rsa.key")) + with server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + # Client-side only; server still accepts all 3. + client_init=dict( + disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-512"]) + ), + ) as (tc, ts): + assert tc.is_authenticated() + assert tc._agreed_pubkey_algorithm == "rsa-sha2-256" + + +class AuthSource_: + class base_class: + def init_requires_and_saves_username(self): + with raises(TypeError): + AuthSource() + assert AuthSource(username="foo").username == "foo" + + def dunder_repr_delegates_to_helper(self): + source = AuthSource("foo") + source._repr = Mock(wraps=lambda: "whatever") + repr(source) + source._repr.assert_called_once_with() + + def repr_helper_prints_basic_kv_pairs(self): + assert repr(AuthSource("foo")) == "AuthSource()" + assert ( + AuthSource("foo")._repr(bar="open") == "AuthSource(bar='open')" + ) + + def authenticate_takes_transport_and_is_abstract(self): + # TODO: this test kinda just goes away once we're typed? + with raises(TypeError): + AuthSource("foo").authenticate() + with raises(NotImplementedError): + AuthSource("foo").authenticate(None) + + class NoneAuth_: + def authenticate_auths_none(self): + trans = Mock() + result = NoneAuth("foo").authenticate(trans) + trans.auth_none.assert_called_once_with("foo") + assert result is trans.auth_none.return_value + + def repr_shows_class(self): + assert repr(NoneAuth("foo")) == "NoneAuth()" + + class Password_: + def init_takes_and_stores_password_getter(self): + with raises(TypeError): + Password("foo") + getter = Mock() + pw = Password("foo", password_getter=getter) + assert pw.password_getter is getter + + def repr_adds_username(self): + pw = Password("foo", password_getter=Mock()) + assert repr(pw) == "Password(user='foo')" + + def authenticate_gets_and_supplies_password(self): + getter = Mock(return_value="bar") + trans = Mock() + pw = Password("foo", password_getter=getter) + result = pw.authenticate(trans) + trans.auth_password.assert_called_once_with("foo", "bar") + assert result is trans.auth_password.return_value + + class PrivateKey_: + def authenticate_calls_publickey_with_pkey(self): + source = PrivateKey(username="foo") + source.pkey = Mock() # set by subclasses + trans = Mock() + result = source.authenticate(trans) + trans.auth_publickey.assert_called_once_with("foo", source.pkey) + assert result is trans.auth_publickey.return_value + + class InMemoryPrivateKey_: + def init_takes_pkey_object(self): + with raises(TypeError): + InMemoryPrivateKey("foo") + pkey = Mock() + source = InMemoryPrivateKey(username="foo", pkey=pkey) + assert source.pkey is pkey + + def repr_shows_pkey_repr(self): + pkey = PKey.from_path(_support("ed25519.key")) + source = InMemoryPrivateKey("foo", pkey) + assert ( + repr(source) + == "InMemoryPrivateKey(pkey=PKey(alg=ED25519, bits=256, fp=SHA256:J6VESFdD3xSChn8y9PzWzeF+1tl892mOy2TqkMLO4ow))" # noqa + ) + + def repr_appends_agent_flag_when_AgentKey(self): + real_key = PKey.from_path(_support("ed25519.key")) + pkey = AgentKey(agent=None, blob=bytes(real_key)) + source = InMemoryPrivateKey("foo", pkey) + assert ( + repr(source) + == "InMemoryPrivateKey(pkey=PKey(alg=ED25519, bits=256, fp=SHA256:J6VESFdD3xSChn8y9PzWzeF+1tl892mOy2TqkMLO4ow)) [agent]" # noqa + ) + + class OnDiskPrivateKey_: + def init_takes_source_path_and_pkey(self): + with raises(TypeError): + OnDiskPrivateKey("foo") + with raises(TypeError): + OnDiskPrivateKey("foo", "bar") + with raises(TypeError): + OnDiskPrivateKey("foo", "bar", "biz") + source = OnDiskPrivateKey( + username="foo", + source="ssh-config", + path="of-exile", + pkey="notreally", + ) + assert source.username == "foo" + assert source.source == "ssh-config" + assert source.path == "of-exile" + assert source.pkey == "notreally" + + def init_requires_specific_value_for_source(self): + with raises( + ValueError, + match=r"source argument must be one of: \('ssh-config', 'python-config', 'implicit-home'\)", # noqa + ): + OnDiskPrivateKey("foo", source="what?", path="meh", pkey="no") + + def repr_reflects_source_path_and_pkey(self): + source = OnDiskPrivateKey( + username="foo", + source="ssh-config", + path="of-exile", + pkey="notreally", + ) + assert ( + repr(source) + == "OnDiskPrivateKey(key='notreally', source='ssh-config', path='of-exile')" # noqa + ) + + +class AuthResult_: + def setup_method(self): + self.strat = AuthStrategy(None) + + def acts_like_list_with_strategy_attribute(self): + with raises(TypeError): + AuthResult() + # kwarg works by itself + AuthResult(strategy=self.strat) + # or can be given as posarg w/ regular list() args after + result = AuthResult(self.strat, [1, 2, 3]) + assert result.strategy is self.strat + assert result == [1, 2, 3] + assert isinstance(result, list) + + def repr_is_list_repr_untouched(self): + result = AuthResult(self.strat, [1, 2, 3]) + assert repr(result) == "[1, 2, 3]" + + class dunder_str: + def is_multiline_display_of_sourceresult_tuples(self): + result = AuthResult(self.strat) + result.append(SourceResult("foo", "bar")) + result.append(SourceResult("biz", "baz")) + assert str(result) == "foo -> bar\nbiz -> baz" + + def shows_str_not_repr_of_auth_source_and_result(self): + result = AuthResult(self.strat) + result.append( + SourceResult(NoneAuth("foo"), ["password", "pubkey"]) + ) + assert str(result) == "NoneAuth() -> ['password', 'pubkey']" + + def empty_list_result_values_show_success_string(self): + result = AuthResult(self.strat) + result.append(SourceResult(NoneAuth("foo"), [])) + assert str(result) == "NoneAuth() -> success" + + +class AuthFailure_: + def is_an_AuthenticationException(self): + assert isinstance(AuthFailure(None), AuthenticationException) + + def init_requires_result(self): + with raises(TypeError): + AuthFailure() + result = AuthResult(None) + fail = AuthFailure(result=result) + assert fail.result is result + + def str_is_newline_plus_result_str(self): + result = AuthResult(None) + result.append(SourceResult(NoneAuth("foo"), Exception("onoz"))) + fail = AuthFailure(result) + assert str(fail) == "\nNoneAuth() -> onoz" + + +class AuthStrategy_: + def init_requires_ssh_config_param_and_sets_up_a_logger(self): + with raises(TypeError): + AuthStrategy() + conf = object() + strat = AuthStrategy(ssh_config=conf) + assert strat.ssh_config is conf + assert isinstance(strat.log, Logger) + assert strat.log.name == "paramiko.auth_strategy" + + def get_sources_is_abstract(self): + with raises(NotImplementedError): + AuthStrategy(None).get_sources() + + class authenticate: + def setup_method(self): + self.strat = AuthStrategy(None) # ssh_config not used directly + self.source, self.transport = NoneAuth(None), Mock() + self.source.authenticate = Mock() + self.strat.get_sources = Mock(return_value=[self.source]) + + def requires_and_uses_transport_with_methods_returning_result(self): + with raises(TypeError): + self.strat.authenticate() + result = self.strat.authenticate(self.transport) + self.strat.get_sources.assert_called_once_with() + self.source.authenticate.assert_called_once_with(self.transport) + assert isinstance(result, AuthResult) + assert result.strategy is self.strat + assert len(result) == 1 + source_res = result[0] + assert isinstance(source_res, SourceResult) + assert source_res.source is self.source + assert source_res.result is self.source.authenticate.return_value + + def logs_sources_attempted(self): + self.strat.log = Mock() + self.strat.authenticate(self.transport) + self.strat.log.debug.assert_called_once_with("Trying NoneAuth()") + + def raises_AuthFailure_if_no_successes(self): + self.strat.log = Mock() + oops = Exception("onoz") + self.source.authenticate.side_effect = oops + with raises(AuthFailure) as info: + self.strat.authenticate(self.transport) + result = info.value.result + assert isinstance(result, AuthResult) + assert len(result) == 1 + source_res = result[0] + assert isinstance(source_res, SourceResult) + assert source_res.source is self.source + assert source_res.result is oops + self.strat.log.info.assert_called_once_with( + "Authentication via NoneAuth() failed with Exception" + ) + + def short_circuits_on_successful_auth(self): + kaboom = Mock(authenticate=Mock(side_effect=Exception("onoz"))) + self.strat.get_sources.return_value = [self.source, kaboom] + result = self.strat.authenticate(self.transport) + # No exception, and it's just a regular ol Result + assert isinstance(result, AuthResult) + # And it did not capture any attempt to execute the 2nd source + assert len(result) == 1 + assert result[0].source is self.source diff --git a/tests/conftest.py b/tests/conftest.py index b28d2a17..12b97283 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,17 +2,30 @@ import logging import os import shutil import threading +from pathlib import Path + +from invoke.vendor.lexicon import Lexicon import pytest -from paramiko import RSAKey, SFTPServer, SFTP, Transport +from paramiko import ( + SFTPServer, + SFTP, + Transport, + DSSKey, + RSAKey, + Ed25519Key, + ECDSAKey, + PKey, +) -from .loop import LoopSocket -from .stub_sftp import StubServer, StubSFTPServer -from .util import _support +from ._loop import LoopSocket +from ._stub_sftp import StubServer, StubSFTPServer +from ._util import _support from icecream import ic, install as install_ic +# Better print() for debugging - use ic()! install_ic() ic.configureOutput(includeContext=True) @@ -68,10 +81,11 @@ def sftp_server(): socks = LoopSocket() sockc = LoopSocket() sockc.link(socks) + # TODO: reuse with new server fixture if possible tc = Transport(sockc) ts = Transport(socks) # Auth - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) + host_key = RSAKey.from_private_key_file(_support("rsa.key")) ts.add_server_key(host_key) # Server setup event = threading.Event() @@ -103,3 +117,54 @@ def sftp(sftp_server): yield client # Clean up - as in make_sftp_folder, we assume local-only exec for now. shutil.rmtree(client.FOLDER, ignore_errors=True) + + +key_data = [ + ["ssh-rsa", RSAKey, "SHA256:OhNL391d/beeFnxxg18AwWVYTAHww+D4djEE7Co0Yng"], + ["ssh-dss", DSSKey, "SHA256:uHwwykG099f4M4kfzvFpKCTino0/P03DRbAidpAmPm0"], + [ + "ssh-ed25519", + Ed25519Key, + "SHA256:J6VESFdD3xSChn8y9PzWzeF+1tl892mOy2TqkMLO4ow", + ], + [ + "ecdsa-sha2-nistp256", + ECDSAKey, + "SHA256:BrQG04oNKUETjKCeL4ifkARASg3yxS/pUHl3wWM26Yg", + ], +] +for datum in key_data: + # Add true first member with human-facing short algo name + short = datum[0].replace("ssh-", "").replace("sha2-nistp", "") + datum.insert(0, short) + + +@pytest.fixture(scope="session", params=key_data, ids=lambda x: x[0]) +def keys(request): + """ + Yield an object for each known type of key, with attributes: + + - ``short_type``: short identifier, eg ``rsa`` or ``ecdsa-256`` + - ``full_type``: the "message style" key identifier, eg ``ssh-rsa``, or + ``ecdsa-sha2-nistp256``. + - ``path``: a pathlib Path object to the fixture key file + - ``pkey``: PKey object, which may or may not also have a cert loaded + - ``expected_fp``: the expected fingerprint of said key + """ + short_type, key_type, key_class, fingerprint = request.param + bag = Lexicon() + bag.short_type = short_type + bag.full_type = key_type + bag.path = Path(_support(f"{short_type}.key")) + with bag.path.open() as fd: + bag.pkey = key_class.from_private_key(fd) + # Second copy for things like equality-but-not-identity testing + with bag.path.open() as fd: + bag.pkey2 = key_class.from_private_key(fd) + bag.expected_fp = fingerprint + # Also tack on the cert-bearing variant for some tests + cert = bag.path.with_suffix(".key-cert.pub") + bag.pkey_with_cert = PKey.from_path(cert) if cert.exists() else None + # Safety checks + assert bag.pkey.fingerprint == fingerprint + yield bag diff --git a/tests/pkey.py b/tests/pkey.py new file mode 100644 index 00000000..691fda0f --- /dev/null +++ b/tests/pkey.py @@ -0,0 +1,229 @@ +from pathlib import Path +from unittest.mock import patch, call + +from pytest import raises + +from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey +from paramiko import ( + DSSKey, + ECDSAKey, + Ed25519Key, + Message, + PKey, + PublicBlob, + RSAKey, + UnknownKeyType, +) + +from ._util import _support + + +class PKey_: + # NOTE: this is incidentally tested by a number of other tests, such as the + # agent.py test suite + class from_type_string: + def loads_from_type_and_bytes(self, keys): + obj = PKey.from_type_string(keys.full_type, keys.pkey.asbytes()) + assert obj == keys.pkey + + # TODO: exceptions + # + # TODO: passphrase? OTOH since this is aimed at the agent...irrelephant + + class from_path: + def loads_from_Path(self, keys): + obj = PKey.from_path(keys.path) + assert obj == keys.pkey + + def loads_from_str(self): + key = PKey.from_path(str(_support("rsa.key"))) + assert isinstance(key, RSAKey) + + @patch("paramiko.pkey.Path") + def expands_user(self, mPath): + # real key for guts that want a real key format + mykey = Path(_support("rsa.key")) + pathy = mPath.return_value.expanduser.return_value + # read_bytes for cryptography.io's loaders + pathy.read_bytes.return_value = mykey.read_bytes() + # open() for our own class loader + pathy.open.return_value = mykey.open() + # fake out exists() to avoid attempts to load cert + pathy.exists.return_value = False + PKey.from_path("whatever") # we're not testing expanduser itself + # Both key and cert paths + mPath.return_value.expanduser.assert_has_calls([call(), call()]) + + def raises_UnknownKeyType_for_unknown_types(self): + # I.e. a real, becomes a useful object via cryptography.io, key + # class that we do NOT support. Chose Ed448 randomly as OpenSSH + # doesn't seem to support it either, going by ssh-keygen... + keypath = _support("ed448.key") + with raises(UnknownKeyType) as exc: + PKey.from_path(keypath) + assert issubclass(exc.value.key_type, Ed448PrivateKey) + with open(keypath, "rb") as fd: + assert exc.value.key_bytes == fd.read() + + def leaves_cryptography_exceptions_untouched(self): + # a Python file is not a private key! + with raises(ValueError): + PKey.from_path(__file__) + + # TODO: passphrase support tested + + class automatically_loads_certificates: + def existing_cert_loaded_when_given_key_path(self): + key = PKey.from_path(_support("rsa.key")) + # Public blob exists despite no .load_certificate call + assert key.public_blob is not None + assert ( + key.public_blob.key_type == "ssh-rsa-cert-v01@openssh.com" + ) + # And it's definitely the one we expected + assert key.public_blob == PublicBlob.from_file( + _support("rsa.key-cert.pub") + ) + + def can_be_given_cert_path_instead(self): + key = PKey.from_path(_support("rsa.key-cert.pub")) + # It's still a key, not a PublicBlob + assert isinstance(key, RSAKey) + # Public blob exists despite no .load_certificate call + assert key.public_blob is not None + assert ( + key.public_blob.key_type == "ssh-rsa-cert-v01@openssh.com" + ) + # And it's definitely the one we expected + assert key.public_blob == PublicBlob.from_file( + _support("rsa.key-cert.pub") + ) + + def no_cert_load_if_no_cert(self): + # This key exists (it's a copy of the regular one) but has no + # matching -cert.pub + key = PKey.from_path(_support("rsa-lonely.key")) + assert key.public_blob is None + + def excepts_usefully_if_no_key_only_cert(self): + # TODO: is that truly an error condition? the cert is ~the + # pubkey and we still require the privkey for signing, yea? + # This cert exists (it's a copy of the regular one) but there's + # no rsa-missing.key to load. + with raises(FileNotFoundError) as info: + PKey.from_path(_support("rsa-missing.key-cert.pub")) + assert info.value.filename.endswith("rsa-missing.key") + + class load_certificate: + def rsa_public_cert_blobs(self): + # Data to test signing with (arbitrary) + data = b"ice weasels" + # Load key w/o cert at first (so avoiding .from_path) + key = RSAKey.from_private_key_file(_support("rsa.key")) + assert key.public_blob is None + # Sign regular-style (using, arbitrarily, SHA2) + msg = key.sign_ssh_data(data, "rsa-sha2-256") + msg.rewind() + assert "rsa-sha2-256" == msg.get_text() + signed = msg.get_binary() # for comparison later + + # Load cert and inspect its internals + key.load_certificate(_support("rsa.key-cert.pub")) + assert key.public_blob is not None + assert key.public_blob.key_type == "ssh-rsa-cert-v01@openssh.com" + assert key.public_blob.comment == "test_rsa.key.pub" + msg = Message(key.public_blob.key_blob) + # cert type + assert msg.get_text() == "ssh-rsa-cert-v01@openssh.com" + # nonce + msg.get_string() + # public numbers + assert msg.get_mpint() == key.public_numbers.e + assert msg.get_mpint() == key.public_numbers.n + # serial number + assert msg.get_int64() == 1234 + # TODO: whoever wrote the OG tests didn't care about the remaining + # fields from + # https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.certkeys + # so neither do I, for now... + + # Sign cert-style (still SHA256 - so this actually does almost + # exactly the same thing under the hood as the previous sign) + msg = key.sign_ssh_data(data, "rsa-sha2-256-cert-v01@openssh.com") + msg.rewind() + assert "rsa-sha2-256" == msg.get_text() + assert signed == msg.get_binary() # same signature as above + msg.rewind() + assert key.verify_ssh_sig(b"ice weasels", msg) # our data verified + + def loading_cert_of_different_type_from_key_raises_ValueError(self): + edkey = Ed25519Key.from_private_key_file(_support("ed25519.key")) + err = "PublicBlob type ssh-rsa-cert-v01@openssh.com incompatible with key type ssh-ed25519" # noqa + with raises(ValueError, match=err): + edkey.load_certificate(_support("rsa.key-cert.pub")) + + def fingerprint(self, keys): + # NOTE: Hardcoded fingerprint expectation stored in fixture. + assert keys.pkey.fingerprint == keys.expected_fp + + def algorithm_name(self, keys): + key = keys.pkey + if isinstance(key, RSAKey): + assert key.algorithm_name == "RSA" + elif isinstance(key, DSSKey): + assert key.algorithm_name == "DSS" + elif isinstance(key, ECDSAKey): + assert key.algorithm_name == "ECDSA" + elif isinstance(key, Ed25519Key): + assert key.algorithm_name == "ED25519" + # TODO: corner case: AgentKey, whose .name can be cert-y (due to the + # value of the name field passed via agent protocol) and thus + # algorithm_name is eg "RSA-CERT" - keys loaded directly from disk will + # never look this way, even if they have a .public_blob attached. + + class equality_and_hashing: + def same_key_is_equal_to_itself(self, keys): + assert keys.pkey == keys.pkey2 + + def same_key_same_hash(self, keys): + # NOTE: this isn't a great test due to hashseed randomization under + # Python 3 preventing use of static values, but it does still prove + # that __hash__ is implemented/doesn't explode & works across + # instances + assert hash(keys.pkey) == hash(keys.pkey2) + + def keys_are_not_equal_to_other_types(self, keys): + for value in [None, True, ""]: + assert keys.pkey != value + + class identifiers_classmethods: + def default_is_class_name_attribute(self): + # NOTE: not all classes _have_ this, only the ones that don't + # customize identifiers(). + class MyKey(PKey): + name = "it me" + + assert MyKey.identifiers() == ["it me"] + + def rsa_is_all_combos_of_cert_and_sha_type(self): + assert RSAKey.identifiers() == [ + "ssh-rsa", + "ssh-rsa-cert-v01@openssh.com", + "rsa-sha2-256", + "rsa-sha2-256-cert-v01@openssh.com", + "rsa-sha2-512", + "rsa-sha2-512-cert-v01@openssh.com", + ] + + def dss_is_protocol_name(self): + assert DSSKey.identifiers() == ["ssh-dss"] + + def ed25519_is_protocol_name(self): + assert Ed25519Key.identifiers() == ["ssh-ed25519"] + + def ecdsa_is_all_curve_names(self): + assert ECDSAKey.identifiers() == [ + "ecdsa-sha2-nistp256", + "ecdsa-sha2-nistp384", + "ecdsa-sha2-nistp521", + ] diff --git a/tests/test_agent.py b/tests/test_agent.py deleted file mode 100644 index 01602d0e..00000000 --- a/tests/test_agent.py +++ /dev/null @@ -1,50 +0,0 @@ -import unittest - -from paramiko.message import Message -from paramiko.agent import ( - SSH2_AGENT_SIGN_RESPONSE, - cSSH2_AGENTC_SIGN_REQUEST, - SSH_AGENT_RSA_SHA2_256, - SSH_AGENT_RSA_SHA2_512, - AgentKey, -) -from paramiko.util import b - - -class ChaosAgent: - def _send_message(self, msg): - self._sent_message = msg - sig = Message() - sig.add_string(b("lol")) - sig.rewind() - return SSH2_AGENT_SIGN_RESPONSE, sig - - -class AgentTests(unittest.TestCase): - def _sign_with_agent(self, kwargs, expectation): - agent = ChaosAgent() - key = AgentKey(agent, b("secret!!!")) - result = key.sign_ssh_data(b("token"), **kwargs) - assert result == b("lol") - msg = agent._sent_message - msg.rewind() - assert msg.get_byte() == cSSH2_AGENTC_SIGN_REQUEST - assert msg.get_string() == b("secret!!!") - assert msg.get_string() == b("token") - assert msg.get_int() == expectation - - def test_agent_signing_defaults_to_0_for_flags_field(self): - # No algorithm kwarg at all - self._sign_with_agent(kwargs=dict(), expectation=0) - - def test_agent_signing_is_2_for_SHA256(self): - self._sign_with_agent( - kwargs=dict(algorithm="rsa-sha2-256"), - expectation=SSH_AGENT_RSA_SHA2_256, - ) - - def test_agent_signing_is_2_for_SHA512(self): - self._sign_with_agent( - kwargs=dict(algorithm="rsa-sha2-512"), - expectation=SSH_AGENT_RSA_SHA2_512, - ) diff --git a/tests/test_auth.py b/tests/test_auth.py deleted file mode 100644 index 4a960deb..00000000 --- a/tests/test_auth.py +++ /dev/null @@ -1,272 +0,0 @@ -# Copyright (C) 2008 Robey Pointer <robeypointer@gmail.com> -# -# This file is part of paramiko. -# -# Paramiko is free software; you can redistribute it and/or modify it under the -# terms of the GNU Lesser General Public License as published by the Free -# Software Foundation; either version 2.1 of the License, or (at your option) -# any later version. -# -# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY -# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR -# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License -# along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - -""" -Some unit tests for authenticating over a Transport. -""" - -import sys -import threading -import unittest -from time import sleep - -from paramiko import ( - Transport, - ServerInterface, - RSAKey, - DSSKey, - BadAuthenticationType, - InteractiveQuery, - AuthenticationException, -) -from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL -from paramiko.util import u - -from .loop import LoopSocket -from .util import _support, slow - - -_pwd = u("\u2022") - - -class NullServer(ServerInterface): - paranoid_did_password = False - paranoid_did_public_key = False - paranoid_key = DSSKey.from_private_key_file(_support("test_dss.key")) - - def get_allowed_auths(self, username): - if username == "slowdive": - return "publickey,password" - if username == "paranoid": - if ( - not self.paranoid_did_password - and not self.paranoid_did_public_key - ): - return "publickey,password" - elif self.paranoid_did_password: - return "publickey" - else: - return "password" - if username == "commie": - return "keyboard-interactive" - if username == "utf8": - return "password" - if username == "non-utf8": - return "password" - return "publickey" - - def check_auth_password(self, username, password): - if (username == "slowdive") and (password == "pygmalion"): - return AUTH_SUCCESSFUL - if (username == "paranoid") and (password == "paranoid"): - # 2-part auth (even openssh doesn't support this) - self.paranoid_did_password = True - if self.paranoid_did_public_key: - return AUTH_SUCCESSFUL - return AUTH_PARTIALLY_SUCCESSFUL - if (username == "utf8") and (password == _pwd): - return AUTH_SUCCESSFUL - if (username == "non-utf8") and (password == "\xff"): - return AUTH_SUCCESSFUL - if username == "bad-server": - raise Exception("Ack!") - if username == "unresponsive-server": - sleep(5) - return AUTH_SUCCESSFUL - return AUTH_FAILED - - def check_auth_publickey(self, username, key): - if (username == "paranoid") and (key == self.paranoid_key): - # 2-part auth - self.paranoid_did_public_key = True - if self.paranoid_did_password: - return AUTH_SUCCESSFUL - return AUTH_PARTIALLY_SUCCESSFUL - return AUTH_FAILED - - def check_auth_interactive(self, username, submethods): - if username == "commie": - self.username = username - return InteractiveQuery( - "password", "Please enter a password.", ("Password", False) - ) - return AUTH_FAILED - - def check_auth_interactive_response(self, responses): - if self.username == "commie": - if (len(responses) == 1) and (responses[0] == "cat"): - return AUTH_SUCCESSFUL - return AUTH_FAILED - - -class AuthTest(unittest.TestCase): - def setUp(self): - self.socks = LoopSocket() - self.sockc = LoopSocket() - self.sockc.link(self.socks) - self.tc = Transport(self.sockc) - self.ts = Transport(self.socks) - - def tearDown(self): - self.tc.close() - self.ts.close() - self.socks.close() - self.sockc.close() - - def start_server(self): - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) - self.public_host_key = RSAKey(data=host_key.asbytes()) - self.ts.add_server_key(host_key) - self.event = threading.Event() - self.server = NullServer() - self.assertTrue(not self.event.is_set()) - self.ts.start_server(self.event, self.server) - - def verify_finished(self): - self.event.wait(1.0) - self.assertTrue(self.event.is_set()) - self.assertTrue(self.ts.is_active()) - - def test_bad_auth_type(self): - """ - verify that we get the right exception when an unsupported auth - type is requested. - """ - self.start_server() - try: - self.tc.connect( - hostkey=self.public_host_key, - username="unknown", - password="error", - ) - self.assertTrue(False) - except: - etype, evalue, etb = sys.exc_info() - self.assertEqual(BadAuthenticationType, etype) - self.assertEqual(["publickey"], evalue.allowed_types) - - def test_bad_password(self): - """ - verify that a bad password gets the right exception, and that a retry - with the right password works. - """ - self.start_server() - self.tc.connect(hostkey=self.public_host_key) - try: - self.tc.auth_password(username="slowdive", password="error") - self.assertTrue(False) - except: - etype, evalue, etb = sys.exc_info() - self.assertTrue(issubclass(etype, AuthenticationException)) - self.tc.auth_password(username="slowdive", password="pygmalion") - self.verify_finished() - - def test_multipart_auth(self): - """ - verify that multipart auth works. - """ - self.start_server() - self.tc.connect(hostkey=self.public_host_key) - remain = self.tc.auth_password( - username="paranoid", password="paranoid" - ) - self.assertEqual(["publickey"], remain) - key = DSSKey.from_private_key_file(_support("test_dss.key")) - remain = self.tc.auth_publickey(username="paranoid", key=key) - self.assertEqual([], remain) - self.verify_finished() - - def test_interactive_auth(self): - """ - verify keyboard-interactive auth works. - """ - self.start_server() - self.tc.connect(hostkey=self.public_host_key) - - def handler(title, instructions, prompts): - self.got_title = title - self.got_instructions = instructions - self.got_prompts = prompts - return ["cat"] - - remain = self.tc.auth_interactive("commie", handler) - self.assertEqual(self.got_title, "password") - self.assertEqual(self.got_prompts, [("Password", False)]) - self.assertEqual([], remain) - self.verify_finished() - - def test_interactive_auth_fallback(self): - """ - verify that a password auth attempt will fallback to "interactive" - if password auth isn't supported but interactive is. - """ - self.start_server() - self.tc.connect(hostkey=self.public_host_key) - remain = self.tc.auth_password("commie", "cat") - self.assertEqual([], remain) - self.verify_finished() - - def test_auth_utf8(self): - """ - verify that utf-8 encoding happens in authentication. - """ - self.start_server() - self.tc.connect(hostkey=self.public_host_key) - remain = self.tc.auth_password("utf8", _pwd) - self.assertEqual([], remain) - self.verify_finished() - - def test_auth_non_utf8(self): - """ - verify that non-utf-8 encoded passwords can be used for broken - servers. - """ - self.start_server() - self.tc.connect(hostkey=self.public_host_key) - remain = self.tc.auth_password("non-utf8", "\xff") - self.assertEqual([], remain) - self.verify_finished() - - def test_auth_gets_disconnected(self): - """ - verify that we catch a server disconnecting during auth, and report - it as an auth failure. - """ - self.start_server() - self.tc.connect(hostkey=self.public_host_key) - try: - self.tc.auth_password("bad-server", "hello") - except: - etype, evalue, etb = sys.exc_info() - self.assertTrue(issubclass(etype, AuthenticationException)) - - @slow - def test_auth_non_responsive(self): - """ - verify that authentication times out if server takes to long to - respond (or never responds). - """ - self.tc.auth_timeout = 1 # 1 second, to speed up test - self.start_server() - self.tc.connect() - try: - self.tc.auth_password("unresponsive-server", "hello") - except: - etype, evalue, etb = sys.exc_info() - self.assertTrue(issubclass(etype, AuthenticationException)) - self.assertTrue("Authentication timeout" in str(evalue)) diff --git a/tests/test_client.py b/tests/test_client.py index 21ecd72a..1c0c6c84 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -41,7 +41,7 @@ from paramiko import SSHClient from paramiko.pkey import PublicBlob from paramiko.ssh_exception import SSHException, AuthenticationException -from .util import _support, requires_sha1_signing, slow +from ._util import _support, requires_sha1_signing, slow requires_gss_auth = unittest.skipUnless( @@ -171,10 +171,10 @@ class ClientTest(unittest.TestCase): self.ts = paramiko.Transport(self.socks) if server_name is not None: self.ts.local_version = server_name - keypath = _support("test_rsa.key") + keypath = _support("rsa.key") host_key = paramiko.RSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) - keypath = _support("test_ecdsa_256.key") + keypath = _support("ecdsa-256.key") host_key = paramiko.ECDSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) server = NullServer(allowed_keys=allowed_keys, public_blob=public_blob) @@ -194,9 +194,7 @@ class ClientTest(unittest.TestCase): run_kwargs[key] = kwargs.pop(key, None) # Server setup threading.Thread(target=self._run, kwargs=run_kwargs).start() - host_key = paramiko.RSAKey.from_private_key_file( - _support("test_rsa.key") - ) + host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) # Client setup @@ -256,25 +254,25 @@ class SSHClientTest(ClientTest): """ verify that SSHClient works with a DSA key. """ - self._test_connection(key_filename=_support("test_dss.key")) + self._test_connection(key_filename=_support("dss.key")) @requires_sha1_signing def test_client_rsa(self): """ verify that SSHClient works with an RSA key. """ - self._test_connection(key_filename=_support("test_rsa.key")) + self._test_connection(key_filename=_support("rsa.key")) @requires_sha1_signing def test_client_ecdsa(self): """ verify that SSHClient works with an ECDSA key. """ - self._test_connection(key_filename=_support("test_ecdsa_256.key")) + self._test_connection(key_filename=_support("ecdsa-256.key")) @requires_sha1_signing def test_client_ed25519(self): - self._test_connection(key_filename=_support("test_ed25519.key")) + self._test_connection(key_filename=_support("ed25519.key")) @requires_sha1_signing def test_multiple_key_files(self): @@ -289,16 +287,17 @@ class SSHClientTest(ClientTest): } # Various combos of attempted & valid keys # TODO: try every possible combo using itertools functions + # TODO: use new key(s) fixture(s) for attempt, accept in ( (["rsa", "dss"], ["dss"]), # Original test #3 (["dss", "rsa"], ["dss"]), # Ordering matters sometimes, sadly - (["dss", "rsa", "ecdsa_256"], ["dss"]), # Try ECDSA but fail - (["rsa", "ecdsa_256"], ["ecdsa"]), # ECDSA success + (["dss", "rsa", "ecdsa-256"], ["dss"]), # Try ECDSA but fail + (["rsa", "ecdsa-256"], ["ecdsa"]), # ECDSA success ): try: self._test_connection( key_filename=[ - _support("test_{}.key".format(x)) for x in attempt + _support("{}.key".format(x)) for x in attempt ], allowed_keys=[types_[x] for x in accept], ) @@ -318,7 +317,7 @@ class SSHClientTest(ClientTest): self.assertRaises( SSHException, self._test_connection, - key_filename=[_support("test_rsa.key")], + key_filename=[_support("rsa.key")], allowed_keys=["ecdsa-sha2-nistp256"], ) @@ -328,30 +327,26 @@ class SSHClientTest(ClientTest): # They're similar except for which path is given; the expected auth and # server-side behavior is 100% identical.) # NOTE: only bothered whipping up one cert per overall class/family. - for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"): - cert_name = "test_{}.key-cert.pub".format(type_) - cert_path = _support(os.path.join("cert_support", cert_name)) + for type_ in ("rsa", "dss", "ecdsa-256", "ed25519"): + key_path = _support(f"{type_}.key") self._test_connection( - key_filename=cert_path, - public_blob=PublicBlob.from_file(cert_path), + key_filename=key_path, + public_blob=PublicBlob.from_file(f"{key_path}-cert.pub"), ) @requires_sha1_signing def test_certs_implicitly_loaded_alongside_key_filename_keys(self): - # NOTE: a regular test_connection() w/ test_rsa.key would incidentally + # NOTE: a regular test_connection() w/ rsa.key would incidentally # test this (because test_xxx.key-cert.pub exists) but incidental tests # stink, so NullServer and friends were updated to allow assertions # about the server-side key object's public blob. Thus, we can prove # that a specific cert was found, along with regular authorization # succeeding proving that the overall flow works. - for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"): - key_name = "test_{}.key".format(type_) - key_path = _support(os.path.join("cert_support", key_name)) + for type_ in ("rsa", "dss", "ecdsa-256", "ed25519"): + key_path = _support(f"{type_}.key") self._test_connection( key_filename=key_path, - public_blob=PublicBlob.from_file( - "{}-cert.pub".format(key_path) - ), + public_blob=PublicBlob.from_file(f"{key_path}-cert.pub"), ) def _cert_algo_test(self, ver, alg): @@ -360,9 +355,7 @@ class SSHClientTest(ClientTest): self._test_connection( # NOTE: SSHClient is able to take either the key or the cert & will # set up its internals as needed - key_filename=_support( - os.path.join("cert_support", "test_rsa.key-cert.pub") - ), + key_filename=_support("rsa.key-cert.pub"), server_name="SSH-2.0-OpenSSH_{}".format(ver), ) assert ( @@ -391,7 +384,7 @@ class SSHClientTest(ClientTest): """ threading.Thread(target=self._run).start() hostname = f"[{self.addr}]:{self.port}" - key_file = _support("test_ecdsa_256.key") + key_file = _support("ecdsa-256.key") public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file) self.tc = SSHClient() @@ -414,9 +407,7 @@ class SSHClientTest(ClientTest): """ warnings.filterwarnings("ignore", "tempnam.*") - host_key = paramiko.RSAKey.from_private_key_file( - _support("test_rsa.key") - ) + host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) fd, localname = mkstemp() os.close(fd) @@ -516,9 +507,7 @@ class SSHClientTest(ClientTest): """ # Start the thread with a 1 second wait. threading.Thread(target=self._run, kwargs={"delay": 1}).start() - host_key = paramiko.RSAKey.from_private_key_file( - _support("test_rsa.key") - ) + host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) self.tc = SSHClient() @@ -593,7 +582,7 @@ class SSHClientTest(ClientTest): """ Failed gssapi-keyex doesn't prevent subsequent key from succeeding """ - kwargs = dict(gss_kex=True, key_filename=[_support("test_rsa.key")]) + kwargs = dict(gss_kex=True, key_filename=[_support("rsa.key")]) self._test_connection(**kwargs) @requires_gss_auth @@ -601,7 +590,7 @@ class SSHClientTest(ClientTest): """ Failed gssapi-with-mic doesn't prevent subsequent key from succeeding """ - kwargs = dict(gss_auth=True, key_filename=[_support("test_rsa.key")]) + kwargs = dict(gss_auth=True, key_filename=[_support("rsa.key")]) self._test_connection(**kwargs) def test_reject_policy(self): @@ -683,11 +672,11 @@ class SSHClientTest(ClientTest): self._client_host_key_bad(host_key) def test_host_key_negotiation_3(self): - self._client_host_key_good(paramiko.ECDSAKey, "test_ecdsa_256.key") + self._client_host_key_good(paramiko.ECDSAKey, "ecdsa-256.key") @requires_sha1_signing def test_host_key_negotiation_4(self): - self._client_host_key_good(paramiko.RSAKey, "test_rsa.key") + self._client_host_key_good(paramiko.RSAKey, "rsa.key") def _setup_for_env(self): threading.Thread(target=self._run).start() diff --git a/tests/test_config.py b/tests/test_config.py index a2c60a32..fcb120b6 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -19,7 +19,7 @@ from paramiko import ( ConfigParseError, ) -from .util import _config +from ._util import _config @fixture diff --git a/tests/test_dss.key b/tests/test_dss.key deleted file mode 100644 index e10807f1..00000000 --- a/tests/test_dss.key +++ /dev/null @@ -1,12 +0,0 @@ ------BEGIN DSA PRIVATE KEY----- -MIIBuwIBAAKBgQDngaYDZ30c6/7cJgEEbtl8FgKdwhba1Z7oOrOn4MI/6C42G1bY -wMuqZf4dBCglsdq39SHrcjbE8Vq54gPSOh3g4+uV9Rcg5IOoPLbwp2jQfF6f1FIb -sx7hrDCIqUcQccPSxetPBKmXI9RN8rZLaFuQeTnI65BKM98Ruwvq6SI2LwIVAPDP -hSeawaJI27mKqOfe5PPBSmyHAoGBAJMXxXmPD9sGaQ419DIpmZecJKBUAy9uXD8x -gbgeDpwfDaFJP8owByCKREocPFfi86LjCuQkyUKOfjYMN6iHIf1oEZjB8uJAatUr -FzI0ArXtUqOhwTLwTyFuUojE5own2WYsOAGByvgfyWjsGhvckYNhI4ODpNdPlxQ8 -ZamaPGPsAoGARmR7CCPjodxASvRbIyzaVpZoJ/Z6x7dAumV+ysrV1BVYd0lYukmn -jO1kKBWApqpH1ve9XDQYN8zgxM4b16L21kpoWQnZtXrY3GZ4/it9kUgyB7+NwacI -BlXa8cMDL7Q/69o0d54U0X/NeX5QxuYR6OMJlrkQB7oiW/P/1mwjQgECFGI9QPSc -h9pT9XHqn+1rZ4bK+QGA ------END DSA PRIVATE KEY----- diff --git a/tests/test_ecdsa_256.key b/tests/test_ecdsa_256.key deleted file mode 100644 index 42d44734..00000000 --- a/tests/test_ecdsa_256.key +++ /dev/null @@ -1,5 +0,0 @@ ------BEGIN EC PRIVATE KEY----- -MHcCAQEEIKB6ty3yVyKEnfF/zprx0qwC76MsMlHY4HXCnqho2eKioAoGCCqGSM49 -AwEHoUQDQgAElI9mbdlaS+T9nHxY/59lFnn80EEecZDBHq4gLpccY8Mge5ZTMiMD -ADRvOqQ5R98Sxst765CAqXmRtz8vwoD96g== ------END EC PRIVATE KEY----- diff --git a/tests/test_ed25519.key b/tests/test_ed25519.key deleted file mode 100644 index eb9f94c2..00000000 --- a/tests/test_ed25519.key +++ /dev/null @@ -1,8 +0,0 @@ ------BEGIN OPENSSH PRIVATE KEY----- -b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW -QyNTUxOQAAACB69SvZKJh/9VgSL0G27b5xVYa8nethH3IERbi0YqJDXwAAAKhjwAdrY8AH -awAAAAtzc2gtZWQyNTUxOQAAACB69SvZKJh/9VgSL0G27b5xVYa8nethH3IERbi0YqJDXw -AAAEA9tGQi2IrprbOSbDCF+RmAHd6meNSXBUQ2ekKXm4/8xnr1K9komH/1WBIvQbbtvnFV -hryd62EfcgRFuLRiokNfAAAAI2FsZXhfZ2F5bm9yQEFsZXhzLU1hY0Jvb2stQWlyLmxvY2 -FsAQI= ------END OPENSSH PRIVATE KEY----- diff --git a/tests/test_file.py b/tests/test_file.py index 456c0388..9344495b 100644 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -26,7 +26,7 @@ from io import BytesIO from paramiko.common import linefeed_byte, crlf, cr_byte from paramiko.file import BufferedFile -from .util import needs_builtin +from ._util import needs_builtin class LoopbackFile(BufferedFile): diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py index 671f1ba0..da62fd97 100644 --- a/tests/test_gssapi.py +++ b/tests/test_gssapi.py @@ -24,7 +24,7 @@ Test the used APIs for GSS-API / SSPI authentication import socket -from .util import needs_gssapi, KerberosTestCase, update_env +from ._util import needs_gssapi, KerberosTestCase, update_env # # NOTE: KerberosTestCase skips all tests if it was unable to import k5test diff --git a/tests/test_hostkeys.py b/tests/test_hostkeys.py index bdda295a..a028411d 100644 --- a/tests/test_hostkeys.py +++ b/tests/test_hostkeys.py @@ -36,6 +36,11 @@ broken.example.com ssh-rsa AAAA happy.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA8bP1ZA7DCZDB9J0s50l31M\ BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\ 5ymME3bQ4J/k1IKxCtz/bAlAqFgKoc+EolMziDYqWIATtW0rYTJvzGAzTmMj80/QpsFH+Pc2M= +modern.example.com ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIKHEChAIxsh2hr8Q\ ++Ea1AAHZyfEB2elEc2YgduVzBtp+ +curvy.example.com ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlz\ +dHAyNTYAAABBBAa+pY7djSpbg5viAcZhPt56AO3U3Sd7h7dnlUp0EjfDgyYHYQxl2QZ4JGgfwR5iv9\ +T9iRZjQzvJd5s+kBAZtpk= """ test_hosts_file_tabs = """\ @@ -45,9 +50,11 @@ D+jrpI9cycZHqilK0HmxDeCuxbwyMuaCygU9gS2qoRvNLWZk70OpIKSSpBo0Wl3/XUmz9uhc= happy.example.com\tssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA8bP1ZA7DCZDB9J0s50l31M\ BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\ 5ymME3bQ4J/k1IKxCtz/bAlAqFgKoc+EolMziDYqWIATtW0rYTJvzGAzTmMj80/QpsFH+Pc2M= -doublespace.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA1PD6U2/TVxET6lkp\ -KhOk5r9q/kAYG6sP9f5zuUYP8i7FOFp/6ncCEbbtg/lB+A3iidyxoSWl+9jtoyyDOOVX4UIDV9G11M\ -l8om3D+jrpI9cycZHqilK0HmxDeCuxbwyMuaCygU9gS2qoRvNLWZk70OpIKSSpBo0Wl3/XUmz8BtZ= +modern.example.com\tssh-ed25519\tAAAAC3NzaC1lZDI1NTE5AAAAIKHEChAIxsh2hr8Q\ ++Ea1AAHZyfEB2elEc2YgduVzBtp+ +curvy.example.com\tecdsa-sha2-nistp256\tAAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbml\ +zdHAyNTYAAABBBAa+pY7djSpbg5viAcZhPt56AO3U3Sd7h7dnlUp0EjfDgyYHYQxl2QZ4JGgfwR5iv\ +9T9iRZjQzvJd5s+kBAZtpk= """ keyblob = b"""\ @@ -76,7 +83,7 @@ class HostKeysTest(unittest.TestCase): def test_load(self): hostdict = paramiko.HostKeys("hostfile.temp") - self.assertEqual(2, len(hostdict)) + assert len(hostdict) == 4 self.assertEqual(1, len(list(hostdict.values())[0])) self.assertEqual(1, len(list(hostdict.values())[1])) fp = hexlify( @@ -89,7 +96,7 @@ class HostKeysTest(unittest.TestCase): hh = "|1|BMsIC6cUIP2zBuXR3t2LRcJYjzM=|hpkJMysjTk/+zzUUzxQEa2ieq6c=" key = paramiko.RSAKey(data=decodebytes(keyblob)) hostdict.add(hh, "ssh-rsa", key) - self.assertEqual(3, len(list(hostdict))) + assert len(hostdict) == 5 x = hostdict["foo.example.com"] fp = hexlify(x["ssh-rsa"].get_fingerprint()).upper() self.assertEqual(b"7EC91BB336CB6D810B124B1353C32396", fp) @@ -105,10 +112,8 @@ class HostKeysTest(unittest.TestCase): self.assertTrue(x is not None) fp = hexlify(x["ssh-rsa"].get_fingerprint()).upper() self.assertEqual(b"E6684DB30E109B67B70FF1DC5C7F1363", fp) - i = 0 - for key in hostdict: - i += 1 - self.assertEqual(2, i) + assert list(hostdict) == hostdict.keys() + assert len(list(hostdict)) == len(hostdict.keys()) == 4 def test_dict_set(self): hostdict = paramiko.HostKeys("hostfile.temp") @@ -118,7 +123,7 @@ class HostKeysTest(unittest.TestCase): hostdict["fake.example.com"] = {} hostdict["fake.example.com"]["ssh-rsa"] = key - self.assertEqual(3, len(hostdict)) + assert len(hostdict) == 5 self.assertEqual(2, len(list(hostdict.values())[0])) self.assertEqual(1, len(list(hostdict.values())[1])) self.assertEqual(1, len(list(hostdict.values())[2])) diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py index d4868f4a..a81b1959 100644 --- a/tests/test_kex_gss.py +++ b/tests/test_kex_gss.py @@ -31,7 +31,7 @@ import unittest import paramiko -from .util import needs_gssapi, KerberosTestCase, update_env +from ._util import needs_gssapi, KerberosTestCase, update_env, _support class NullServer(paramiko.ServerInterface): @@ -80,7 +80,7 @@ class GSSKexTest(KerberosTestCase): def _run(self): self.socks, addr = self.sockl.accept() self.ts = paramiko.Transport(self.socks, gss_kex=True) - host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") + host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key")) self.ts.add_server_key(host_key) self.ts.set_gss_host(self.realm.hostname) try: @@ -96,7 +96,7 @@ class GSSKexTest(KerberosTestCase): Diffie-Hellman Key Exchange and user authentication with the GSS-API context created during key exchange. """ - host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") + host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) self.tc = paramiko.SSHClient() diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py index d4dd58ad..aee21c21 100644 --- a/tests/test_packetizer.py +++ b/tests/test_packetizer.py @@ -30,7 +30,7 @@ from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes from paramiko import Message, Packetizer, util from paramiko.common import byte_chr, zero_byte -from .loop import LoopSocket +from ._loop import LoopSocket x55 = byte_chr(0x55) diff --git a/tests/test_pkey.py b/tests/test_pkey.py index 4d74d8aa..d4d193b8 100644 --- a/tests/test_pkey.py +++ b/tests/test_pkey.py @@ -45,7 +45,7 @@ from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateNumbers from unittest.mock import patch, Mock import pytest -from .util import _support, is_low_entropy, requires_sha1_signing +from ._util import _support, is_low_entropy, requires_sha1_signing # from openssh's ssh-keygen @@ -138,12 +138,6 @@ TEST_KEY_BYTESTR = "\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x00ӏV\ class KeyTest(unittest.TestCase): - def setUp(self): - pass - - def tearDown(self): - pass - def assert_keyfile_is_encrypted(self, keyfile): """ A quick check that filename looks like an encrypted key. @@ -161,7 +155,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(exp, key) def test_load_rsa(self): - key = RSAKey.from_private_key_file(_support("test_rsa.key")) + key = RSAKey.from_private_key_file(_support("rsa.key")) self.assertEqual("ssh-rsa", key.get_name()) exp_rsa = b(FINGER_RSA.split()[1].replace(":", "")) my_rsa = hexlify(key.get_fingerprint()) @@ -184,7 +178,7 @@ class KeyTest(unittest.TestCase): ) as loader: loader.side_effect = exception with pytest.raises(SSHException, match=str(exception)): - RSAKey.from_private_key_file(_support("test_rsa.key")) + RSAKey.from_private_key_file(_support("rsa.key")) def test_loading_empty_keys_errors_usefully(self): # #1599 - raise SSHException instead of IndexError @@ -203,7 +197,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(1024, key.get_bits()) def test_load_dss(self): - key = DSSKey.from_private_key_file(_support("test_dss.key")) + key = DSSKey.from_private_key_file(_support("dss.key")) self.assertEqual("ssh-dss", key.get_name()) exp_dss = b(FINGER_DSS.split()[1].replace(":", "")) my_dss = hexlify(key.get_fingerprint()) @@ -231,7 +225,7 @@ class KeyTest(unittest.TestCase): def test_compare_rsa(self): # verify that the private & public keys compare equal - key = RSAKey.from_private_key_file(_support("test_rsa.key")) + key = RSAKey.from_private_key_file(_support("rsa.key")) self.assertEqual(key, key) pub = RSAKey(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -240,7 +234,7 @@ class KeyTest(unittest.TestCase): def test_compare_dss(self): # verify that the private & public keys compare equal - key = DSSKey.from_private_key_file(_support("test_dss.key")) + key = DSSKey.from_private_key_file(_support("dss.key")) self.assertEqual(key, key) pub = DSSKey(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -248,7 +242,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(key, pub) def _sign_and_verify_rsa(self, algorithm, saved_sig): - key = RSAKey.from_private_key_file(_support("test_rsa.key")) + key = RSAKey.from_private_key_file(_support("rsa.key")) msg = key.sign_ssh_data(b"ice weasels", algorithm) assert isinstance(msg, Message) msg.rewind() @@ -273,7 +267,7 @@ class KeyTest(unittest.TestCase): def test_sign_dss(self): # verify that the dss private key can sign and verify - key = DSSKey.from_private_key_file(_support("test_dss.key")) + key = DSSKey.from_private_key_file(_support("dss.key")) msg = key.sign_ssh_data(b"ice weasels") self.assertTrue(type(msg) is Message) msg.rewind() @@ -329,7 +323,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(key.get_name(), "ecdsa-sha2-nistp521") def test_load_ecdsa_256(self): - key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key")) + key = ECDSAKey.from_private_key_file(_support("ecdsa-256.key")) self.assertEqual("ecdsa-sha2-nistp256", key.get_name()) exp_ecdsa = b(FINGER_ECDSA_256.split()[1].replace(":", "")) my_ecdsa = hexlify(key.get_fingerprint()) @@ -357,7 +351,7 @@ class KeyTest(unittest.TestCase): def test_compare_ecdsa_256(self): # verify that the private & public keys compare equal - key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key")) + key = ECDSAKey.from_private_key_file(_support("ecdsa-256.key")) self.assertEqual(key, key) pub = ECDSAKey(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -366,7 +360,7 @@ class KeyTest(unittest.TestCase): def test_sign_ecdsa_256(self): # verify that the rsa private key can sign and verify - key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key")) + key = ECDSAKey.from_private_key_file(_support("ecdsa-256.key")) msg = key.sign_ssh_data(b"ice weasels") self.assertTrue(type(msg) is Message) msg.rewind() @@ -408,7 +402,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(384, key.get_bits()) def test_load_ecdsa_transmutes_crypto_exceptions(self): - path = _support("test_ecdsa_256.key") + path = _support("ecdsa-256.key") # TODO: nix unittest for pytest for exception in (TypeError("onoz"), UnsupportedAlgorithm("oops")): with patch( @@ -569,12 +563,12 @@ class KeyTest(unittest.TestCase): RSAKey.from_private_key_file(_support("test_rsa_openssh_nopad.key")) def test_stringification(self): - key = RSAKey.from_private_key_file(_support("test_rsa.key")) + key = RSAKey.from_private_key_file(_support("rsa.key")) comparable = TEST_KEY_BYTESTR self.assertEqual(str(key), comparable) def test_ed25519(self): - key1 = Ed25519Key.from_private_key_file(_support("test_ed25519.key")) + key1 = Ed25519Key.from_private_key_file(_support("ed25519.key")) key2 = Ed25519Key.from_private_key_file( _support("test_ed25519_password.key"), b"abc123" ) @@ -594,7 +588,7 @@ class KeyTest(unittest.TestCase): def test_ed25519_compare(self): # verify that the private & public keys compare equal - key = Ed25519Key.from_private_key_file(_support("test_ed25519.key")) + key = Ed25519Key.from_private_key_file(_support("ed25519.key")) self.assertEqual(key, key) pub = Ed25519Key(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -616,33 +610,6 @@ class KeyTest(unittest.TestCase): ) assert original != generated - def keys(self): - for key_class, filename in [ - (RSAKey, "test_rsa.key"), - (DSSKey, "test_dss.key"), - (ECDSAKey, "test_ecdsa_256.key"), - (Ed25519Key, "test_ed25519.key"), - ]: - key1 = key_class.from_private_key_file(_support(filename)) - key2 = key_class.from_private_key_file(_support(filename)) - yield key1, key2 - - def test_keys_are_comparable(self): - for key1, key2 in self.keys(): - assert key1 == key2 - - def test_keys_are_not_equal_to_other(self): - for value in [None, True, ""]: - for key1, _ in self.keys(): - assert key1 != value - - def test_keys_are_hashable(self): - # NOTE: this isn't a great test due to hashseed randomization under - # Python 3 preventing use of static values, but it does still prove - # that __hash__ is implemented/doesn't explode & works across instances - for key1, key2 in self.keys(): - assert hash(key1) == hash(key2) - def test_ed25519_nonbytes_password(self): # https://github.com/paramiko/paramiko/issues/1039 Ed25519Key.from_private_key_file( @@ -654,7 +621,7 @@ class KeyTest(unittest.TestCase): # No exception -> it's good. Meh. def test_ed25519_load_from_file_obj(self): - with open(_support("test_ed25519.key")) as pkey_fileobj: + with open(_support("ed25519.key")) as pkey_fileobj: key = Ed25519Key.from_private_key(pkey_fileobj) self.assertEqual(key, key) self.assertTrue(key.can_sign()) @@ -674,43 +641,6 @@ class KeyTest(unittest.TestCase): finally: os.remove(newfile) - def test_certificates(self): - # NOTE: we also test 'live' use of cert auth for all key types in - # test_client.py; this and nearby cert tests are more about the gritty - # details. - # PKey.load_certificate - key_path = _support(os.path.join("cert_support", "test_rsa.key")) - key = RSAKey.from_private_key_file(key_path) - self.assertTrue(key.public_blob is None) - cert_path = _support( - os.path.join("cert_support", "test_rsa.key-cert.pub") - ) - key.load_certificate(cert_path) - self.assertTrue(key.public_blob is not None) - self.assertEqual( - key.public_blob.key_type, "ssh-rsa-cert-v01@openssh.com" - ) - self.assertEqual(key.public_blob.comment, "test_rsa.key.pub") - # Delve into blob contents, for test purposes - msg = Message(key.public_blob.key_blob) - self.assertEqual(msg.get_text(), "ssh-rsa-cert-v01@openssh.com") - msg.get_string() - e = msg.get_mpint() - n = msg.get_mpint() - self.assertEqual(e, key.public_numbers.e) - self.assertEqual(n, key.public_numbers.n) - # Serial number - self.assertEqual(msg.get_int64(), 1234) - - # Prevented from loading certificate that doesn't match - key_path = _support(os.path.join("cert_support", "test_ed25519.key")) - key1 = Ed25519Key.from_private_key_file(key_path) - self.assertRaises( - ValueError, - key1.load_certificate, - _support("test_rsa.key-cert.pub"), - ) - @patch("paramiko.pkey.os") def _test_keyfile_race(self, os_, exists): # Re: CVE-2022-24302 @@ -764,22 +694,3 @@ class KeyTest(unittest.TestCase): finally: if os.path.exists(new): os.unlink(new) - - def test_sign_rsa_with_certificate(self): - data = b"ice weasels" - key_path = _support(os.path.join("cert_support", "test_rsa.key")) - key = RSAKey.from_private_key_file(key_path) - msg = key.sign_ssh_data(data, "rsa-sha2-256") - msg.rewind() - assert "rsa-sha2-256" == msg.get_text() - sign = msg.get_binary() - cert_path = _support( - os.path.join("cert_support", "test_rsa.key-cert.pub") - ) - key.load_certificate(cert_path) - msg = key.sign_ssh_data(data, "rsa-sha2-256-cert-v01@openssh.com") - msg.rewind() - assert "rsa-sha2-256" == msg.get_text() - assert sign == msg.get_binary() - msg.rewind() - assert key.verify_ssh_sig(b"ice weasels", msg) diff --git a/tests/test_sftp.py b/tests/test_sftp.py index be123de4..7fd274bc 100644 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -38,8 +38,8 @@ from paramiko.sftp_attr import SFTPAttributes from paramiko.util import b, u from tests import requireNonAsciiLocale -from .util import needs_builtin -from .util import slow +from ._util import needs_builtin +from ._util import slow ARTICLE = """ diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py index 5192f657..acfe71e3 100644 --- a/tests/test_sftp_big.py +++ b/tests/test_sftp_big.py @@ -30,7 +30,7 @@ import time from paramiko.common import o660 -from .util import slow +from ._util import slow @slow diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py index a8175ccb..b441a225 100644 --- a/tests/test_ssh_gss.py +++ b/tests/test_ssh_gss.py @@ -28,7 +28,7 @@ import threading import paramiko -from .util import _support, needs_gssapi, KerberosTestCase, update_env +from ._util import _support, needs_gssapi, KerberosTestCase, update_env from .test_client import FINGERPRINTS @@ -89,7 +89,7 @@ class GSSAuthTest(KerberosTestCase): def _run(self): self.socks, addr = self.sockl.accept() self.ts = paramiko.Transport(self.socks) - host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") + host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key")) self.ts.add_server_key(host_key) server = NullServer() self.ts.start_server(self.event, server) @@ -100,7 +100,7 @@ class GSSAuthTest(KerberosTestCase): The exception is ... no exception yet """ - host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") + host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) self.tc = paramiko.SSHClient() @@ -154,7 +154,7 @@ class GSSAuthTest(KerberosTestCase): "this_host_does_not_exists_and_causes_a_GSSAPI-exception" ) self._test_connection( - key_filename=[_support("test_rsa.key")], + key_filename=[_support("rsa.key")], allow_agent=False, look_for_keys=False, ) diff --git a/tests/test_transport.py b/tests/test_transport.py index 4062d767..b2efd637 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -22,7 +22,6 @@ Some unit tests for the ssh2 protocol in Transport. from binascii import hexlify -from contextlib import contextmanager import select import socket import time @@ -34,18 +33,16 @@ from unittest.mock import Mock from paramiko import ( AuthHandler, ChannelException, - DSSKey, Packetizer, RSAKey, SSHException, - AuthenticationException, IncompatiblePeer, SecurityOptions, - ServerInterface, + ServiceRequestingTransport, Transport, ) -from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL -from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED +from paramiko.auth_handler import AuthOnlyHandler +from paramiko import OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED from paramiko.common import ( DEFAULT_MAX_PACKET_SIZE, DEFAULT_WINDOW_SIZE, @@ -60,8 +57,17 @@ from paramiko.common import ( ) from paramiko.message import Message -from .util import needs_builtin, _support, requires_sha1_signing, slow -from .loop import LoopSocket +from ._util import ( + needs_builtin, + _support, + requires_sha1_signing, + slow, + server, + _disable_sha2, + _disable_sha1, + TestServer as NullServer, +) +from ._loop import LoopSocket LONG_BANNER = """\ @@ -77,80 +83,11 @@ Maybe. """ -class NullServer(ServerInterface): - paranoid_did_password = False - paranoid_did_public_key = False - paranoid_key = DSSKey.from_private_key_file(_support("test_dss.key")) - - def __init__(self, allowed_keys=None): - self.allowed_keys = allowed_keys if allowed_keys is not None else [] - - def get_allowed_auths(self, username): - if username == "slowdive": - return "publickey,password" - return "publickey" - - def check_auth_password(self, username, password): - if (username == "slowdive") and (password == "pygmalion"): - return AUTH_SUCCESSFUL - return AUTH_FAILED - - def check_auth_publickey(self, username, key): - if key in self.allowed_keys: - return AUTH_SUCCESSFUL - return AUTH_FAILED - - def check_channel_request(self, kind, chanid): - if kind == "bogus": - return OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED - return OPEN_SUCCEEDED - - def check_channel_exec_request(self, channel, command): - if command != b"yes": - return False - return True - - def check_channel_shell_request(self, channel): - return True - - def check_global_request(self, kind, msg): - self._global_request = kind - # NOTE: for w/e reason, older impl of this returned False always, even - # tho that's only supposed to occur if the request cannot be served. - # For now, leaving that the default unless test supplies specific - # 'acceptable' request kind - return kind == "acceptable" - - def check_channel_x11_request( - self, - channel, - single_connection, - auth_protocol, - auth_cookie, - screen_number, - ): - self._x11_single_connection = single_connection - self._x11_auth_protocol = auth_protocol - self._x11_auth_cookie = auth_cookie - self._x11_screen_number = screen_number - return True - - def check_port_forward_request(self, addr, port): - self._listen = socket.socket() - self._listen.bind(("127.0.0.1", 0)) - self._listen.listen(1) - return self._listen.getsockname()[1] - - def cancel_port_forward_request(self, addr, port): - self._listen.close() - self._listen = None - - def check_channel_direct_tcpip_request(self, chanid, origin, destination): - self._tcpip_dest = destination - return OPEN_SUCCEEDED - - class TransportTest(unittest.TestCase): + # TODO: this can get nuked once ServiceRequestingTransport becomes the + # only Transport, as it has this baked in. + _auth_handler_class = AuthHandler + def setUp(self): self.socks = LoopSocket() self.sockc = LoopSocket() @@ -168,7 +105,7 @@ class TransportTest(unittest.TestCase): def setup_test_server( self, client_options=None, server_options=None, connect_kwargs=None ): - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) + host_key = RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) @@ -234,7 +171,7 @@ class TransportTest(unittest.TestCase): loopback sockets. this is hardly "simple" but it's simpler than the later tests. :) """ - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) + host_key = RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) event = threading.Event() @@ -260,7 +197,7 @@ class TransportTest(unittest.TestCase): """ verify that a long banner doesn't mess up the handshake. """ - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) + host_key = RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) event = threading.Event() @@ -910,7 +847,7 @@ class TransportTest(unittest.TestCase): # be fine. Even tho it's a bit squicky. self.tc.packetizer = SlowPacketizer(self.tc.sock) # Continue with regular test red tape. - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) + host_key = RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) event = threading.Event() @@ -1099,7 +1036,8 @@ class TransportTest(unittest.TestCase): def test_server_transports_reject_client_message_types(self): # TODO: handle Transport's own tables too, not just its inner auth # handler's table. See TODOs in auth_handler.py - for message_type in AuthHandler._client_handler_table: + some_handler = self._auth_handler_class(self.tc) + for message_type in some_handler._client_handler_table: self._send_client_message(message_type) self._expect_unimplemented() # Reset for rest of loop @@ -1115,6 +1053,21 @@ class TransportTest(unittest.TestCase): self._expect_unimplemented() +# TODO: for now this is purely a regression test. It needs actual tests of the +# intentional new behavior too! +class ServiceRequestingTransportTest(TransportTest): + _auth_handler_class = AuthOnlyHandler + + def setUp(self): + # Copypasta (Transport init is load-bearing) + self.socks = LoopSocket() + self.sockc = LoopSocket() + self.sockc.link(self.socks) + # New class who dis + self.tc = ServiceRequestingTransport(self.sockc) + self.ts = ServiceRequestingTransport(self.socks) + + class AlgorithmDisablingTests(unittest.TestCase): def test_preferred_lists_default_to_private_attribute_contents(self): t = Transport(sock=Mock()) @@ -1188,98 +1141,6 @@ class AlgorithmDisablingTests(unittest.TestCase): assert "zlib" not in compressions -@contextmanager -def server( - hostkey=None, - init=None, - server_init=None, - client_init=None, - connect=None, - pubkeys=None, - catch_error=False, -): - """ - SSH server contextmanager for testing. - - :param hostkey: - Host key to use for the server; if None, loads - ``test_rsa.key``. - :param init: - Default `Transport` constructor kwargs to use for both sides. - :param server_init: - Extends and/or overrides ``init`` for server transport only. - :param client_init: - Extends and/or overrides ``init`` for client transport only. - :param connect: - Kwargs to use for ``connect()`` on the client. - :param pubkeys: - List of public keys for auth. - :param catch_error: - Whether to capture connection errors & yield from contextmanager. - Necessary for connection_time exception testing. - """ - if init is None: - init = {} - if server_init is None: - server_init = {} - if client_init is None: - client_init = {} - if connect is None: - connect = dict(username="slowdive", password="pygmalion") - socks = LoopSocket() - sockc = LoopSocket() - sockc.link(socks) - tc = Transport(sockc, **dict(init, **client_init)) - ts = Transport(socks, **dict(init, **server_init)) - - if hostkey is None: - hostkey = RSAKey.from_private_key_file(_support("test_rsa.key")) - ts.add_server_key(hostkey) - event = threading.Event() - server = NullServer(allowed_keys=pubkeys) - assert not event.is_set() - assert not ts.is_active() - assert tc.get_username() is None - assert ts.get_username() is None - assert not tc.is_authenticated() - assert not ts.is_authenticated() - - err = None - # Trap errors and yield instead of raising right away; otherwise callers - # cannot usefully deal with problems at connect time which stem from errors - # in the server side. - try: - ts.start_server(event, server) - tc.connect(**connect) - - event.wait(1.0) - assert event.is_set() - assert ts.is_active() - assert tc.is_active() - - except Exception as e: - if not catch_error: - raise - err = e - - yield (tc, ts, err) if catch_error else (tc, ts) - - tc.close() - ts.close() - socks.close() - sockc.close() - - -_disable_sha2 = dict( - disabled_algorithms=dict(keys=["rsa-sha2-256", "rsa-sha2-512"]) -) -_disable_sha1 = dict(disabled_algorithms=dict(keys=["ssh-rsa"])) -_disable_sha2_pubkey = dict( - disabled_algorithms=dict(pubkeys=["rsa-sha2-256", "rsa-sha2-512"]) -) -_disable_sha1_pubkey = dict(disabled_algorithms=dict(pubkeys=["ssh-rsa"])) - - class TestSHA2SignatureKeyExchange(unittest.TestCase): # NOTE: these all rely on the default server() hostkey being RSA # NOTE: these rely on both sides being properly implemented re: agreed-upon @@ -1343,8 +1204,11 @@ class TestSHA2SignatureKeyExchange(unittest.TestCase): # (This is a regression test vs previous implementation which overwrote # the entire preferred-hostkeys structure when given an explicit key as # a client.) - hostkey = RSAKey.from_private_key_file(_support("test_rsa.key")) - with server(hostkey=hostkey, connect=dict(hostkey=hostkey)) as (tc, _): + hostkey = RSAKey.from_private_key_file(_support("rsa.key")) + connect = dict( + hostkey=hostkey, username="slowdive", password="pygmalion" + ) + with server(hostkey=hostkey, connect=connect) as (tc, _): assert tc.host_key_type == "rsa-sha2-512" @@ -1358,7 +1222,7 @@ class TestExtInfo(unittest.TestCase): } def test_client_uses_server_sig_algs_for_pubkey_auth(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + privkey = RSAKey.from_private_key_file(_support("rsa.key")) with server( pubkeys=[privkey], connect=dict(pkey=privkey), @@ -1367,94 +1231,6 @@ class TestExtInfo(unittest.TestCase): ), ) as (tc, _): assert tc.is_authenticated() - # Client settled on 256 despite itself not having 512 disabled - assert tc._agreed_pubkey_algorithm == "rsa-sha2-256" - - -# TODO: these could move into test_auth.py but that badly needs refactoring -# with this module anyways... -class TestSHA2SignaturePubkeys(unittest.TestCase): - def test_pubkey_auth_honors_disabled_algorithms(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) - with server( - pubkeys=[privkey], - connect=dict(pkey=privkey), - init=dict( - disabled_algorithms=dict( - pubkeys=["ssh-rsa", "rsa-sha2-256", "rsa-sha2-512"] - ) - ), - catch_error=True, - ) as (_, _, err): - assert isinstance(err, SSHException) - assert "no RSA pubkey algorithms" in str(err) - - def test_client_sha2_disabled_server_sha1_disabled_no_match(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) - with server( - pubkeys=[privkey], - connect=dict(pkey=privkey), - client_init=_disable_sha2_pubkey, - server_init=_disable_sha1_pubkey, - catch_error=True, - ) as (tc, ts, err): - assert isinstance(err, AuthenticationException) - - def test_client_sha1_disabled_server_sha2_disabled_no_match(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) - with server( - pubkeys=[privkey], - connect=dict(pkey=privkey), - client_init=_disable_sha1_pubkey, - server_init=_disable_sha2_pubkey, - catch_error=True, - ) as (tc, ts, err): - assert isinstance(err, AuthenticationException) - - @requires_sha1_signing - def test_ssh_rsa_still_used_when_sha2_disabled(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) - # NOTE: this works because key obj comparison uses public bytes - # TODO: would be nice for PKey to grow a legit "give me another obj of - # same class but just the public bits" using asbytes() - with server( - pubkeys=[privkey], connect=dict(pkey=privkey), init=_disable_sha2 - ) as (tc, _): - assert tc.is_authenticated() - - def test_sha2_512(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) - with server( - pubkeys=[privkey], - connect=dict(pkey=privkey), - init=dict( - disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-256"]) - ), - ) as (tc, ts): - assert tc.is_authenticated() - assert tc._agreed_pubkey_algorithm == "rsa-sha2-512" - - def test_sha2_256(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) - with server( - pubkeys=[privkey], - connect=dict(pkey=privkey), - init=dict( - disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-512"]) - ), - ) as (tc, ts): - assert tc.is_authenticated() - assert tc._agreed_pubkey_algorithm == "rsa-sha2-256" - - def test_sha2_256_when_client_only_enables_256(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) - with server( - pubkeys=[privkey], - connect=dict(pkey=privkey), - # Client-side only; server still accepts all 3. - client_init=dict( - disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-512"]) - ), - ) as (tc, ts): - assert tc.is_authenticated() + # Client settled on 256 despite itself not having 512 disabled (and + # otherwise, 512 would have been earlier in the preferred list) assert tc._agreed_pubkey_algorithm == "rsa-sha2-256" diff --git a/tests/test_util.py b/tests/test_util.py index ec03846b..a2a8224e 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -49,6 +49,11 @@ class UtilTest(unittest.TestCase): "Agent", "AgentKey", "AuthenticationException", + "AuthFailure", + "AuthHandler", + "AuthResult", + "AuthSource", + "AuthStrategy", "AutoAddPolicy", "BadAuthenticationType", "BufferedFile", @@ -58,9 +63,14 @@ class UtilTest(unittest.TestCase): "CouldNotCanonicalize", "DSSKey", "HostKeys", + "InMemoryPrivateKey", "Message", "MissingHostKeyPolicy", + "NoneAuth", + "OnDiskPrivateKey", + "Password", "PasswordRequiredException", + "PrivateKey", "RSAKey", "RejectPolicy", "SFTP", @@ -77,12 +87,13 @@ class UtilTest(unittest.TestCase): "SSHException", "SecurityOptions", "ServerInterface", + "SourceResult", "SubsystemHandler", "Transport", "WarningPolicy", "util", ): - assert name in paramiko.__all__ + assert name in dir(paramiko) def test_generate_key_bytes(self): key_bytes = paramiko.util.generate_key_bytes( diff --git a/tests/util.py b/tests/util.py deleted file mode 100644 index 0639f8ae..00000000 --- a/tests/util.py +++ /dev/null @@ -1,174 +0,0 @@ -from os.path import dirname, realpath, join -import builtins -import os -import struct -import sys -import unittest - -import pytest - -from paramiko.ssh_gss import GSS_AUTH_AVAILABLE - -from cryptography.exceptions import UnsupportedAlgorithm, _Reasons -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.asymmetric import padding, rsa - -tests_dir = dirname(realpath(__file__)) - - -def _support(filename): - return join(tests_dir, filename) - - -def _config(name): - return join(tests_dir, "configs", name) - - -needs_gssapi = pytest.mark.skipif( - not GSS_AUTH_AVAILABLE, reason="No GSSAPI to test" -) - - -def needs_builtin(name): - """ - Skip decorated test if builtin name does not exist. - """ - reason = "Test requires a builtin '{}'".format(name) - return pytest.mark.skipif(not hasattr(builtins, name), reason=reason) - - -slow = pytest.mark.slow - -# GSSAPI / Kerberos related tests need a working Kerberos environment. -# The class `KerberosTestCase` provides such an environment or skips all tests. -# There are 3 distinct cases: -# -# - A Kerberos environment has already been created and the environment -# contains the required information. -# -# - We can use the package 'k5test' to setup an working kerberos environment on -# the fly. -# -# - We skip all tests. -# -# ToDo: add a Windows specific implementation? - -if ( - os.environ.get("K5TEST_USER_PRINC", None) - and os.environ.get("K5TEST_HOSTNAME", None) - and os.environ.get("KRB5_KTNAME", None) -): # add other vars as needed - - # The environment provides the required information - class DummyK5Realm: - def __init__(self): - for k in os.environ: - if not k.startswith("K5TEST_"): - continue - setattr(self, k[7:].lower(), os.environ[k]) - self.env = {} - - class KerberosTestCase(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.realm = DummyK5Realm() - - @classmethod - def tearDownClass(cls): - del cls.realm - -else: - try: - # Try to setup a kerberos environment - from k5test import KerberosTestCase - except Exception: - # Use a dummy, that skips all tests - class KerberosTestCase(unittest.TestCase): - @classmethod - def setUpClass(cls): - raise unittest.SkipTest( - "Missing extension package k5test. " - 'Please run "pip install k5test" ' - "to install it." - ) - - -def update_env(testcase, mapping, env=os.environ): - """Modify os.environ during a test case and restore during cleanup.""" - saved_env = env.copy() - - def replace(target, source): - target.update(source) - for k in list(target): - if k not in source: - target.pop(k, None) - - testcase.addCleanup(replace, env, saved_env) - env.update(mapping) - return testcase - - -def k5shell(args=None): - """Create a shell with an kerberos environment - - This can be used to debug paramiko or to test the old GSSAPI. - To test a different GSSAPI, simply activate a suitable venv - within the shell. - """ - import k5test - import atexit - import subprocess - - k5 = k5test.K5Realm() - atexit.register(k5.stop) - os.environ.update(k5.env) - for n in ("realm", "user_princ", "hostname"): - os.environ["K5TEST_" + n.upper()] = getattr(k5, n) - - if not args: - args = sys.argv[1:] - if not args: - args = [os.environ.get("SHELL", "bash")] - sys.exit(subprocess.call(args)) - - -def is_low_entropy(): - """ - Attempts to detect whether running interpreter is low-entropy. - - "low-entropy" is defined as being in 32-bit mode and with the hash seed set - to zero. - """ - is_32bit = struct.calcsize("P") == 32 / 8 - # I don't see a way to tell internally if the hash seed was set this - # way, but env should be plenty sufficient, this is only for testing. - return is_32bit and os.environ.get("PYTHONHASHSEED", None) == "0" - - -def sha1_signing_unsupported(): - """ - This is used to skip tests in environments where SHA-1 signing is - not supported by the backend. - """ - private_key = rsa.generate_private_key( - public_exponent=65537, key_size=2048, backend=default_backend() - ) - message = b"Some dummy text" - try: - private_key.sign( - message, - padding.PSS( - mgf=padding.MGF1(hashes.SHA1()), - salt_length=padding.PSS.MAX_LENGTH, - ), - hashes.SHA1(), - ) - return False - except UnsupportedAlgorithm as e: - return e._reason is _Reasons.UNSUPPORTED_HASH - - -requires_sha1_signing = unittest.skipIf( - sha1_signing_unsupported(), "SHA-1 signing not supported" -) |